# -*- coding: utf-8 -*-
"""A set of utility functions to support social event detection tasks."""
import torch
import torch.nn.functional as F
import numpy as np
from sentence_transformers import SentenceTransformer
import warnings
from itertools import combinations
from datetime import datetime
import numpy as np
def construct_graph(df, G=None):
    """Construct a graph from a DataFrame containing social media data.

    Every row contributes one tweet node plus its user, entity and word
    nodes, and one edge from the tweet node to each of those nodes.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame containing social media data with columns:
        tweet_id, user_mentions, user_id, entities, sampled_words
    G : networkx.Graph, optional (default=None)
        Existing graph to add nodes/edges to. If None, creates new graph.

    Returns
    -------
    G : networkx.Graph
        Graph with nodes for tweets, users, entities and words, and edges
        between them. Node names are prefixed with ``t_`` (tweets),
        ``u_`` (users) and ``w_`` (words); entities are used as-is. Each
        node carries a boolean attribute naming its kind (``tweet_id``,
        ``user_id``, ``entity`` or ``word``).
    """
    import networkx as nx

    if G is None:
        G = nx.Graph()

    for _, row in df.iterrows():
        # Tweet node
        tid = 't_' + str(row['tweet_id'])
        G.add_node(tid, tweet_id=True)

        # User nodes: mentioned users plus the author. Build a new list
        # instead of appending to row['user_mentions'], which is the very
        # list object stored in the caller's DataFrame.
        raw_user_ids = [*row['user_mentions'], row['user_id']]
        user_ids = ['u_' + str(each) for each in raw_user_ids]
        G.add_nodes_from(user_ids, user_id=True)

        # Entity nodes (no prefix)
        entities = row['entities']
        G.add_nodes_from(entities, entity=True)

        # Word nodes
        words = ['w_' + each for each in row['sampled_words']]
        G.add_nodes_from(words, word=True)

        # Connect the tweet to every node it introduced
        G.add_edges_from(
            (tid, each) for each in [*user_ids, *entities, *words]
        )

    return G
def tokenize_text(text, max_length=512):
    """Split text into lower-cased whitespace tokens, capped at a length.

    Parameters
    ----------
    text : str
        The input text to tokenize.
    max_length : int, optional (default=512)
        Maximum number of tokens to keep; extra tokens are dropped from
        the end.

    Returns
    -------
    tokens : list
        Lower-cased tokens obtained by splitting on whitespace.
    """
    # Collapse every run of whitespace into a single space first.
    normalized = ' '.join(text.split())
    # Slicing is a no-op when the sequence is already short enough.
    return normalized.lower().split()[:max_length]
def pprint(params, offset=0, printer=repr):
    """Render the dictionary 'params' as a wrapped ``key=value`` listing.

    Parameters
    ----------
    params : dict
        The dictionary to pretty print
    offset : int, optional (default=0)
        The offset at the beginning of each line
    printer : callable, optional (default=repr)
        The function to convert entries to strings

    Returns
    -------
    str
        Entries sorted by key and joined with ``', '``; a new line
        (indented by ``offset + 1`` spaces) is started whenever the current
        line would reach 75 characters or the entry itself spans lines.
    """
    separator = ',\n' + ' ' * (offset + 1)
    pieces = []
    current_width = offset

    for index, (key, value) in enumerate(sorted(params.items())):
        # Exact floats are rendered with str() so the representation is
        # consistent across architectures and versions; everything else
        # goes through the supplied printer.
        render = str if type(value) is float else printer
        entry = '%s=%s' % (key, render(value))

        # Keep very long entries readable: head, ellipsis, tail.
        if len(entry) > 500:
            entry = entry[:300] + '...' + entry[-100:]

        if index > 0:
            needs_break = (
                current_width + len(entry) >= 75 or '\n' in entry
            )
            if needs_break:
                pieces.append(separator)
                current_width = len(separator)
            else:
                pieces.append(', ')
                current_width += 2

        pieces.append(entry)
        current_width += len(entry)

    joined = ''.join(pieces)
    # Strip trailing spaces line by line so doctests stay stable.
    return '\n'.join(line.rstrip(' ') for line in joined.split('\n'))
