🚀 Add feature: Conversion scripts

🚀 Refactor: Object model for conversions
This commit is contained in:
François Pelletier 2025-05-19 18:15:50 -04:00
parent 579a3fe379
commit f3dec3b49a
29 changed files with 23968 additions and 199 deletions

View file

@ -1,10 +1,12 @@
import logging
import os
import dotenv
from minio import Minio
from app.models import AvailableSource, AvailableSourcesResponse
dotenv.load_dotenv()
logger = logging.getLogger("base_logger")
available_sources = AvailableSourcesResponse(
@ -82,9 +84,7 @@ available_sources = AvailableSourcesResponse(
],
)
# MinIO connection settings.
# Fixed: a stale earlier assignment (reading BACKEND_MINIO_ALIAS with an
# "http://" default) shadowed this one and left dead conflicting code;
# only the effective MINIO_ALIAS_URL lookup is kept.
minio_alias_url = os.environ.get("MINIO_ALIAS_URL", "minio:9000")
minio_bucket_name = os.environ.get(
    "BACKEND_MINIO_BUCKET_NAME", "backend-retro-contenu"
)

View file

@ -0,0 +1,60 @@
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
class BaseConverter:
    """Template for file-format converters.

    Subclasses implement the pipeline steps (read_file, add_metadata,
    convert_columns, rename_columns, clean_data); convert() runs them in
    order and returns the resulting DataFrame as a list of record dicts.
    """

    def __init__(self, content: Union[str, bytes]):
        self.content = content
        # Raw parsed data not yet in DataFrame form (filled by read_file
        # in subclasses that parse JSON before building a frame).
        self.datadict = None
        # Working DataFrame produced/refined by the pipeline steps.
        self.df = None

    def read_file(self) -> None:
        """Read the file content into a DataFrame."""
        # Fixed: message previously named add_metadata.
        raise NotImplementedError(
            "Subclasses must implement read_file method")

    def add_metadata(self) -> None:
        """Add metadata columns to the DataFrame."""
        raise NotImplementedError(
            "Subclasses must implement add_metadata method")

    def convert_columns(self) -> None:
        """Convert specific columns in the DataFrame."""
        raise NotImplementedError(
            "Subclasses must implement convert_columns method")

    def rename_columns(self) -> None:
        """Rename columns in the DataFrame."""
        raise NotImplementedError(
            "Subclasses must implement rename_columns method")

    def clean_data(self) -> None:
        """Clean and preprocess the DataFrame."""
        # Fixed: message previously named rename_columns.
        raise NotImplementedError(
            "Subclasses must implement clean_data method")

    def convert(self) -> List[Dict]:
        """Convert the content to the standardized format.

        Returns:
            List[Dict]: one dict per DataFrame row.

        Raises:
            ValueError: wrapping any parsing/column/unexpected error.
        """
        try:
            self.read_file()
            self.add_metadata()
            self.convert_columns()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except pd.errors.EmptyDataError as e:
            logger.error(f"File is empty or malformed: {str(e)}")
            # Chain the cause so the original traceback is preserved.
            raise ValueError(f"File is empty or malformed: {str(e)}") from e
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e

View file

@ -1,10 +1,34 @@
from app.config import logger
from app.models import ConversionResponse
import datetime
from typing import Dict, Union, List
from app.convert.base_converter import BaseConverter
def convert_bluesky_car(content):
# Implement conversion logic here
logger.info(f"Starting conversion of {len(content)} bytes")
converted_data = {} # Example data
return ConversionResponse(converted_data=converted_data, status="success",
metadata={})
class BlueskyCarConverter(BaseConverter):
    """Converter stub for Bluesky CAR archives (reading not implemented)."""

    def read_file(self) -> None:
        """Read the CAR archive — not yet implemented."""
        pass

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        # NOTE(review): self.df is still None until read_file is
        # implemented — confirm before wiring this converter up.
        self.df = self.df.assign(
            index="bluesky_car",
            type="car",
            network="Bluesky",
        )

    def convert_columns(self) -> None:
        """CAR-specific column conversions — not yet implemented."""
        pass

    def rename_columns(self) -> None:
        """CAR-specific column renaming — not yet implemented."""
        pass

    def clean_data(self) -> None:
        """CAR-specific cleaning — not yet implemented."""
        pass
def convert_bluesky_car(content: Union[str, bytes]) -> List[Dict]:
    """Convert Bluesky CAR content to a standardized format.

    Args:
        content: Raw CAR archive as string or bytes.

    Returns:
        List of standardized record dictionaries.
    """
    return BlueskyCarConverter(content).convert()

View file

