72 lines
No EOL
2.3 KiB
Python
72 lines
No EOL
2.3 KiB
Python
import tqdm
|
|
import spacy
|
|
from spacy.language import Language
|
|
from spacy_language_detection import LanguageDetector
|
|
from urlextract import URLExtract
|
|
import requests
|
|
|
|
from .typesense_client import client
|
|
|
|
# Load spaCy models
|
|
nlp_en = spacy.load("en_core_web_sm")
|
|
nlp_fr = spacy.load("fr_core_news_sm")
|
|
|
|
# Create language detector
|
|
def get_lang_detector(nlp, name):
|
|
return LanguageDetector(seed=42)
|
|
|
|
Language.factory("language_detector", func=get_lang_detector)
|
|
nlp_en.add_pipe("language_detector", last=True)
|
|
|
|
# Initialize URL extractor
|
|
extractor = URLExtract()
|
|
|
|
def count_words(text, lang):
|
|
if lang == 'en':
|
|
doc = nlp_en(text)
|
|
elif lang == 'fr':
|
|
doc = nlp_fr(text)
|
|
else:
|
|
# Default to English if language is not supported
|
|
doc = nlp_en(text)
|
|
|
|
# Count words excluding stopwords
|
|
word_count = len([token for token in doc if not token.is_stop and not token.is_punct])
|
|
return word_count
|
|
|
|
def resolve_shortened_url(url):
|
|
try:
|
|
response = requests.head(url, allow_redirects=True, timeout=5)
|
|
return response.url
|
|
except:
|
|
return url
|
|
|
|
def extract_and_resolve_urls(text):
|
|
urls = extractor.find_urls(text)
|
|
resolved_urls = [resolve_shortened_url(url) for url in urls]
|
|
return list(set(resolved_urls)) # Remove duplicates
|
|
|
|
def documents_to_database(documents_list, os_client=client):
|
|
try:
|
|
for document in tqdm.tqdm(documents_list.to_dict(orient='records')):
|
|
# Detect language
|
|
doc = nlp_en(document['texte'])
|
|
lang = doc._.language['language']
|
|
|
|
# Count words
|
|
word_count = count_words(document['texte'], lang)
|
|
|
|
# Extract and resolve URLs
|
|
urls = extract_and_resolve_urls(document['texte'])
|
|
|
|
# Add language, word count, and URLs to the document
|
|
document['langue'] = lang
|
|
document['nombre_de_mots'] = word_count
|
|
document['texte_urls'] = urls
|
|
|
|
# Create document in Typesense
|
|
os_client.collections['social_media_posts'].documents.create(document)
|
|
|
|
print(f"Successfully inserted {len(documents_list)} documents.")
|
|
except Exception as e:
|
|
print(f"Error inserting documents: {str(e)}") |