def validate_device(gpu_id):
    """Validate the input GPU ID is valid on the given environment.

    If no GPU is presented, return 'cpu'.

    Parameters
    ----------
    gpu_id : int or str
        GPU ID to check. ``-1`` (or anything ``int()`` converts to -1) and
        the string ``'cpu'`` both select the CPU.

    Returns
    -------
    device : str
        Valid device, e.g., 'cuda:0' or 'cpu'.

    Raises
    ------
    ValueError
        If ``gpu_id`` cannot be converted to an integer, or if it is
        rejected by ``check_parameter`` for the available GPU count.
    """
    # Accept an explicit 'cpu' request instead of crashing in int().
    if isinstance(gpu_id, str) and gpu_id.strip().lower() == 'cpu':
        return 'cpu'

    # cast to int for checking
    gpu_id = int(gpu_id)

    # -1 is the conventional "use the CPU" id
    if gpu_id == -1:
        return 'cpu'

    if not torch.cuda.is_available():
        # A GPU was requested but none can be used: fall back loudly.
        warnings.warn('The cuda is not available. Set to cpu.')
        return 'cpu'

    # check if gpu id is between 0 and the total number of GPUs
    check_parameter(gpu_id, 0, torch.cuda.device_count(),
                    param_name='gpu id', include_left=True,
                    include_right=False)
    return 'cuda:{}'.format(gpu_id)
def check_parameter(value, lower, upper, param_name, include_left=True, include_right=True):
    """Check if a parameter value is within specified bounds.

    Parameters
    ----------
    value : int or float
        The parameter value to check
    lower : int or float
        Lower bound
    upper : int or float
        Upper bound
    param_name : str
        Name of the parameter for error messages
    include_left : bool, optional (default=True)
        Whether to include lower bound in valid range
    include_right : bool, optional (default=True)
        Whether to include upper bound in valid range

    Returns
    -------
    bool
        True if parameter is valid, raises ValueError otherwise

    Raises
    ------
    ValueError
        If ``value`` lies outside the interval described by ``lower``,
        ``upper`` and the two inclusion flags.
    """
    # An excluded bound still has to be enforced, just strictly.
    if include_left:
        if value < lower:
            raise ValueError(f"{param_name} must be greater than or equal to {lower}")
    elif value <= lower:
        raise ValueError(f"{param_name} must be greater than {lower}")

    if include_right:
        if value > upper:
            raise ValueError(f"{param_name} must be less than or equal to {upper}")
    elif value >= upper:
        raise ValueError(f"{param_name} must be less than {upper}")

    return True
def currentTime():
    """Return the local wall-clock time as a formatted string.

    Returns
    -------
    str
        Current time in format 'YYYY-MM-DD HH:MM:SS'
    """
    now = datetime.now()
    return f"{now:%Y-%m-%d %H:%M:%S}"
def sim(z1, z2):
    """Compute the pairwise cosine similarity of two batches of vectors.

    Both inputs are normalized with ``F.normalize`` using its default
    arguments and then multiplied, so entry ``[i, j]`` of the result is the
    cosine similarity between row ``i`` of ``z1`` and row ``j`` of ``z2``.

    Parameters
    ----------
    z1 : torch.Tensor
        First set of vectors (2-D, as required by ``torch.mm``)
    z2 : torch.Tensor
        Second set of vectors (2-D, as required by ``torch.mm``)

    Returns
    -------
    torch.Tensor
        Similarity matrix
    """
    unit_rows_1 = F.normalize(z1)
    unit_rows_2 = F.normalize(z2)
    return unit_rows_1.mm(unit_rows_2.t())