@ -1,10 +1,56 @@
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
def convert_export_txt(content):
# Implement conversion logic here
class ExportTxtConverter(BaseConverter):
    """Converter for generic tab-separated export text files."""

    def read_file(self) -> None:
        """Read the tab-separated content into a DataFrame."""
        # NOTE(review): assumes a tab-delimited layout — confirm against
        # the actual structure of real export txt files.
        handle = content_from_file(self.content)
        self.df = pd.read_csv(handle, sep='\t')

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="export_txt",
            type="export",
            network="Generic",
        )

    def convert_columns(self) -> None:
        """No column value conversions defined yet for this format."""
        pass

    def rename_columns(self) -> None:
        """Apply the original-to-standard column mapping (currently empty)."""
        column_mapping = {
            # "original_column_name": "standardized_column_name",
        }
        self.df = self.df.rename(columns=column_mapping)

    def clean_data(self) -> None:
        """No format-specific cleaning defined yet."""
        pass
def convert_export_txt(content: Union[str, bytes]) -> List[Dict]:
    """Convert export txt content to a standardized format.

    Fixed: a leftover stub returned a placeholder before the converter
    lines, leaving them unreachable; the docstring also claimed a
    ConversionResponse return while the annotation says List[Dict].

    Args:
        content (Union[str, bytes]): The txt content of the export.

    Returns:
        List[Dict]: The converted records.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = ExportTxtConverter(content)
    return converter.convert()

View file

@ -0,0 +1,65 @@
import json
import datetime
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
class FacebookBusinessPostsConverter(BaseConverter):
    """Convert a Facebook Business posts JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is therefore overridden to build the
    frame before add_metadata() runs — the base order would call
    .assign() on self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="facebook_business_posts",
            type="posts",
            network="FacebookBusiness"
        )

    def convert_columns(self) -> None:
        """Flatten posts into one row per attached media item."""
        posts_medias = []
        for post in self.datadict:
            data_post_items = post['data']
            texte_post_list = [item['post'] for item in data_post_items if
                               item.get('post')]
            # All media of a post share the concatenated post text.
            texte = "\n".join(texte_post_list)
            for attachment in post['attachments']:
                if attachment.get('data'):
                    for data_item in attachment['data']:
                        if data_item.get('media'):
                            media = data_item['media']
                            posts_medias.append({
                                "chemin": [media["uri"]],
                                "texte": texte,
                                "creation_timestamp": media[
                                    "creation_timestamp"]
                            })
        self.df = pd.DataFrame(posts_medias).explode(['chemin'])
        self.df['creation_timestamp'] = self.df['creation_timestamp'].astype(
            int)

    def rename_columns(self) -> None:
        """Columns already use the standardized names."""
        pass

    def clean_data(self) -> None:
        """Add the (empty) url column and blank out missing values."""
        self.df['url'] = ""
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_facebook_business_posts_json(content: Union[str, bytes]) -> List[
        Dict]:
    """Convert a Facebook Business posts JSON export to standard records."""
    logger.info(f"Starting conversion of {len(content)} bytes")
    return FacebookBusinessPostsConverter(content).convert()

View file

@ -1,10 +1,56 @@
import json
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
def convert_facebook_comments_json(content):
# Implement conversion logic here
class FacebookCommentsConverter(BaseConverter):
    """Convert a Facebook comments JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is overridden to build the frame before
    add_metadata() runs — the base order would call .assign() on
    self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="facebook_comments",
            type="comments",
            network="Facebook"
        )

    def convert_columns(self) -> None:
        """Extract comment text and timestamp into a DataFrame."""
        facebook_comments = []
        for comment in self.datadict['comments_v2']:
            if comment.get('data'):
                for data_item in comment['data']:
                    if data_item.get('comment'):
                        comment_data = data_item['comment']
                        facebook_comments.append({
                            "texte": comment_data["comment"],
                            "creation_timestamp": comment_data["timestamp"]
                        })
        self.df = pd.DataFrame(facebook_comments)
        self.df['creation_timestamp'] = self.df['creation_timestamp'].astype(
            int)

    def rename_columns(self) -> None:
        """Columns already use the standardized names."""
        pass

    def clean_data(self) -> None:
        """Add empty url/chemin columns and blank out missing values."""
        self.df['url'] = ""
        self.df['chemin'] = ""
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_facebook_comments_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert a Facebook comments JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = FacebookCommentsConverter(content)
    return converter.convert()

View file

@ -1,10 +1,48 @@
from typing import Union, List, Dict
import pandas as pd
import json
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
def convert_facebook_posts_json(content):
# Implement conversion logic here
class FacebookPostsConverter(BaseConverter):
    """Convert a Facebook posts JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is overridden to build the frame before
    add_metadata() runs — the base order would call .assign() on
    self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="facebook_posts",
            type="posts",
            network="Facebook"
        )

    def convert_columns(self) -> None:
        """Build the DataFrame from other_photos_v2 entries."""
        facebook_posts = self.datadict.get('other_photos_v2', [])
        self.df = pd.DataFrame(facebook_posts)
        self.df['creation_timestamp'] = self.df['creation_timestamp'].astype(
            int)

    def rename_columns(self) -> None:
        """Map the export's column names onto the standardized ones."""
        self.df.rename(columns={
            "description": "texte",
            "uri": "chemin"
        }, inplace=True)

    def clean_data(self) -> None:
        """Drop raw metadata and blank out missing values."""
        self.df['url'] = ""
        self.df.drop(columns=['media_metadata'], errors='ignore', inplace=True)
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_facebook_posts_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert a Facebook posts JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = FacebookPostsConverter(content)
    return converter.convert()

