🚀 Add feature: use MinIO as file storage for conversion, both as input and output

François Pelletier 2025-05-14 18:44:28 -04:00
parent afdfe1dbac
commit 579a3fe379
26 changed files with 16204 additions and 133 deletions
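With this change, the backend no longer receives the file as a multipart upload: the conversion endpoint takes the name of an object that already exists in the backend's MinIO bucket and writes the converted JSON back to the same bucket. As a minimal sketch of a client call against the new endpoint (assuming the backend from the Dockerfile below is reachable at http://localhost:8080, and that linkedin_shares.csv is already present in the bucket):

    import requests

    # ConversionRequest now carries a filename instead of an UploadFile (see models.py below)
    response = requests.post(
        "http://localhost:8080/convert/",
        json={
            "source_name": "linkedin_shares",
            "source_format": "csv",
            "filename": "linkedin_shares.csv",  # object already stored in the MinIO bucket
        },
    )
    response.raise_for_status()
    # metadata["saved_filename"] is the name of the converted JSON written back to MinIO
    print(response.json()["metadata"]["saved_filename"])

requests is already pinned in the frontend requirements at the end of this diff, so the same call works from the Streamlit side.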

View file

@@ -1,6 +1,9 @@
 MILVUS_HOST=
 MILVUS_PORT=
 BACKEND_URL=
+BACKEND_MINIO_BUCKET_NAME=
+BACKEND_MINIO_ALIAS=
+MINIO_SECURE=
 FLOWISE_PORT=
 ETCD_AUTO_COMPACTION_MODE=
 ETCD_AUTO_COMPACTION_RETENTION=
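For local development, values consistent with the defaults in app/config.py below would look like this (sample values, not part of the commit; they assume a MinIO service reachable as minio:9000):

    BACKEND_MINIO_BUCKET_NAME=backend-retro-contenu
    BACKEND_MINIO_ALIAS=minio:9000
    MINIO_SECURE=False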

.gitignore
View file

@@ -164,3 +164,4 @@ cython_debug/
 /.idea/
 /volumes/
+/backend/data/

View file

@@ -7,11 +7,14 @@ WORKDIR /app
 # Expose the port the app will run on
 EXPOSE 8080
-# Copy the current directory contents into the container
-COPY . .
+# Copy requirements.txt first so the dependency install layer is cached
+COPY requirements.txt .
 # Install dependencies (ensure you have a requirements.txt file)
 RUN pip install --no-cache-dir -r requirements.txt
+# Copy the current directory contents into the container
+COPY . .
 # Command to run the app using Uvicorn
 CMD ["uvicorn", "main:app", "--reload", "--host", "0.0.0.0", "--port", "8080"]

View file

@@ -1,4 +1,7 @@
 import logging
+import os
+from minio import Minio
 from app.models import AvailableSource, AvailableSourcesResponse
@@ -78,3 +81,20 @@ available_sources = AvailableSourcesResponse(
         ),
     ],
 )
+minio_alias_url = os.environ.get(
+    "BACKEND_MINIO_ALIAS", "http://minio:9000"
+)
+minio_bucket_name = os.environ.get(
+    "BACKEND_MINIO_BUCKET_NAME", "backend-retro-contenu"
+)
+minio_access_key = os.environ.get("MINIO_ROOT_USER", "minioadmin")
+minio_secret_key = os.environ.get("MINIO_ROOT_PASSWORD", "minioadmin")
+minio_secure = os.environ.get("MINIO_SECURE") == "True"
+# Minio() expects a bare host:port endpoint; the scheme is selected via `secure`
+minio_client = Minio(
+    endpoint=minio_alias_url.removeprefix("http://").removeprefix("https://"),
+    access_key=minio_access_key,
+    secret_key=minio_secret_key,
+    secure=minio_secure,
+)

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_bluesky_car(file: UploadFile):
+def convert_bluesky_car(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_export_txt(file: UploadFile):
+def convert_export_txt(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_facebook_comments_json(file: UploadFile):
+def convert_facebook_comments_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_facebook_posts_json(file: UploadFile):
+def convert_facebook_posts_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_comments_json(file: UploadFile):
+def convert_instagram_comments_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_posts_json(file: UploadFile):
+def convert_instagram_posts_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_reels_json(file: UploadFile):
+def convert_instagram_reels_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_reels_video(file: UploadFile):
+def convert_instagram_reels_video(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_stories_image(file: UploadFile):
+def convert_instagram_stories_image(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_instagram_stories_json(file: UploadFile):
+def convert_instagram_stories_json(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_linkedin_comments_csv(file: UploadFile):
+def convert_linkedin_comments_csv(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,106 @@
+import datetime
+from io import StringIO, BytesIO
+from typing import Dict, List, Union
+import pandas as pd
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_linkedin_shares_csv(file: UploadFile):
-    # Implement conversion logic here
-    logger.info(file.headers)
-    converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+def convert_linkedin_shares_csv(content: Union[str, bytes]) -> List[Dict]:
+    """
+    Convert a LinkedIn shares CSV file from MinIO into a standardized format.
+    Args:
+        content: CSV content as string or bytes
+    Returns:
+        List of record dictionaries with the converted data
+    Raises:
+        ValueError: If conversion fails
+    """
+    try:
+        # Handle content based on its type
+        logger.info("Preparing to read CSV content")
+        if isinstance(content, bytes):
+            # If content is bytes, convert to string
+            try:
+                content_str = content.decode('utf-8')
+                csv_file = StringIO(content_str)
+                logger.debug("Converted bytes content to string")
+            except UnicodeDecodeError:
+                # If UTF-8 decoding fails, use BytesIO
+                csv_file = BytesIO(content)
+                logger.debug("Using binary content with BytesIO")
+        elif isinstance(content, str):
+            # If content is already a string, use it directly
+            csv_file = StringIO(content)
+            logger.debug("Using string content with StringIO")
+        else:
+            raise TypeError(f"Unsupported content type: {type(content)}")
+        # Read CSV into DataFrame
+        raw_shares = pd.read_csv(csv_file)
+        logger.info(f"Successfully read CSV with {len(raw_shares)} rows")
+        # Add identification columns
+        logger.info(
+            "Adding identification columns: 'index', 'type', 'network'"
+        )
+        raw_shares = raw_shares.assign(
+            index="linkedin_shares", type="posts", network="LinkedIn"
+        )
+        # Convert date to timestamp
+        logger.info("Converting 'Date' column to timestamp")
+        raw_shares["creation_timestamp"] = raw_shares["Date"].apply(
+            lambda x: int(datetime.datetime.fromisoformat(x).timestamp())
+        )
+        del raw_shares["Date"]
+        logger.info("Date column converted and deleted")
+        # Rename columns
+        logger.info("Renaming columns to standard format")
+        raw_shares = raw_shares.rename(
+            columns={"ShareLink": "uri", "ShareCommentary": "texte"}
+        )
+        # Ensure 'texte' has string type
+        logger.info("Ensuring 'texte' column is of type string")
+        raw_shares["texte"] = raw_shares["texte"].astype(str)
+        # Fill missing values
+        logger.info("Filling missing values with empty strings")
+        raw_shares = raw_shares.fillna("")
+        # Remove duplicates
+        logger.info(
+            "Removing duplicates based on 'texte' and 'creation_timestamp'"
+        )
+        raw_shares = raw_shares.drop_duplicates(
+            subset=["texte", "creation_timestamp"]
+        )
+        # Remove empty rows
+        logger.info("Removing rows with empty 'texte'")
+        raw_shares = raw_shares[raw_shares["texte"].str.strip() != ""]
+        # Convert to dictionary and return
+        logger.info("Converting DataFrame to dictionary format")
+        result = raw_shares.to_dict(orient="records")
+        logger.info(
+            f"Conversion completed successfully with {len(result)} records")
+        return result
+    except pd.errors.EmptyDataError as e:
+        logger.error(f"CSV file is empty or malformed: {str(e)}")
+        raise ValueError(f"CSV file is empty or malformed: {str(e)}")
+    except KeyError as e:
+        logger.error(f"Missing expected column in CSV: {str(e)}")
+        raise ValueError(f"Missing expected column in CSV: {str(e)}")
+    except Exception as e:
+        logger.exception(f"Unexpected error during conversion: {str(e)}")
+        raise ValueError(f"Unexpected error during conversion: {str(e)}")

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_markdown_txt(file: UploadFile):
+def convert_markdown_txt(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_youtube_shorts_video(file: UploadFile):
+def convert_youtube_shorts_video(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,10 +1,10 @@
 from app.config import logger
 from app.models import ConversionResponse
-from fastapi import UploadFile
-def convert_youtube_video_video(file: UploadFile):
+def convert_youtube_video_video(content):
     # Implement conversion logic here
-    logger.info(file.headers)
+    logger.info(f"Starting conversion of {len(content)} bytes")
     converted_data = {}  # Example data
-    return ConversionResponse(converted_data=converted_data, status="success")
+    return ConversionResponse(converted_data=converted_data, status="success",
+                              metadata={})

View file

@@ -1,7 +1,6 @@
-from typing import Annotated, Dict, List
+from typing import Dict, List
-from fastapi import Form, UploadFile
-from pydantic import BaseModel, model_validator
+from pydantic import BaseModel
 class AnalysisRequest(BaseModel):
@@ -24,30 +23,15 @@ class AvailableSourcesResponse(BaseModel):
 class ConversionRequest(BaseModel):
-    source_name: Annotated[str, Form()]
-    source_format: Annotated[str, Form()]
-    file: UploadFile
-    def __init__(
-        self,
-        source_name: Annotated[str, Form()],
-        source_format: Annotated[str, Form()],
-        file: UploadFile,
-    ):
-        super().__init__(
-            source_name=source_name, source_format=source_format, file=file
-        )
-    @model_validator(mode="after")
-    def validate(self):
-        if not self.source_format:
-            self.source_format = "txt"
-        return self
+    source_name: str
+    source_format: str
+    filename: str
 class ConversionResponse(BaseModel):
     converted_data: dict
     status: str
+    metadata: dict
 class ExportRequest(BaseModel):

View file

@@ -1,4 +1,11 @@
-from app.config import logger
+import datetime
+import json
+import os
+from typing import Any, Dict, Union
+from fastapi import APIRouter, HTTPException
+from app.config import logger, minio_bucket_name, minio_client
 from app.convert.convert_bluesky_car import convert_bluesky_car
 from app.convert.convert_export_txt import convert_export_txt
 from app.convert.convert_facebook_comments_json import (
@@ -33,56 +40,225 @@ from app.convert.convert_youtube_shorts_video import (
 )
 from app.convert.convert_youtube_video_video import convert_youtube_video_video
 from app.models import ConversionRequest, ConversionResponse
-from fastapi import APIRouter
 convert_router = APIRouter(prefix="/convert", tags=["Convert"])
+# Define a mapping of source names to converter functions
+CONVERTERS = {
+    "linkedin_shares": {
+        "csv": convert_linkedin_shares_csv
+    },
+    "linkedin_comments": {
+        "csv": convert_linkedin_comments_csv
+    },
+    "facebook_posts": {
+        "json": convert_facebook_posts_json
+    },
+    "facebook_comments": {
+        "json": convert_facebook_comments_json
+    },
+    "instagram_posts": {
+        "json": convert_instagram_posts_json
+    },
+    "instagram_comments": {
+        "json": convert_instagram_comments_json
+    },
+    "instagram_stories": {
+        "json": convert_instagram_stories_json,
+        "image": convert_instagram_stories_image
+    },
+    "instagram_reels": {
+        "json": convert_instagram_reels_json,
+        "video": convert_instagram_reels_video
+    },
+    "bluesky": {
+        "car": convert_bluesky_car
+    },
+    "youtube_video": {
+        "video": convert_youtube_video_video
+    },
+    "youtube_shorts": {
+        "video": convert_youtube_shorts_video
+    },
+    "markdown": {
+        "txt": convert_markdown_txt
+    },
+    "export": {
+        "txt": convert_export_txt
+    }
+}
+def read_content_from_minio(request: ConversionRequest) -> Union[str, bytes]:
+    """
+    Read content from MinIO storage based on the request filename.
+    Args:
+        request: The conversion request containing the filename
+    Returns:
+        The file content as string (for text files) or bytes (for binary files)
+    Raises:
+        HTTPException: If the file cannot be read or doesn't exist
+    """
+    # Check if filename exists
+    if not request.filename:
+        logger.error("Filename is empty or invalid")
+        raise HTTPException(
+            status_code=400, detail="Filename is required"
+        )
+    # Read file from MinIO
+    try:
+        logger.info(
+            f"Reading file '{request.filename}' from MinIO bucket '{minio_bucket_name}'")
+        with minio_client.get_object(
+            bucket_name=minio_bucket_name, object_name=request.filename
+        ) as response:
+            content_type = response.headers.get("content-type", "")
+            logger.debug(f"File content type: {content_type}")
+            if content_type.startswith("text/"):
+                # Read as text (UTF-8)
+                content = response.read().decode("utf-8")
+                logger.debug(f"Read {len(content)} characters from text file")
+            else:
+                # Read as binary
+                content = response.read()
+                logger.debug(f"Read {len(content)} bytes from binary file")
+        return content
+    except Exception as e:
+        error_msg = f"Error reading file '{request.filename}' from MinIO: {e!s}"
+        logger.error(error_msg)
+        raise HTTPException(
+            status_code=500, detail=error_msg
+        ) from e
+def save_to_minio(data: Any, source_name: str) -> str:
+    """
+    Save converted data to MinIO as a JSON file.
+    Args:
+        data: The data to save (typically a list of records or a dict)
+        source_name: The source name to use in the filename
+    Returns:
+        The filename of the saved file
+    Raises:
+        HTTPException: If the file cannot be saved
+    """
+    try:
+        # Generate a unique filename with timestamp
+        timestamp = datetime.datetime.now(tz=datetime.UTC).isoformat().replace(
+            ":", "-")
+        tmp_filename = f"{source_name}_{timestamp}.json"
+        logger.info(f"Saving converted data to temporary file '{tmp_filename}'")
+        # Write to temporary file
+        with open(tmp_filename, "w") as f:
+            json.dump(data, f)
+        # Upload to MinIO
+        logger.info(
+            f"Uploading '{tmp_filename}' to MinIO bucket '{minio_bucket_name}'")
+        minio_client.fput_object(
+            bucket_name=minio_bucket_name,
+            object_name=tmp_filename,
+            file_path=tmp_filename
+        )
+        # Clean up temporary file
+        try:
+            os.remove(tmp_filename)
+            logger.debug(f"Removed temporary file '{tmp_filename}'")
+        except OSError as e:
+            logger.warning(
+                f"Failed to remove temporary file '{tmp_filename}': {e!s}")
+        return tmp_filename
+    except Exception as e:
+        error_msg = f"Error saving converted data to MinIO: {e!s}"
+        logger.error(error_msg)
+        raise HTTPException(
+            status_code=500, detail=error_msg
+        ) from e
 @convert_router.post("/", response_model=ConversionResponse)
 def convert_data(request: ConversionRequest):
     """
-    Convert data from a source to normalized JSON
+    Convert data from a source to normalized JSON and store it in MinIO.
+    Args:
+        request: The conversion request containing source details
+    Returns:
+        A ConversionResponse with status and metadata
+    Raises:
+        HTTPException: If conversion fails or source is not supported
     """
-    converted_data = None
-    logger.info(f"Converting {request.source_name} data to normalized JSON")
+    try:
+        logger.info(
+            f"Processing conversion request for {request.source_name} in {request.source_format} format")
-    if request.source_name == "linkedin_shares":
-        converted_data = convert_linkedin_shares_csv(request.file)
-    elif request.source_name == "linkedin_comments":
-        converted_data = convert_linkedin_comments_csv(request.file)
-    elif request.source_name == "facebook_posts":
-        converted_data = convert_facebook_posts_json(request.file)
-    elif request.source_name == "facebook_comments":
-        converted_data = convert_facebook_comments_json(request.file)
-    elif request.source_name == "instagram_posts":
-        converted_data = convert_instagram_posts_json(request.file)
-    elif request.source_name == "instagram_comments":
-        converted_data = convert_instagram_comments_json(request.file)
-    elif request.source_name == "instagram_stories":
-        if request.source_format == "json":
-            converted_data = convert_instagram_stories_json(request.file)
-        elif request.source_format == "image":
-            converted_data = convert_instagram_stories_image(request.file)
-    elif request.source_name == "instagram_reels":
-        if request.source_format == "json":
-            converted_data = convert_instagram_reels_json(request.file)
-        elif request.source_format == "video":
-            converted_data = convert_instagram_reels_video(request.file)
-    elif request.source_name == "bluesky":
-        converted_data = convert_bluesky_car(request.file)
-    elif request.source_name == "youtube_video":
-        converted_data = convert_youtube_video_video(request.file)
-    elif request.source_name == "youtube_shorts":
-        converted_data = convert_youtube_shorts_video(request.file)
-    elif request.source_name == "markdown":
-        converted_data = convert_markdown_txt(request.file)
-    elif request.source_name == "export":
-        converted_data = convert_export_txt(request.file)
-    else:
-        value_error_message = f"Unsupported source name: {request.source_name}"
-        raise ValueError(value_error_message)
+        # Read content from MinIO
+        content = read_content_from_minio(request)
-    return {
-        "converted_data": converted_data,
-        "status": "success",
-    }
+        # Check if source and format are supported
+        if request.source_name not in CONVERTERS:
+            error_msg = f"Unsupported source name: {request.source_name}"
+            logger.error(error_msg)
+            raise HTTPException(status_code=400, detail=error_msg)
+        if request.source_format not in CONVERTERS[request.source_name]:
+            error_msg = f"Unsupported format '{request.source_format}' for source '{request.source_name}'"
+            logger.error(error_msg)
+            raise HTTPException(status_code=400, detail=error_msg)
+        # Get the appropriate converter function
+        converter = CONVERTERS[request.source_name][request.source_format]
+        # Convert the content
+        logger.info(
+            f"Converting {request.source_name} data using {converter.__name__}")
+        try:
+            converted_data = converter(content)
+            # Guard len(): the stub converters still return a ConversionResponse
+            record_count = len(converted_data) if isinstance(
+                converted_data, list) else 1
+            logger.info(
+                f"Successfully converted data with {record_count} records")
+        except Exception as e:
+            error_msg = f"Error during conversion: {e!s}"
+            logger.error(error_msg, exc_info=True)
+            raise HTTPException(status_code=500, detail=error_msg) from e
+        # Save the converted data to MinIO
+        saved_filename = save_to_minio(converted_data, request.source_name)
+        # Return success response
+        return ConversionResponse(
+            converted_data={},  # Empty dict as per original implementation
+            status="success",
+            metadata={
+                "source": request.source_name,
+                "format": request.source_format,
+                "records_count": record_count,
+                "saved_filename": saved_filename
+            }
+        )
+    except HTTPException:
+        # Re-raise HTTP exceptions
+        raise
+    except Exception as e:
+        # Catch any other exceptions
+        error_msg = f"Unexpected error during conversion process: {e!s}"
+        logger.exception(error_msg)
+        raise HTTPException(status_code=500, detail=error_msg) from e
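For orientation, the object names produced by save_to_minio follow the pattern {source_name}_{UTC ISO timestamp with ':' replaced by '-'}.json, so a hypothetical linkedin_shares conversion run at 22:44:28 UTC would be stored back in the bucket as, for example:

    linkedin_shares_2025-05-14T22-44-28.123456+00-00.json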

View file

@@ -1,3 +1,6 @@
 fastapi
 uvicorn
 pydantic
+pytest
+pandas
+minio

View file

@@ -8,6 +8,9 @@ services:
     environment:
       - MILVUS_HOST=${MILVUS_HOST}
      - MILVUS_PORT=${MILVUS_PORT}
+      - BACKEND_MINIO_BUCKET_NAME=${BACKEND_MINIO_BUCKET_NAME}
+      - BACKEND_MINIO_ALIAS=${BACKEND_MINIO_ALIAS}
+      - MINIO_SECURE=${MINIO_SECURE}
     depends_on:
       - "milvus"
   frontend:

File diff suppressed because it is too large

View file

@@ -0,0 +1,23 @@
+#!/bin/bash
+# Configuration
+MINIO_ALIAS="minio"
+BUCKET_NAME="systeme-retro-testing"
+FILE_PATH="linkedin_shares.csv"
+# Check if the file exists
+if [ ! -f "$FILE_PATH" ]; then
+    echo "Error: File $FILE_PATH not found."
+    exit 1
+fi
+# Create the bucket if it doesn't exist
+# mc alias set needs credentials; default to the minioadmin values used in app/config.py
+mc alias set "$MINIO_ALIAS" http://localhost:9000 "${MINIO_ROOT_USER:-minioadmin}" "${MINIO_ROOT_PASSWORD:-minioadmin}"
+mc mb "$MINIO_ALIAS/$BUCKET_NAME" || true
+# Upload the file to the bucket
+mc cp "$FILE_PATH" "$MINIO_ALIAS/$BUCKET_NAME/"
+# Optional: Verify the upload
+echo "Upload completed."
+mc ls "$MINIO_ALIAS/$BUCKET_NAME"

View file

@@ -1 +1,7 @@
 ruff
+fastapi~=0.115.12
+pandas~=2.2.3
+pydantic~=2.11.4
+python-dotenv~=1.1.0
+requests~=2.32.3
+streamlit~=1.45.0