def pairwise_sample(embeddings, labels=None, model=None):
    """Build pairwise same-label supervision, or predict it with a model.

    Parameters
    ----------
    embeddings : torch.Tensor
        Sample embeddings. Only used when ``model`` is given.
    labels : torch.Tensor, optional (default=None)
        One label per sample. Required when ``model`` is None.
    model : callable, optional (default=None)
        If given, it is called on ``embeddings`` and its output is returned
        unchanged.

    Returns
    -------
    If ``model`` is None, a tuple of three ``torch.LongTensor``:

    pairs : shape (n_pairs, 2)
        Every index pair ``(i, j)`` with ``i < j``, in lexicographic order.
    pair_labels : shape (n_pairs,)
        1 where both samples of the pair share a label, else 0.
    pair_matrix : shape (n, n)
        Symmetric matrix with ones on the diagonal and at every same-label
        pair.

    Otherwise the single value ``model(embeddings)``.
    """
    if model is not None:
        return model(embeddings)

    labels = labels.detach().cpu().numpy()
    n_samples = len(labels)

    # Upper-triangle indices enumerate the (i, j), i < j pairs in the same
    # order as itertools.combinations, and yield a well-formed empty (0, 2)
    # array when there are fewer than two samples.
    first, second = np.triu_indices(n_samples, k=1)
    pairs = np.stack((first, second), axis=1)

    pair_labels = labels[first] == labels[second]
    matched = np.where(pair_labels)[0]

    pair_matrix = np.eye(n_samples)
    pair_matrix[first[matched], second[matched]] = 1
    pair_matrix[second[matched], first[matched]] = 1

    return (
        torch.from_numpy(pairs).long(),
        torch.from_numpy(pair_labels.astype(int)).long(),
        torch.from_numpy(pair_matrix).long(),
    )
def SBERT_embed(s_list, language):
    """Embed sentences with a language-specific Sentence-BERT model.

    A local checkpoint is tried first; if loading it fails for any reason,
    the equally named model is loaded by its Hugging Face identifier.
    Progress messages are printed along the way.

    Parameters
    ----------
    s_list : list
        Sentences/tokens to be embedded.
    language : str
        Language of the sentences ('English', 'French', 'Arabic').

    Returns
    -------
    Embeddings of the sentences/tokens, requested as a normalized tensor
    from ``model.encode`` and moved to the CPU.

    Raises
    ------
    ValueError
        If ``language`` is not one of the supported languages.
    """
    # language -> (local checkpoint path, Hugging Face model name)
    checkpoints = {
        'English': (
            '../model/model_needed/all-MiniLM-L6-v2',
            'sentence-transformers/all-MiniLM-L6-v2',
        ),
        'French': (
            '../model/model_needed/distiluse-base-multilingual-cased-v1',
            'sentence-transformers/distiluse-base-multilingual-cased-v1',
        ),
        'Arabic': (
            '../model/model_needed/paraphrase-multilingual-mpnet-base-v2',
            'sentence-transformers/paraphrase-multilingual-mpnet-base-v2',
        ),
    }

    print(f"Embedding sentences in language: {language}")

    if language not in checkpoints:
        raise ValueError(f"Unsupported language: {language}. Supported languages are: {', '.join(checkpoints)}")

    model_path, hub_name = checkpoints[language]
    print(f"Using model: {model_path}")

    # Prefer the local copy; any failure to load it triggers the fallback.
    try:
        model = SentenceTransformer(model_path)
        print(f"Successfully loaded model from local path: {model_path}")
    except Exception:
        print(f"Model {model_path} not found locally. Attempting to download from Hugging Face...")
        model = SentenceTransformer(hub_name)
        print(f"Model downloaded from Hugging Face: {hub_name}")

    embeddings = model.encode(s_list, convert_to_tensor=True, normalize_embeddings=True)
    print(f"Computed embeddings for {len(s_list)} sentences/tokens.")
    return embeddings.cpu()