View file

@ -1,10 +1,55 @@
import json
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
def convert_instagram_comments_json(content):
# Implement conversion logic here
class InstagramCommentsConverter(BaseConverter):
    """Convert an Instagram comments JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is overridden to build the frame before
    add_metadata() runs — the base order would call .assign() on
    self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="instagram_comments",
            type="comments",
            network="Instagram",
        )

    def convert_columns(self) -> None:
        """Flatten reel and post comments into texte/timestamp records."""
        ig_comments = []
        all_comments = (self.datadict.get('comments_reels_comments', [])
                        + self.datadict.get('post_comments_1', []))
        for comment in all_comments:
            string_map = comment['string_map_data']
            # Fixed: the original filled index/type/network from the
            # not-yet-built DataFrame (self.df.index etc.); those columns
            # are now added by add_metadata() after the frame exists.
            ig_comments.append({
                "texte": string_map['Comment']['value'],
                'creation_timestamp': int(string_map['Time']['timestamp']),
                'url': "",
                'chemin': ""
            })
        self.df = pd.DataFrame(ig_comments)

    def rename_columns(self) -> None:
        """Columns already use the standardized names."""
        pass

    def clean_data(self) -> None:
        """Blank out missing values."""
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_instagram_comments_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert an Instagram comments JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramCommentsConverter(content)
    return converter.convert()

View file

@ -1,10 +1,65 @@
import json
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
def convert_instagram_posts_json(content):
# Implement conversion logic here
class InstagramPostsConverter(BaseConverter):
    """Convert an Instagram posts JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is overridden to build the frame before
    add_metadata() runs — the base order would call .assign() on
    self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="instagram_posts",
            type="posts",
            network="Instagram"
        )

    def convert_columns(self) -> None:
        """Flatten posts to one row per media item (exploded on chemin)."""
        posts_medias = []
        for post in self.datadict:
            medias = post['media']
            if len(medias) == 1:
                # Single-media posts carry their text on the media itself.
                media = medias[0]
                posts_medias.append({
                    "chemin": [media["uri"]],
                    "texte": media["title"],
                    "creation_timestamp": media["creation_timestamp"]
                })
            else:
                # Multi-media posts share the post-level title/timestamp.
                title = post['title']
                creation_timestamp = post['creation_timestamp']
                list_uris = [media['uri'] for media in medias]
                posts_medias.append({
                    "chemin": list_uris,
                    "texte": title,
                    "creation_timestamp": creation_timestamp
                })
        self.df = pd.DataFrame(posts_medias).explode(['chemin'])
        self.df['creation_timestamp'] = self.df['creation_timestamp'].astype(
            int)

    def rename_columns(self) -> None:
        """Columns already use the standardized names."""
        pass

    def clean_data(self) -> None:
        """Add the (empty) url column and blank out missing values."""
        # Fixed: the original called super().clean_data(), which raises
        # NotImplementedError in BaseConverter and aborted every conversion.
        self.df['url'] = ""
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_instagram_posts_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert an Instagram posts JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramPostsConverter(content)
    return converter.convert()

View file

