Initial version to import the data

François Pelletier 2024-07-18 20:04:51 -04:00
parent 3d337d064b
commit aff201f6cf
22 changed files with 694 additions and 0 deletions

.gitignore

@@ -1,2 +1,3 @@
 /.idea/
 /.env
+/import_data/data/

@@ -0,0 +1,14 @@
import os

import utils.reseau_social_data as rs_data

# %% Create the data directory if it doesn't exist
if not os.path.exists('data'):
    os.makedirs('data')

# %% Create the directories for each social network
for reseau_social in rs_data.reseau_social_data:
    if not os.path.exists(f'data/{reseau_social["nom"]}/'):
        os.makedirs(f'data/{reseau_social["nom"]}/')
    for repertoire in reseau_social['repertoires']:
        if not os.path.exists(f'data/{reseau_social["nom"]}/{repertoire}/'):
            os.makedirs(f'data/{reseau_social["nom"]}/{repertoire}/')
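A more compact equivalent, as a sketch: os.makedirs(..., exist_ok=True) is idempotent and creates parent directories, which makes the explicit os.path.exists() guards above unnecessary.

import os
import utils.reseau_social_data as rs_data

# Sketch only: exist_ok=True silently skips directories that already
# exist, and makedirs creates data/ and data/{nom}/ as parents.
for reseau_social in rs_data.reseau_social_data:
    for repertoire in reseau_social['repertoires']:
        os.makedirs(f'data/{reseau_social["nom"]}/{repertoire}/', exist_ok=True)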

@@ -0,0 +1,18 @@
import requests

import utils.config
from utils.opensearch import opensearch_client
from utils.reseau_social_data import reseau_social_data as rs_data

# %%
rs_data
# %%
opensearch_client.info()
# %%
for rs in rs_data:
    nom = rs.get("nom")
    for repertoire in rs.get("repertoires", []):
        index_name = f"rs_{nom}_{repertoire}".lower()
        opensearch_client.indices.create(index=index_name)
        print(f"Index '{index_name}' created")

@@ -0,0 +1,59 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
fb_data_path = ['data/FacebookBusiness/posts/profile_posts_1.json',
                'data/FacebookBusiness/posts/uncategorized_photos.json',
                'data/FacebookBusiness/posts/videos.json']
# Only the first file (profile_posts_1.json) is read here
with open(fb_data_path[0], "r", encoding="raw-unicode-escape") as posts:
    posts_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
posts_medias = []
for post in posts_json:
    # data: collect the text fragments of the post
    data_post_items = post['data']
    texte_post_list = []
    for item in data_post_items:
        if item.get('post'):
            texte_post_list.append(item['post'])
    texte = "\n".join(texte_post_list)
    # attachments: one row per attached media item
    for attachment in post['attachments']:
        if attachment.get('data'):
            for data_item in attachment['data']:
                if data_item.get('media'):
                    media = data_item['media']
                    if len(texte) > 1:
                        posts_medias.append({"network": "FacebookBusiness",
                                             "type": "posts",
                                             "index": "rs_facebookbusiness_posts",
                                             "chemin": fb_data_path[0],
                                             "texte": texte,
                                             "creation_timestamp": media["creation_timestamp"]})
# In[ ]:
posts_medias_df = pd.DataFrame(posts_medias)
# In[ ]:
posts_medias_df['datepublication'] = posts_medias_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
# In[ ]:
del posts_medias_df['creation_timestamp']
# In[ ]:
posts_medias_df.fillna(value="", inplace=True)
# In[ ]:
posts_medias_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
documents_to_database(posts_medias_df)

@@ -0,0 +1,43 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
fb_data_path = ['data/Facebook/comments_and_reactions/comments.json']
with open(fb_data_path[0], "r", encoding="raw-unicode-escape") as posts:
    comments_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
facebook_comments = []
for comment in comments_json['comments_v2']:
    if comment.get('data'):
        for data_item in comment['data']:
            if data_item.get('comment'):
                # Use a distinct name: rebinding `comment` here would
                # shadow the loop variable.
                comment_data = data_item['comment']
                facebook_comments.append({"network": "Facebook",
                                          "type": "comments",
                                          "index": "rs_facebook_comments_and_reactions",
                                          "chemin": fb_data_path[0],
                                          "texte": comment_data["comment"],
                                          "creation_timestamp": comment_data["timestamp"]})
# In[ ]:
facebook_comments_df = pd.DataFrame(facebook_comments)
# In[ ]:
facebook_comments_df['datepublication'] = facebook_comments_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
# In[ ]:
facebook_comments_df.fillna(value="", inplace=True)
# In[ ]:
del facebook_comments_df['creation_timestamp']
# In[ ]:
documents_to_database(facebook_comments_df)

@@ -0,0 +1,44 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
fb_data_path = ['data/Facebook/posts/your_uncategorized_photos.json']
with open(fb_data_path[0], "r", encoding="raw-unicode-escape") as posts:
    photos_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
facebook_photos = photos_json['other_photos_v2']
# In[ ]:
facebook_photos_df = pd.DataFrame(facebook_photos)
# In[ ]:
# Filter out posts without a description
facebook_photos_df = facebook_photos_df[~facebook_photos_df['description'].isnull()]
# In[ ]:
facebook_photos_df['datepublication'] = facebook_photos_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
facebook_photos_df['index'] = "rs_facebook_posts"
facebook_photos_df['network'] = "Facebook"
facebook_photos_df['type'] = "posts"
facebook_photos_df['chemin'] = fb_data_path[0]
# In[ ]:
facebook_photos_df.rename(columns={"description": "texte"}, inplace=True)
# In[ ]:
del facebook_photos_df['creation_timestamp']
del facebook_photos_df['media_metadata']
# In[ ]:
facebook_photos_df.fillna(value="", inplace=True)
# In[ ]:
documents_to_database(facebook_photos_df)

@@ -0,0 +1,70 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
instagram_data_path = 'import_data/data/Instagram/content/posts_1.json'
with open(instagram_data_path, "r", encoding="raw-unicode-escape") as posts:
    posts_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
posts_medias = []
for post in posts_json:
    medias = post['media']
    # If there is a single media item
    if len(medias) == 1:
        media = medias[0]
        posts_medias.append({
            "uri": [media["uri"]],
            "chemin": instagram_data_path,
            "index": "rs_instagram_content",
            "type": "content",
            "network": "Instagram",
            "texte": media["title"],
            "creation_timestamp": media["creation_timestamp"]})
    else:
        # If there are several media items,
        # iterate over each of them
        # to collect their URIs
        title = post['title']
        creation_timestamp = post['creation_timestamp']
        list_uris = []
        for media in medias:
            uri = media['uri']
            list_uris.append(uri)
        posts_medias.append({
            "uri": list_uris,
            "chemin": instagram_data_path,
            "index": "rs_instagram_content",
            "type": "posts",
            "network": "Instagram",
            "texte": title,
            "creation_timestamp": creation_timestamp})
# In[ ]:
posts_medias_df = pd.DataFrame(posts_medias)
# In[ ]:
posts_medias_df['datepublication'] = posts_medias_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
# In[ ]:
del posts_medias_df['creation_timestamp']
# In[ ]:
posts_medias_df.fillna(value="", inplace=True)
# In[ ]:
posts_medias_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
posts_medias_df = posts_medias_df[~posts_medias_df['texte'].str.strip().eq('')]
# In[ ]:
documents_to_database(posts_medias_df)

@@ -0,0 +1,49 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
instagram_data_path = 'data/Instagram/content/reels.json'
with open(instagram_data_path, "r", encoding="raw-unicode-escape") as posts:
    reels_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
ig_reels_media = [x['media'][0] for x in reels_json['ig_reels_media']]
# In[ ]:
ig_reels_df = pd.DataFrame(ig_reels_media)
# In[ ]:
ig_reels_df['datepublication'] = ig_reels_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
ig_reels_df['index'] = "rs_instagram_content"
ig_reels_df['type'] = "reels"
ig_reels_df['network'] = "Instagram"
ig_reels_df['chemin'] = instagram_data_path
# In[ ]:
ig_reels_df.rename(columns={"title": "texte"}, inplace=True)
# In[ ]:
del ig_reels_df['creation_timestamp']
del ig_reels_df['media_metadata']
del ig_reels_df['cross_post_source']
del ig_reels_df['dubbing_info']
# In[ ]:
ig_reels_df.fillna(value="", inplace=True)
# In[ ]:
ig_reels_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
ig_reels_df = ig_reels_df[~ig_reels_df['texte'].str.strip().eq('')]
# In[ ]:
documents_to_database(ig_reels_df)

@@ -0,0 +1,49 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
instagram_data_path = 'data/Instagram/content/stories.json'
with open(instagram_data_path, "r", encoding="raw-unicode-escape") as posts:
    stories_json = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
ig_stories_df = pd.DataFrame(stories_json['ig_stories'])
# In[ ]:
ig_stories_df['datepublication'] = ig_stories_df['creation_timestamp'].apply(
    lambda x: datetime.datetime.fromtimestamp(x).isoformat())
# In[ ]:
ig_stories_df['index'] = "rs_instagram_content"
ig_stories_df['type'] = "stories"
ig_stories_df['network'] = "Instagram"
ig_stories_df['chemin'] = instagram_data_path
# In[ ]:
ig_stories_df.rename(columns={"title": "texte"}, inplace=True)
# In[ ]:
del ig_stories_df['creation_timestamp']
del ig_stories_df['media_metadata']
del ig_stories_df['cross_post_source']
del ig_stories_df['ai_stickers']
del ig_stories_df['dubbing_info']
# In[ ]:
ig_stories_df.fillna(value="", inplace=True)
# In[ ]:
ig_stories_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
ig_stories_df = ig_stories_df[~ig_stories_df['texte'].str.strip('\n').str.strip().eq('')]
# In[ ]:
documents_to_database(ig_stories_df)

@@ -0,0 +1,40 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
instagram_data_path = 'data/Instagram/comments/post_comments_1.json'
with open(instagram_data_path, "r", encoding="raw-unicode-escape") as posts:
    post_comments_1 = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
ig_comments = []
for comment in post_comments_1:
    ig_comments.append({"texte": comment['string_map_data']['Comment']['value'],
                        "datepublication": datetime.datetime.fromtimestamp(
                            timestamp=comment['string_map_data']['Time']['timestamp']).isoformat(),
                        "chemin": instagram_data_path,
                        "index": "rs_instagram_comments",
                        "type": "comments",
                        "network": "Instagram"})
# In[ ]:
ig_comments_df = pd.DataFrame(ig_comments)
# In[ ]:
ig_comments_df.fillna(value="", inplace=True)
# In[ ]:
ig_comments_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
ig_comments_df = ig_comments_df[~ig_comments_df['texte'].str.strip('\n').str.strip().eq('')]
# In[ ]:
documents_to_database(ig_comments_df)

@@ -0,0 +1,40 @@
import datetime
import pandas as pd
import json
from utils.get_ids import get_idtypedocument, get_idreseausocial
from utils.documents_to_database import documents_to_database
from utils.convert_encoding_meta import convert_encoding_meta
# In[ ]:
instagram_data_path = 'data/Instagram/comments/reels_comments.json'
with open(instagram_data_path, "r", encoding="raw-unicode-escape") as posts:
    reels_comments = json.loads(convert_encoding_meta(posts.read()))
# In[ ]:
ig_comments = []
for comment in reels_comments['comments_reels_comments']:
    ig_comments.append({"texte": comment['string_map_data']['Comment']['value'],
                        "datepublication": datetime.datetime.fromtimestamp(
                            timestamp=comment['string_map_data']['Time']['timestamp']).isoformat(),
                        "chemin": instagram_data_path,
                        "index": "rs_instagram_comments",
                        "type": "comments",
                        "network": "Instagram"})
# In[ ]:
ig_comments_df = pd.DataFrame(ig_comments)
# In[ ]:
ig_comments_df.fillna(value="", inplace=True)
# In[ ]:
ig_comments_df.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
ig_comments_df = ig_comments_df[~ig_comments_df['texte'].str.strip('\n').str.strip().eq('')]
# In[ ]:
documents_to_database(ig_comments_df)

@@ -0,0 +1,44 @@
import pandas as pd
import datetime
from utils.documents_to_database import documents_to_database
# In[ ]:
linkedin_data_path = "data/LinkedIn/shares/Shares.csv"
raw_shares = pd.read_csv(linkedin_data_path)
# In[ ]:
raw_shares['index'] = "rs_linkedin_shares"
raw_shares['type'] = "posts"
raw_shares['network'] = "LinkedIn"
raw_shares['chemin'] = linkedin_data_path
# In[ ]:
raw_shares["datepublication"] = raw_shares["Date"].apply(
    lambda x: str(datetime.datetime.fromisoformat(x).isoformat()))
del raw_shares["Date"]
# In[ ]:
raw_shares.rename(columns={"ShareLink": "uri", "ShareCommentary": "texte"}, inplace=True)
# In[ ]:
raw_shares["texte"] = raw_shares["texte"].apply(lambda x: str(x))
# In[ ]:
del raw_shares["SharedUrl"]
del raw_shares["MediaUrl"]
del raw_shares["Visibility"]
# In[ ]:
raw_shares.fillna(value="", inplace=True)
# In[ ]:
raw_shares.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
raw_shares = raw_shares[~raw_shares['texte'].str.strip('\n').str.strip().eq('')]
# In[ ]:
documents_to_database(raw_shares)

@@ -0,0 +1,71 @@
import pandas as pd
import codecs
import datetime
from utils.documents_to_database import documents_to_database
# In[ ]:
linkedin_data_path = "import_data/data/LinkedIn/comments/Comments.csv"
# In[ ]:
# LinkedIn exports comments as a CSV whose Message field can span
# several lines, so records are reassembled line by line.
raw_comments_list = []
with open(linkedin_data_path, 'r') as f:
    current_comment = []
    for my_line in f.readlines():
        if my_line.startswith("Date,Link,Message"):
            headers = my_line.strip().split(",")
        else:
            # Check if the line starts with an ISO 8601 date
            try:
                datetime.datetime.strptime(str(my_line).split(",")[0], '%Y-%m-%d %H:%M:%S')
                date_test = True
            except ValueError:
                date_test = False
            if date_test:
                # A new record starts: flush the previous one
                if len(current_comment) == 3:
                    current_comment[2] = (str(current_comment[2])
                                          .replace('\\"', '"')
                                          .replace("\\'", "'"))
                    raw_comments_list.append(current_comment)
                current_comment = my_line.strip().split(",", maxsplit=2)
            else:
                # Continuation of the previous Message field
                current_comment[2] = current_comment[2] + " " + my_line.strip()
# Flush the last record, which has no following dated line to trigger it
if len(current_comment) == 3:
    current_comment[2] = (str(current_comment[2])
                          .replace('\\"', '"')
                          .replace("\\'", "'"))
    raw_comments_list.append(current_comment)
# In[ ]:
raw_comments_csv = pd.DataFrame(raw_comments_list, columns=headers)
raw_comments = raw_comments_csv[(raw_comments_csv['Message'] != "")].drop_duplicates()
# In[ ]:
raw_comments['index'] = "rs_linkedin_comments"
raw_comments['type'] = "comments"
raw_comments['network'] = "LinkedIn"
raw_comments['chemin'] = linkedin_data_path
# In[ ]:
# Drop the first row, an empty leftover of the header
raw_comments = raw_comments[1:].reset_index(drop=True)
# In[ ]:
raw_comments["datepublication"] = raw_comments["Date"].apply(
    lambda x: str(datetime.datetime.fromisoformat(str(x)).isoformat()))
del raw_comments["Date"]
# In[ ]:
raw_comments.rename(columns={"Link": "uri", "Message": "texte"}, inplace=True)
# In[ ]:
raw_comments["chemin"] = linkedin_data_path
# In[ ]:
raw_comments.fillna(value="", inplace=True)
# In[ ]:
raw_comments.drop_duplicates(subset=['texte', 'datepublication'], inplace=True)
# In[ ]:
# Filter empty texte
raw_comments = raw_comments[~raw_comments['texte'].str.strip('\n').str.strip().eq('')]
# In[ ]:
documents_to_database(raw_comments)
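To illustrate the reassembly, here is a hypothetical two-line Message (all values invented):

# Input lines in Comments.csv:
#   Date,Link,Message
#   2024-02-03 09:15:00,https://www.linkedin.com/feed/update/xyz,"Premier point
#   deuxième point"
# Reassembled record (quotes kept, continuation joined with a space):
#   ['2024-02-03 09:15:00', 'https://www.linkedin.com/feed/update/xyz',
#    '"Premier point deuxième point"']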

@@ -0,0 +1,65 @@
import datetime
import re
import xmltodict
import pandas as pd
import markdownify
import utils.config
from utils.get_ids import get_idreseausocial, get_idtypedocument
from utils.documents_to_database import documents_to_database
# In[ ]:
wordpress_xml_path = "import_data/data/Wordpress/jevalideca/wordpress.xml"
with open(wordpress_xml_path, "r") as xml_file:
    wordpress_xml = xml_file.read()
# In[ ]:
wordpress_dict = xmltodict.parse(wordpress_xml)
# In[ ]:
items_df = pd.DataFrame(wordpress_dict['rss']['channel']['item'])
# In[ ]:
# Keep only published pages and posts
items_df_filter = items_df[
    (items_df['wp:post_type'].isin(['page', 'post'])) & (items_df['wp:status'] == 'publish')].copy()
# In[ ]:
items_df_filter['datepublication'] = items_df_filter['wp:post_date'].apply(
    lambda x: str(datetime.datetime.fromisoformat(x).isoformat()))
# In[ ]:
def wp_to_markdown(x):
    # Convert WordPress HTML content to single-line Markdown
    try:
        md_text = re.sub(r'\n+', ' ', markdownify.markdownify(x, heading_style='ATX')).strip()
    except Exception as e:
        print(e)
        md_text = str()
    return md_text
# In[ ]:
items_df_filter['texte'] = items_df_filter['content:encoded'].apply(lambda x: wp_to_markdown(x))
# In[ ]:
items_df_filter.rename(columns={"link": "uri", "wp:post_type": "type"}, inplace=True)
# In[ ]:
items_df_filter['index'] = "rs_wordpress_jevalideca"
items_df_filter['network'] = "Wordpress"
items_df_filter['chemin'] = wordpress_xml_path
# In[ ]:
items_df_filter.fillna(value="", inplace=True)
# In[ ]:
documents_to_database(items_df_filter[['title',
                                       'uri',
                                       'type',
                                       'datepublication',
                                       'texte',
                                       'index',
                                       'network',
                                       'chemin']])
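For instance, a sketch of wp_to_markdown on an invented fragment (output assumes markdownify's default conversion rules):

wp_to_markdown("<h2>Bonjour</h2>\n<p>Premier <strong>billet</strong></p>")
# Expected result: '## Bonjour Premier **billet**'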

@@ -0,0 +1,6 @@
markdownify==0.11.6
pandas==2.2.0
requests==2.31.0
xmltodict==0.13.0
python_dotenv==1.0.1
pyarrow==17.0.0

@@ -0,0 +1,2 @@
API_URL = "http://localhost:8000"
WORDPRESS_NAMES = "jevalideca"  # Separate blog names with a comma

@@ -0,0 +1,7 @@
import re

def convert_encoding_meta(text):
    # Meta's exports double-encode non-ASCII characters: re-interpret
    # UTF-8 byte sequences that were read as individual code points.
    conv_text = re.sub(r'[\xc2-\xf4][\x80-\xbf]+',
                       lambda m: m.group(0).encode('latin1').decode('utf8'), text)
    return conv_text
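As a sketch of the repair on an invented string: a file read with encoding="raw-unicode-escape" surfaces the UTF-8 bytes of 'é' (0xC3 0xA9) as two separate characters, which the helper folds back into one:

from utils.convert_encoding_meta import convert_encoding_meta

# '\xc3\xa9' is the UTF-8 byte pair of 'é' mis-read as two characters
assert convert_encoding_meta("f\xc3\xa9licitations") == "félicitations"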

@@ -0,0 +1,20 @@
import pandas as pd
import requests
from utils.opensearch import opensearch_client

def documents_to_database(documents_list, os_client=opensearch_client):
    # Check if OpenSearch is available
    if not os_client.ping():
        raise requests.exceptions.ConnectionError("OpenSearch is not reachable")
    # Check if the specified index exists
    if not os_client.indices.exists(index=documents_list['index'].iloc[0]):
        raise requests.exceptions.HTTPError(f"Index '{documents_list['index'].iloc[0]}' does not exist")
    # Insert each document into the OpenSearch index named in its 'index' field
    for document in documents_list.to_dict(orient='records'):
        index_name = document.pop('index', None)
        if not index_name:
            raise ValueError("Document must have an 'index' field")
        os_client.index(index=index_name,
                        body=document)
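A minimal call, as a sketch with an invented document; it assumes the rs_instagram_comments index created earlier exists:

import pandas as pd
from utils.documents_to_database import documents_to_database

# Hypothetical single-row DataFrame matching the shape built by the
# import scripts above; the 'index' column routes the document.
doc_df = pd.DataFrame([{"network": "Instagram",
                        "type": "comments",
                        "index": "rs_instagram_comments",
                        "chemin": "data/Instagram/comments/post_comments_1.json",
                        "texte": "Merci!",
                        "datepublication": "2024-07-18T20:04:51"}])
documents_to_database(doc_df)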

@@ -0,0 +1,16 @@
import pandas as pd
import requests
import utils.config as config

API_URL = config.API_URL

def get_idreseausocial(nom, endpoint=f"{API_URL}/reseauxsociaux/"):
    # Look up the id of a social network by name in the local API
    reseaux_sociaux = pd.DataFrame(requests.get(endpoint).json())
    return list(reseaux_sociaux[reseaux_sociaux["nom"] == nom]["id"])[0]

def get_idtypedocument(nom, endpoint=f"{API_URL}/typedocuments/"):
    # Look up the id of a document type by name in the local API
    type_documents = pd.DataFrame(requests.get(endpoint).json())
    return list(type_documents[type_documents["nom"] == nom]["id"])[0]
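Assuming the API at API_URL serves rows with "nom" and "id" fields, usage looks like this (return values invented):

from utils.get_ids import get_idreseausocial, get_idtypedocument

# Hypothetical lookups against the local API at http://localhost:8000
id_rs = get_idreseausocial("Instagram")   # e.g. 3
id_td = get_idtypedocument("comments")    # e.g. 2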

@@ -0,0 +1,22 @@
import os
import dotenv
from opensearchpy import OpenSearch

# Load environment variables from the .env file
dotenv.load_dotenv()

# Connect to OpenSearch using the provided credentials and hostname/port.
host = 'localhost'
port = 9200
auth = ('admin', os.getenv("OPENSEARCH_INITIAL_ADMIN_PASSWORD"))  # For testing only. Don't store credentials in code.

# Create the client with SSL/TLS enabled, but hostname verification disabled.
opensearch_client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_compress=True,  # enables gzip compression for request bodies
    http_auth=auth,
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False
)
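A quick smoke test of the client, assuming a local OpenSearch with OPENSEARCH_INITIAL_ADMIN_PASSWORD set in .env:

from utils.opensearch import opensearch_client

# ping() returns True when the cluster answers; info() adds name and version
print(opensearch_client.ping())
print(opensearch_client.info())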

@@ -0,0 +1,14 @@
import utils.config as config

wordpress_names = config.WORDPRESS_NAMES.split(",")

reseau_social_data = [{"nom": "LinkedIn",
                       "repertoires": ["comments", "shares"]},
                      {"nom": "Wordpress",
                       "repertoires": wordpress_names},
                      {"nom": "Instagram",
                       "repertoires": ["comments", "content", "threads"]},
                      {"nom": "Facebook",
                       "repertoires": ["comments_and_reactions", "posts"]},
                      {"nom": "FacebookBusiness",
                       "repertoires": ["posts"]}]