def DS_Combin(alpha, classes):
    """Combine the Dirichlet parameters of several views into one.

    Views are fused pairwise from left to right: views 0 and 1 are combined
    first, then the result is combined with view 2, and so on.

    :param alpha: All Dirichlet distribution parameters; an int-indexable
        collection (``alpha[0]``, ``alpha[1]``, ...) of tensors. The code
        sums over ``dim=1`` and reshapes with ``view(-1, classes, 1)``, so
        each tensor is expected to be of shape (batch, classes).
    :param classes: Number of classes.
    :return: Tuple ``(alpha_a, u_a)`` of the combined Dirichlet distribution
        parameters, shape (batch, classes), and the combined uncertainty,
        shape (batch, 1). With a single view, that view is returned
        unchanged together with its own uncertainty.
    :raises ValueError: If ``alpha`` contains no views.
    """
    def _belief_and_uncertainty(view_alpha):
        # Dirichlet strength S, evidence e = alpha - 1, belief b = e / S,
        # uncertainty u = K / S.
        strength = torch.sum(view_alpha, dim=1, keepdim=True)
        belief = (view_alpha - 1) / strength
        return belief, classes / strength

    def _combine_two(alpha1, alpha2):
        """Fuse the Dirichlet parameters of two views."""
        b0, u0 = _belief_and_uncertainty(alpha1)
        b1, u1 = _belief_and_uncertainty(alpha2)

        # Per-sample outer product of the two belief vectors.
        outer = torch.bmm(b0.view(-1, classes, 1), b1.view(-1, 1, classes))
        # Conflict C: belief mass the two views put on *different* classes,
        # i.e. every off-diagonal entry of the outer product.
        conflict = (
            torch.sum(outer, dim=(1, 2))
            - torch.diagonal(outer, dim1=-2, dim2=-1).sum(-1)
        )
        scale = (1 - conflict).view(-1, 1)

        # Combined belief and uncertainty, renormalized by 1 - C.
        b_a = (b0 * b1 + b0 * u1 + b1 * u0) / scale
        u_a = (u0 * u1) / scale

        # Map back to Dirichlet parameters: S = K / u, e = b * S, alpha = e + 1.
        S_a = classes / u_a
        return b_a * S_a + 1, u_a

    n_views = len(alpha)
    if n_views == 0:
        raise ValueError("alpha must contain at least one view")

    if n_views == 1:
        S = torch.sum(alpha[0], dim=1, keepdim=True)
        return alpha[0], classes / S

    alpha_a, u_a = _combine_two(alpha[0], alpha[1])
    for v in range(2, n_views):
        alpha_a, u_a = _combine_two(alpha_a, alpha[v])
    return alpha_a, u_a
def graph_statistics(G, save_path):
    """Print and save basic statistics of a graph.

    Writes two files into ``save_path``: ``isolated_nodes.pt`` (a boolean
    mask of nodes whose in-degree is zero, stored with ``torch.save``) and
    ``graph_statistics.txt`` (the printed summary).

    Parameters
    ----------
    G : graph object
        Must provide ``number_of_nodes()``, ``number_of_edges()`` and
        ``in_degrees()``, the latter returning a tensor.
        NOTE(review): this looks like a DGL graph -- confirm with callers.
    save_path : str
        Existing directory in which the two files are written.

    Returns
    -------
    int
        Number of isolated nodes (nodes with in-degree zero).
    """
    num_nodes = G.number_of_nodes()
    num_edges = G.number_of_edges()
    # Edge counts are halved before reporting; presumably every edge is
    # stored in both directions -- verify against the graph construction.
    half_edges = num_edges / 2
    ave_degree = half_edges // num_nodes

    in_degrees = G.in_degrees()
    zero_degrees = torch.zeros(in_degrees.size(0), dtype=torch.long)
    isolated_nodes = in_degrees == zero_degrees
    torch.save(isolated_nodes, save_path + '/isolated_nodes.pt')
    num_isolated_nodes = torch.sum(isolated_nodes).item()

    message = ''.join([
        '\nGraph statistics:\n',
        f'We have {num_nodes} nodes.\n',
        f'We have {half_edges} in-edges.\n',
        f'Average degree: {ave_degree}\n',
        f'Number of isolated nodes: {num_isolated_nodes}\n',
    ])
    print(message)
    with open(save_path + "/graph_statistics.txt", "w") as f:
        f.write(message)

    return num_isolated_nodes