@ -1,10 +1,52 @@
import json
import datetime
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
def convert_instagram_reels_json(content):
# Implement conversion logic here
class InstagramReelsConverter(BaseConverter):
    """Convert an Instagram reels JSON export to the standard format.

    read_file() only fills self.datadict; the DataFrame is created in
    convert_columns(). convert() is overridden to build the frame before
    add_metadata() runs — the base order would call .assign() on
    self.df = None and crash.
    """

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        json_file = content_from_file(self.content)
        content = convert_encoding_meta(json_file.read())
        self.datadict = json.loads(content)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="instagram_reels",
            type="reels",
            network="Instagram"
        )

    def convert_columns(self) -> None:
        """Build the DataFrame from the first media of each reel."""
        reels_media = [x['media'][0] for x in
                       self.datadict.get('ig_reels_media', [])]
        self.df = pd.DataFrame(reels_media)
        self.df['creation_timestamp'] = self.df['creation_timestamp'].apply(
            lambda x: datetime.datetime.fromtimestamp(x).isoformat()
        )

    def rename_columns(self) -> None:
        """Map the export's column names onto the standardized ones."""
        self.df.rename(columns={
            "title": "texte",
            "uri": "chemin"
        }, inplace=True)

    def clean_data(self) -> None:
        """Drop raw/unused columns and blank out missing values."""
        self.df['url'] = ""
        # errors='ignore' matches FacebookPostsConverter so exports missing
        # cross_post_source do not fail the whole conversion.
        # NOTE(review): creation_timestamp is dropped after conversion —
        # confirm downstream really does not need it.
        self.df.drop(columns=['creation_timestamp', 'media_metadata',
                              'cross_post_source'], errors='ignore',
                     inplace=True)
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline with convert_columns before add_metadata."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            result = self.df.to_dict(orient="records")
            logger.info(
                f"Conversion completed successfully with {len(result)} records")
            return result
        except KeyError as e:
            logger.error(f"Missing expected column: {str(e)}")
            raise ValueError(f"Missing expected column: {str(e)}") from e
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(
                f"Unexpected error during conversion: {str(e)}") from e
def convert_instagram_reels_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert an Instagram reels JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramReelsConverter(content)
    return converter.convert()

View file

@ -1,10 +1,34 @@
from typing import Union, List, Dict
from app.config import logger
from app.models import ConversionResponse
from app.convert.base_converter import BaseConverter
def convert_instagram_reels_video(content):
# Implement conversion logic here
class InstagramReelsVideoConverter(BaseConverter):
    """Converter stub for Instagram reels video files (reading not implemented)."""

    def read_file(self) -> None:
        """Read the video file — not yet implemented."""
        pass

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        # NOTE(review): self.df is still None until read_file is
        # implemented — confirm before wiring this converter up.
        self.df = self.df.assign(
            index="instagram_reels_video",
            type="video",
            network="Instagram",
        )

    def convert_columns(self) -> None:
        """Reels-video column conversions — not yet implemented."""
        pass

    def rename_columns(self) -> None:
        """Reels-video column renaming — not yet implemented."""
        pass

    def clean_data(self) -> None:
        """Reels-video cleaning — not yet implemented."""
        pass
def convert_instagram_reels_video(content: Union[str, bytes]) -> List[Dict]:
    """Convert Instagram reels video content to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramReelsVideoConverter(content)
    return converter.convert()

View file

@ -1,10 +1,34 @@
from typing import Union, List, Dict
from app.config import logger
from app.models import ConversionResponse
from app.convert.base_converter import BaseConverter
def convert_instagram_stories_image(content):
# Implement conversion logic here
class InstagramStoriesImageConverter(BaseConverter):
    """Converter stub for Instagram stories image files (reading not implemented)."""

    def read_file(self) -> None:
        """Read the image file — not yet implemented."""
        pass

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        # NOTE(review): self.df is still None until read_file is
        # implemented — confirm before wiring this converter up.
        self.df = self.df.assign(
            index="instagram_stories_image",
            type="image",
            network="Instagram",
        )

    def convert_columns(self) -> None:
        """Stories-image column conversions — not yet implemented."""
        pass

    def rename_columns(self) -> None:
        """Stories-image column renaming — not yet implemented."""
        pass

    def clean_data(self) -> None:
        """Stories-image cleaning — not yet implemented."""
        pass
def convert_instagram_stories_image(content: Union[str, bytes]) -> List[Dict]:
    """Convert Instagram stories image content to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramStoriesImageConverter(content)
    return converter.convert()

View file

@ -1,10 +1,68 @@
import json
import datetime
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
def convert_instagram_stories_json(content):
# Implement conversion logic here
class InstagramStoriesConverter(BaseConverter):
    """Convert an Instagram stories JSON export to the standard format."""

    def read_file(self) -> None:
        """Parse the JSON export, repairing mojibake first."""
        raw = content_from_file(self.content).read()
        self.datadict = json.loads(convert_encoding_meta(raw))

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="instagram_stories",
            type="stories",
            network="Instagram"
        )

    def convert_columns(self) -> None:
        """Build the DataFrame and render timestamps as ISO-8601 strings."""
        self.df = pd.DataFrame(self.datadict.get('ig_stories', []))
        self.df['creation_timestamp'] = self.df['creation_timestamp'].apply(
            lambda ts: datetime.datetime.fromtimestamp(ts).isoformat()
        )

    def rename_columns(self) -> None:
        """Map the export's column names onto the standardized ones."""
        self.df.rename(columns={"title": "texte", "uri": "chemin"},
                       inplace=True)

    def clean_data(self) -> None:
        """Drop raw/unused columns and blank out missing values."""
        self.df['url'] = ""
        self.df.drop(columns=['creation_timestamp', 'media_metadata',
                              'cross_post_source'], inplace=True)
        self.df.fillna(value="", inplace=True)

    def convert(self) -> List[Dict]:
        """Run the pipeline; columns are built before metadata is attached."""
        try:
            self.read_file()
            self.convert_columns()
            self.add_metadata()
            self.rename_columns()
            self.clean_data()
            records = self.df.to_dict('records')
            logger.info("Conversion completed successfully")
            return records
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(f"Unexpected error during conversion: {str(e)}")
def convert_instagram_stories_json(content: Union[str, bytes]) -> List[Dict]:
    """Convert an Instagram stories JSON export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = InstagramStoriesConverter(content)
    return converter.convert()

View file

@ -1,10 +1,69 @@
import pandas as pd
import datetime
from typing import Dict, Union, List
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
def convert_linkedin_comments_csv(content):
# Implement conversion logic here
class LinkedInCommentsConverter(BaseConverter):
    """Convert a LinkedIn comments CSV export to the standard format.

    read_file() builds the DataFrame directly, so the base convert()
    pipeline order is correct — the previous redundant convert() override
    (identical step order, broader exception handling) was removed.
    """

    def read_file(self) -> None:
        """Read the CSV content into a DataFrame."""
        csv_file = content_from_file(self.content)
        self.df = pd.read_csv(csv_file,
                              escapechar='\\',
                              skipinitialspace=True)
        # Fixed: preview rows now go to the logger instead of print().
        logger.debug(self.df.head(5))

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(
            index="linkedin_comments",
            type="comments",
            network="LinkedIn",
        )

    def convert_columns(self) -> None:
        """Normalize message whitespace and turn dates into epoch seconds."""
        self.df['Message'] = (
            self.df['Message'].str.replace(r'[\r\n\t]+', ' ', regex=True))
        self.df = self.df[self.df['Message'] != ""]
        self.df = self.df.drop_duplicates().reset_index(drop=True)
        self.df["creation_timestamp"] = self.df["Date"].apply(
            lambda x: int(datetime.datetime.fromisoformat(x).timestamp())
        )
        del self.df["Date"]

    def rename_columns(self) -> None:
        """Map the export's column names onto the standardized ones."""
        self.df.rename(columns={
            "Link": "url",
            "Message": "texte"
        }, inplace=True)

    def clean_data(self) -> None:
        """Force texte to string, add empty chemin, blank missing values."""
        self.df["texte"] = self.df["texte"].apply(lambda x: str(x))
        self.df["chemin"] = ""
        self.df.fillna(value="", inplace=True)
def convert_linkedin_comments_csv(content: Union[str, bytes]) -> List[Dict]:
    """Convert a LinkedIn comments CSV export to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = LinkedInCommentsConverter(content)
    return converter.convert()

View file

@ -1,106 +1,46 @@
import datetime
from io import StringIO, BytesIO
from typing import Dict, Union
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
def convert_linkedin_shares_csv(content: Union[str, bytes]) -> Dict:
"""
Convert a LinkedIn shares CSV file from MinIO into a standardized format.
Args:
content: CSV content as string or bytes
Returns:
Dictionary with converted data
Raises:
ValueError: If conversion fails
"""
try:
# Handle content based on its type
logger.info("Preparing to read CSV content")
if isinstance(content, bytes):
# If content is bytes, convert to string
try:
content_str = content.decode('utf-8')
csv_file = StringIO(content_str)
logger.debug("Converted bytes content to string")
except UnicodeDecodeError:
# If UTF-8 decoding fails, use BytesIO
csv_file = BytesIO(content)
logger.debug("Using binary content with BytesIO")
elif isinstance(content, str):
# If content is already a string, use it directly
csv_file = StringIO(content)
logger.debug("Using string content with StringIO")
else:
raise TypeError(f"Unsupported content type: {type(content)}")
class LinkedInSharesConverter(BaseConverter):
    """Convert a LinkedIn shares CSV export to the standard format.

    Fixed: this span contained interleaved leftover lines from the
    pre-refactor implementation (duplicate CSV reads, `raw_shares`
    statements) that made it invalid; only the converter-class
    implementation is kept. read_file() builds the DataFrame directly,
    so the base convert() pipeline order is correct.
    """

    def read_file(self) -> None:
        """Read the file content into a DataFrame."""
        csv_file = content_from_file(self.content)
        self.df = pd.read_csv(csv_file)

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        self.df = self.df.assign(index="linkedin_shares", type="posts",
                                 network="LinkedIn")

    def convert_columns(self) -> None:
        """Turn dates into epoch seconds and force commentary to string."""
        self.df["Date"] = self.df["Date"].apply(
            lambda x: int(datetime.datetime.fromisoformat(x).timestamp())
        )
        self.df["ShareCommentary"] = self.df["ShareCommentary"].astype(str)

    def rename_columns(self) -> None:
        """Map the export's column names onto the standardized ones."""
        self.df = self.df.rename(columns={
            "ShareLink": "uri",
            "ShareCommentary": "texte",
            "Date": "creation_timestamp"
        })

    def clean_data(self) -> None:
        """Clean and preprocess the DataFrame."""
        self.df = self.df.fillna("")
        self.df = self.df.drop_duplicates(
            subset=["texte", "creation_timestamp"])
        # Drop rows whose text is empty or whitespace-only.
        self.df = self.df[self.df["texte"].str.strip() != ""]


def convert_linkedin_shares_csv(content: Union[str, bytes]) -> List[Dict]:
    """
    Convert LinkedIn shares CSV content to a standardized format.
    """
    converter = LinkedInSharesConverter(content)
    return converter.convert()

View file

@ -1,10 +1,83 @@
from typing import Dict, Union, List
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
def convert_markdown_txt(content):
# Implement conversion logic here
class MarkdownTxtConverter(BaseConverter):
    """Converter for plain markdown text files.

    Unlike the tabular converters, the content is kept as a single string
    plus a metadata dict instead of a DataFrame.
    """

    def __init__(self, content: Union[str, bytes]):
        super().__init__(content)
        # Source-identity dict, filled by add_metadata().
        self.metadata = None
        # Whole markdown body as one string, filled by read_file().
        self.markdown_content = None

    def read_file(self) -> None:
        """Load the whole file as one markdown string."""
        handle = content_from_file(self.content)
        self.markdown_content = handle.read()

    def add_metadata(self) -> None:
        """Record the source identity (stored aside — no DataFrame here)."""
        self.metadata = {
            "index": "markdown_txt",
            "type": "markdown",
            "network": "Generic"
        }

    def convert_columns(self) -> None:
        """No tabular columns to convert for markdown input."""
        pass

    def rename_columns(self) -> None:
        """No tabular columns to rename for markdown input."""
        pass

    def clean_data(self) -> None:
        """Trim surrounding whitespace from the markdown body."""
        self.markdown_content = self.markdown_content.strip()

    def convert(self) -> List[Dict]:
        """Run the pipeline and wrap the markdown in a single record."""
        try:
            self.read_file()
            self.add_metadata()
            self.convert_columns()
            self.rename_columns()
            self.clean_data()
            record = {"content": self.markdown_content}
            record.update(self.metadata)
            logger.info("Conversion completed successfully")
            return [record]
        except Exception as e:
            logger.exception(f"Unexpected error during conversion: {str(e)}")
            raise ValueError(f"Unexpected error during conversion: {str(e)}")
def convert_markdown_txt(content: Union[str, bytes]) -> List[Dict]:
    """Convert markdown txt content to a standardized format.

    Fixed: a leftover stub returned a placeholder before the converter
    lines, leaving them unreachable; the docstring also claimed a
    ConversionResponse return while the annotation says List[Dict].

    Args:
        content (Union[str, bytes]): The txt content of the markdown.

    Returns:
        List[Dict]: A single record holding the markdown body and metadata.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = MarkdownTxtConverter(content)
    return converter.convert()

View file

@ -1,10 +1,33 @@
from typing import Union, List, Dict
from app.config import logger
from app.models import ConversionResponse
from app.convert.base_converter import BaseConverter
def convert_youtube_shorts_video(content):
# Implement conversion logic here
class YouTubeShortsVideoConverter(BaseConverter):
    """Converter stub for YouTube Shorts video files (reading not implemented)."""

    def read_file(self) -> None:
        """Read the video file — not yet implemented."""
        pass

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        # NOTE(review): self.df is still None until read_file is
        # implemented — confirm before wiring this converter up.
        self.df = self.df.assign(
            index="youtube_shorts_video",
            type="video",
            network="YouTube",
        )

    def convert_columns(self) -> None:
        """Shorts-video column conversions — not yet implemented."""
        pass

    def rename_columns(self) -> None:
        """Shorts-video column renaming — not yet implemented."""
        pass

    def clean_data(self) -> None:
        """Shorts-video cleaning — not yet implemented."""
        pass
def convert_youtube_shorts_video(content: Union[str, bytes]) -> List[Dict]:
    """Convert YouTube Shorts video content to standard records.

    Fixed: a leftover stub returned a placeholder ConversionResponse,
    leaving the converter lines unreachable.
    """
    logger.info(f"Starting conversion of {len(content)} bytes")
    converter = YouTubeShortsVideoConverter(content)
    return converter.convert()

View file

@ -1,10 +1,32 @@
from app.config import logger
from app.models import ConversionResponse
from typing import Dict, Union, List
from app.convert.base_converter import BaseConverter
def convert_youtube_video_video(content):
# Implement conversion logic here
logger.info(f"Starting conversion of {len(content)} bytes")
converted_data = {} # Example data
return ConversionResponse(converted_data=converted_data, status="success",
metadata={})
class YouTubeVideoConverter(BaseConverter):
    """Converter stub for YouTube video files (reading not implemented)."""

    def read_file(self) -> None:
        """Read the video file — not yet implemented."""
        pass

    def add_metadata(self) -> None:
        """Tag every row with the source identity columns."""
        # NOTE(review): self.df is still None until read_file is
        # implemented — confirm before wiring this converter up.
        self.df = self.df.assign(
            index="youtube_video",
            type="video",
            network="YouTube",
        )

    def convert_columns(self) -> None:
        """YouTube-video column conversions — not yet implemented."""
        pass

    def rename_columns(self) -> None:
        """YouTube-video column renaming — not yet implemented."""
        pass

    def clean_data(self) -> None:
        """YouTube-video cleaning — not yet implemented."""
        pass
def convert_youtube_video_video(content: Union[str, bytes]) -> List[Dict]:
    """Convert YouTube video content to a standardized format.

    Args:
        content: Raw video content as string or bytes.

    Returns:
        List of standardized record dictionaries.
    """
    return YouTubeVideoConverter(content).convert()

View file

View file

@ -0,0 +1,34 @@
from io import StringIO, BytesIO
from typing import Union
from app.config import logger
def content_from_file(content: Union[str, bytes]) -> Union[StringIO, BytesIO]:
    """
    Wrap CSV content in a file-like object, accepting either text or bytes.

    Bytes that decode as UTF-8 are promoted to a text stream; undecodable
    bytes are kept as a binary stream.

    Args:
        content: CSV content as string or bytes

    Returns:
        StringIO or BytesIO object containing the CSV content

    Raises:
        TypeError: If content is neither string nor bytes
    """
    logger.info("Preparing to read CSV content")

    if isinstance(content, bytes):
        try:
            logger.debug("Attempting to convert bytes content to string")
            decoded = content.decode('utf-8')
        except UnicodeDecodeError:
            logger.debug("Using binary content with BytesIO")
            return BytesIO(content)
        return StringIO(decoded)

    if isinstance(content, str):
        logger.debug("Using string content with StringIO")
        return StringIO(content)

    raise TypeError(f"Unsupported content type: {type(content)}")

View file

@ -0,0 +1,8 @@
import re
def convert_encoding_meta(text):
    """
    Repair mojibake: runs of Latin-1-displayed bytes that are really UTF-8.

    Each candidate run (a UTF-8-style lead byte 0xC2-0xF4 followed by
    continuation-range bytes 0x80-0xBF) is re-encoded as Latin-1 and decoded
    as UTF-8. Runs that do not survive the round-trip are left untouched
    instead of raising, so clean (or merely accented) text passes through.

    Args:
        text: Possibly mojibake-containing string.

    Returns:
        The string with decodable mojibake runs repaired.
    """
    def _fix(match):
        raw = match.group(0)
        try:
            return raw.encode('latin1').decode('utf8')
        except (UnicodeEncodeError, UnicodeDecodeError):
            # Not a valid UTF-8 sequence (or not Latin-1-encodable):
            # keep the original span rather than crash the conversion.
            return raw

    return re.sub(r'[\xc2-\xf4][\x80-\xbf]+', _fix, text)

View file

@ -23,13 +23,12 @@ class AvailableSourcesResponse(BaseModel):
class ConversionRequest(BaseModel):
    """Payload identifying which stored file to convert and how."""
    # NOTE(review): diff residue — both source_name and source_type appear
    # here while the routes read request.source_type; confirm whether
    # source_name is still required by any caller.
    source_name: str
    # Key into the CONVERTERS registry (e.g. "facebook_posts")
    source_type: str
    # File format of the stored export (e.g. "json", "csv")
    source_format: str
    # Object name of the file to fetch from MinIO
    filename: str
class ConversionResponse(BaseModel):
    """Result of a conversion request."""
    # Converted payload (may be empty when data is persisted elsewhere)
    converted_data: dict
    # Outcome flag, e.g. "ok" / "success"
    status: str
    # Extra details about the conversion (source, format, record counts, ...)
    metadata: dict

View file

@ -1,13 +1,15 @@
import datetime
import json
import os
from typing import Any, Dict, Union
from typing import Any, Dict, Union, List
from fastapi import APIRouter, HTTPException
from app.config import logger, minio_bucket_name, minio_client
from app.convert.convert_bluesky_car import convert_bluesky_car
from app.convert.convert_export_txt import convert_export_txt
from app.convert.convert_facebook_business_posts_json import \
convert_facebook_business_posts_json
from app.convert.convert_facebook_comments_json import (
convert_facebook_comments_json,
)
@ -54,6 +56,9 @@ CONVERTERS = {
"facebook_posts": {
"json": convert_facebook_posts_json
},
"facebook_business_posts": {
"json": convert_facebook_business_posts_json
},
"facebook_comments": {
"json": convert_facebook_comments_json
},
@ -89,6 +94,35 @@ CONVERTERS = {
}
def generate_temp_file(data: List[Dict], source_type: str) -> str:
    """
    Generate a unique filename and write data to a temporary file.

    Args:
        data: The data to save
        source_type: The source name to use in the filename

    Returns:
        The filename of the temporary file

    Raises:
        IOError: If the file cannot be written
    """
    # Timestamped name; ':' is replaced so the name stays filesystem-safe.
    now = datetime.datetime.now(tz=datetime.UTC)
    tmp_filename = f"{source_type}_{now.isoformat().replace(':', '-')}.json"

    logger.info(f"Saving converted data to temporary file '{tmp_filename}'")
    try:
        with open(tmp_filename, "w") as out:
            json.dump(data, out)
    except IOError as e:
        logger.error(f"Failed to write temporary file '{tmp_filename}': {e}")
        raise
    return tmp_filename
def read_content_from_minio(request: ConversionRequest) -> Union[str, bytes]:
"""
Read content from MinIO storage based on the request filename.
@ -137,13 +171,13 @@ def read_content_from_minio(request: ConversionRequest) -> Union[str, bytes]:
) from e
def save_to_minio(data: Dict[str, Any], source_name: str) -> str:
def save_to_minio(data: List[Dict], source_type: str) -> str:
"""
Save converted data to MinIO as a JSON file.
Args:
data: The data to save
source_name: The source name to use in the filename
source_type: The source name to use in the filename
Returns:
The filename of the saved file
@ -152,16 +186,7 @@ def save_to_minio(data: Dict[str, Any], source_name: str) -> str:
HTTPException: If the file cannot be saved
"""
try:
# Generate a unique filename with timestamp
timestamp = datetime.datetime.now(tz=datetime.UTC).isoformat().replace(
":", "-")
tmp_filename = f"{source_name}_{timestamp}.json"
logger.info(f"Saving converted data to temporary file '{tmp_filename}'")
# Write to temporary file
with open(tmp_filename, "w") as f:
json.dump(data, f)
tmp_filename = generate_temp_file(data, source_type)
# Upload to MinIO
logger.info(
@ -206,28 +231,28 @@ def convert_data(request: ConversionRequest):
"""
try:
logger.info(
f"Processing conversion request for {request.source_name} in {request.source_format} format")
f"Processing conversion request for {request.source_type} in {request.source_format} format")
# Read content from MinIO
content = read_content_from_minio(request)
# Check if source and format are supported
if request.source_name not in CONVERTERS:
error_msg = f"Unsupported source name: {request.source_name}"
if request.source_type not in CONVERTERS:
error_msg = f"Unsupported source name: {request.source_type}"
logger.error(error_msg)
raise HTTPException(status_code=400, detail=error_msg)
if request.source_format not in CONVERTERS[request.source_name]:
error_msg = f"Unsupported format '{request.source_format}' for source '{request.source_name}'"
if request.source_format not in CONVERTERS[request.source_type]:
error_msg = f"Unsupported format '{request.source_format}' for source '{request.source_type}'"
logger.error(error_msg)
raise HTTPException(status_code=400, detail=error_msg)
# Get the appropriate converter function
converter = CONVERTERS[request.source_name][request.source_format]
converter = CONVERTERS[request.source_type][request.source_format]
# Convert the content
logger.info(
f"Converting {request.source_name} data using {converter.__name__}")
f"Converting {request.source_type} data using {converter.__name__}")
try:
converted_data = converter(content)
logger.info(
@ -238,14 +263,13 @@ def convert_data(request: ConversionRequest):
raise HTTPException(status_code=500, detail=error_msg) from e
# Save the converted data to MinIO
saved_filename = save_to_minio(converted_data, request.source_name)
saved_filename = save_to_minio(converted_data, request.source_type)
# Return success response
return ConversionResponse(
converted_data={}, # Empty dict as per original implementation
status="ok",
metadata={
"source": request.source_name,
"source": request.source_type,
"format": request.source_format,
"records_count": len(converted_data) if isinstance(
converted_data, list) else 1,

View file

@ -3,4 +3,5 @@ uvicorn
pydantic
pytest
pandas
minio
minio
python-dotenv