🚀 Add feature: Loading data into Milvus is now working

François Pelletier 2025-05-21 17:21:57 -04:00
parent f9e5a6e013
commit 64832e2989
23 changed files with 354 additions and 109 deletions

View file

@ -11,4 +11,6 @@ ETCD_QUOTA_BACKEND_BYTES=
ETCD_SNAPSHOT_COUNT=
MINIO_ROOT_USER=
MINIO_ROOT_PASSWORD=
ATTU_HOST_URL=
ATTU_HOST_URL=
OLLAMA_URL=
OLLAMA_EMBEDDING_MODEL_NAME=
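
For local runs, the two new variables can mirror the defaults hard-coded in app/config.py below (illustrative values, not part of this commit):

OLLAMA_URL=http://host.docker.internal:11434
OLLAMA_EMBEDDING_MODEL_NAME=snowflake-arctic-embed2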

View file

@ -1,5 +1,6 @@
import logging
import os
import sys
import dotenv
from minio import Minio
@ -8,6 +9,13 @@ from app.models import AvailableSource, AvailableSourcesResponse
dotenv.load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
stream=sys.stdout
)
logger = logging.getLogger("base_logger")
available_sources = AvailableSourcesResponse(
@ -99,3 +107,8 @@ minio_client = Minio(
secret_key=minio_secret_key,
secure=minio_secure
)
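# host.docker.internal resolves to the Docker host from inside a container
# (built in on Docker Desktop; Linux needs an extra_hosts entry), so these
# defaults assume Ollama runs on the host machine.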
ollama_url = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")
embedding_model_name = os.environ.get("OLLAMA_EMBEDDING_MODEL_NAME",
"snowflake-arctic-embed2")

View file

@ -16,9 +16,9 @@ class FacebookBusinessPostsConverter(BaseConverter):
posts_medias = []
for post in self.datadict:
data_post_items = post['data']
texte_post_list = [item['post'] for item in data_post_items if
content_post_list = [item['post'] for item in data_post_items if
item.get('post')]
texte = "\n".join(texte_post_list)
content = "\n".join(content_post_list)
for attachment in post['attachments']:
if attachment.get('data'):
@ -27,7 +27,7 @@ class FacebookBusinessPostsConverter(BaseConverter):
media = data_item['media']
posts_medias.append({
"chemin": [media["uri"]],
"texte": texte,
"content": content,
"creation_timestamp": media[
"creation_timestamp"]
})

View file

@ -4,9 +4,8 @@ from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
@ -31,7 +30,7 @@ class FacebookCommentsConverter(BaseConverter):
if data_item.get('comment'):
comment_data = data_item['comment']
facebook_comments.append({
"texte": comment_data["comment"],
"content": comment_data["comment"],
"creation_timestamp": comment_data["timestamp"]
})

View file

@ -1,12 +1,11 @@
import json
from typing import Union, List, Dict
import pandas as pd
import json
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
@ -31,7 +30,7 @@ class FacebookPostsConverter(BaseConverter):
def rename_columns(self) -> None:
self.df.rename(columns={
"description": "texte",
"description": "content",
"uri": "chemin"
}, inplace=True)

View file

@ -4,10 +4,9 @@ from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
class InstagramCommentsConverter(BaseConverter):
@ -29,7 +28,7 @@ class InstagramCommentsConverter(BaseConverter):
[]) + self.datadict.get(
'post_comments_1', []):
ig_comments.append({
"texte": comment['string_map_data']['Comment']['value'],
"content": comment['string_map_data']['Comment']['value'],
'creation_timestamp': int(
comment['string_map_data']['Time']['timestamp']),
'index': self.df.index,

View file

@ -19,7 +19,7 @@ class InstagramPostsConverter(BaseConverter):
media = medias[0]
posts_medias.append({
"chemin": [media["uri"]],
"texte": media["title"],
"content": media["title"],
"creation_timestamp": media["creation_timestamp"]
})
else:
@ -28,7 +28,7 @@ class InstagramPostsConverter(BaseConverter):
list_uris = [media['uri'] for media in medias]
posts_medias.append({
"chemin": list_uris,
"texte": title,
"content": title,
"creation_timestamp": creation_timestamp
})

View file

@ -1,13 +1,12 @@
import json
import datetime
import json
from typing import Union, List, Dict
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
@ -34,7 +33,7 @@ class InstagramReelsConverter(BaseConverter):
def rename_columns(self) -> None:
self.df.rename(columns={
"title": "texte",
"title": "content",
"uri": "chemin"
}, inplace=True)

View file

@ -1,13 +1,12 @@
import json
import datetime
import json
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
from app.convert.utils.convert_encoding_meta import convert_encoding_meta
@ -33,7 +32,7 @@ class InstagramStoriesConverter(BaseConverter):
def rename_columns(self) -> None:
self.df.rename(columns={
"title": "texte",
"title": "content",
"uri": "chemin"
}, inplace=True)

View file

@ -1,11 +1,11 @@
import pandas as pd
import datetime
from typing import Dict, Union, List
import pandas as pd
from app.config import logger
from app.models import ConversionResponse
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
class LinkedInCommentsConverter(BaseConverter):
@ -37,11 +37,11 @@ class LinkedInCommentsConverter(BaseConverter):
def rename_columns(self) -> None:
self.df.rename(columns={
"Link": "url",
"Message": "texte"
"Message": "content"
}, inplace=True)
def clean_data(self) -> None:
self.df["texte"] = self.df["texte"].apply(lambda x: str(x))
self.df["content"] = self.df["content"].apply(lambda x: str(x))
self.df["chemin"] = ""
self.df.fillna(value="", inplace=True)

View file

@ -3,8 +3,8 @@ from typing import Dict, Union, List
import pandas as pd
from app.convert.utils.content_from_file import content_from_file
from app.convert.base_converter import BaseConverter
from app.convert.utils.content_from_file import content_from_file
class LinkedInSharesConverter(BaseConverter):
@ -26,7 +26,7 @@ class LinkedInSharesConverter(BaseConverter):
def rename_columns(self) -> None:
self.df = self.df.rename(columns={
"ShareLink": "uri",
"ShareCommentary": "texte",
"ShareCommentary": "content",
"Date": "creation_timestamp"
})
@ -34,8 +34,8 @@ class LinkedInSharesConverter(BaseConverter):
"""Clean and preprocess the DataFrame."""
self.df = self.df.fillna("")
self.df = self.df.drop_duplicates(
subset=["texte", "creation_timestamp"])
self.df = self.df[self.df["texte"].str.strip() != ""]
subset=["content", "creation_timestamp"])
self.df = self.df[self.df["content"].str.strip() != ""]
def convert_linkedin_shares_csv(content: Union[str, bytes]) -> List[Dict]:

View file

@ -53,9 +53,15 @@ class GenerateResponse(BaseModel):
class ImportRequest(BaseModel):
type: str
data: str
source_type: str
object_name: str
class ImportResponse(BaseModel):
status: str
message: str
task_id: str = None
class AvailableCollectionsResponse(BaseModel):
collections: List[str]

View file

@ -1,7 +1,7 @@
import datetime
import json
import os
from typing import Dict, Union, List
from typing import Dict, List
from fastapi import APIRouter, HTTPException
@ -43,6 +43,7 @@ from app.convert.convert_youtube_shorts_video import (
)
from app.convert.convert_youtube_video_video import convert_youtube_video_video
from app.models import ConversionRequest, ConversionResponse
from app.routers.utils.read_content_from_minio import read_content_from_minio
convert_router = APIRouter(prefix="/convert", tags=["Convert"])
@ -127,53 +128,6 @@ def generate_temp_file(data: List[Dict], source_type: str) -> str:
return tmp_filename
def read_content_from_minio(request: ConversionRequest) -> Union[str, bytes]:
"""
Read content from MinIO storage based on the request filename.
Args:
request: The conversion request containing the filename
Returns:
The file content as string (for text files) or bytes (for binary files)
Raises:
HTTPException: If the file cannot be read or doesn't exist
"""
# Check if filename exists
if not request.filename:
logger.error("Filename is empty or invalid")
raise HTTPException(
status_code=400, detail="Filename is required"
)
# Read file from MinIO
try:
logger.info(
f"Reading file '{request.filename}' from MinIO bucket '{minio_bucket_name}'")
with minio_client.get_object(
bucket_name=minio_bucket_name, object_name=request.filename
) as response:
content_type = response.headers.get("content-type", "")
logger.debug(f"File content type: {content_type}")
if content_type.startswith("text/"):
# Read as text (UTF-8)
content = response.read().decode("utf-8")
logger.debug(f"Read {len(content)} characters from text file")
else:
# Read as binary
content = response.read()
logger.debug(f"Read {len(content)} bytes from binary file")
return content
except Exception as e:
error_msg = f"Error reading file '{request.filename}' from MinIO: {e!s}"
logger.error(error_msg)
raise HTTPException(
status_code=500, detail=error_msg
) from e
def save_to_minio(data: List[Dict], source_type: str) -> str:
"""
@ -197,7 +151,7 @@ def save_to_minio(data: List[Dict], source_type: str) -> str:
f"Uploading '{tmp_filename}' to MinIO bucket '{minio_bucket_name}'")
minio_client.fput_object(
bucket_name=minio_bucket_name,
object_name=tmp_filename,
object_name="output/" + tmp_filename,
file_path=tmp_filename
)
@ -238,7 +192,7 @@ def convert_data(request: ConversionRequest):
f"Processing conversion request for {request.source_type} in {request.source_format} format")
# Read content from MinIO
content = read_content_from_minio(request)
content = read_content_from_minio(request.filename)
# Check if source and format are supported
if request.source_type not in CONVERTERS:

View file

@ -1,26 +1,123 @@
from app.config import available_sources, logger
from app.models import AvailableSourcesResponse, ImportRequest, ImportResponse
from fastapi import APIRouter
import json
import traceback
from fastapi import APIRouter, HTTPException
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, \
DataType, utility
from app.config import available_sources, logger
from app.models import AvailableSourcesResponse, ImportRequest, \
ImportResponse, AvailableCollectionsResponse
from app.routers.utils.generate_embeddings import generate_embeddings
from app.routers.utils.read_content_from_minio import read_content_from_minio
import_router = APIRouter(prefix="/import", tags=["Import"])
@import_router.post("/", response_model=ImportResponse)
def import_data(request: ImportRequest):
"""
Import data (e.g., text, files, or structured data).
"""
logger.info(f"Receiver importation request: {request.type}")
return ...
async def import_data(request: ImportRequest):
try:
logger.info(f"Starting import process for {request.source_type}")
# Check Milvus connection
try:
connections.connect("default", host="milvus", port="19530")
logger.info("Successfully connected to Milvus")
except Exception as e:
logger.error(f"Failed to connect to Milvus: {str(e)}")
return ImportResponse(status="error",
message="Failed to connect to Milvus")
# Fetch data from MinIO
try:
data = read_content_from_minio(request.object_name)
logger.info(
f"Successfully fetched data from MinIO: {request.object_name}")
except Exception as e:
logger.error(f"Failed to fetch data from MinIO: {str(e)}")
return ImportResponse(status="error",
message="Failed to fetch data from MinIO")
# Process data
processed_data = json.loads(data)
logger.info("Data processed successfully")
# Generate embeddings and insert into Milvus
collection_name = f"{request.source_type}_collection"
if not utility.has_collection(collection_name):
create_collection(collection_name)
collection = Collection(collection_name)
total_items = len(processed_data)
for i, item in enumerate(processed_data, 1):
try:
item["embedding"] = generate_embeddings(item)
filtered_item = {
"content": item.get("content", ""),
"embedding": item["embedding"],
"creation_timestamp": int(
item.get("creation_timestamp", 0)),
"index": item.get("index", ""),
"type": item.get("type", ""),
"network": item.get("network", ""),
"url": item.get("url", "")
}
_ = collection.insert([filtered_item])
logger.info(
f"Inserted item {i}/{total_items} into Milvus collection {collection_name}")
except Exception as e:
logger.error(f"Failed to process item {i}: {str(e)}")
logger.info(f"Import completed for {request.source_type}")
return ImportResponse(status="success",
message="Import completed successfully")
except Exception as e:
logger.error(f"Unexpected error during import: {str(e)}")
logger.error(traceback.format_exc())
return ImportResponse(status="error",
message=f"Unexpected error: {str(e)}")
@import_router.get(
"/available_sources", response_model=AvailableSourcesResponse
)
def get_available_sources():
"""
Get available sources from database
:return: Available sources in an AvailableSourcesResponse object
"""
logger.info("Get available sources from database")
return available_sources
def create_collection(collection_name: str):
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True,
auto_id=True),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024),
FieldSchema(name="creation_timestamp", dtype=DataType.INT64),
FieldSchema(name="index", dtype=DataType.VARCHAR, max_length=255),
FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=255),
FieldSchema(name="network", dtype=DataType.VARCHAR, max_length=255),
FieldSchema(name="url", dtype=DataType.VARCHAR, max_length=2083),
]
schema = CollectionSchema(fields, "A collection for storing embeddings")
collection = Collection(collection_name, schema)
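# IVF_FLAT clusters the vectors into nlist buckets and scans only the
# nearest buckets at query time; L2 (Euclidean) distance is the metric.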
index_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 1024}
}
collection.create_index("embedding", index_params)
logger.info(f"Created new collection: {collection_name}")
@import_router.get("/available_collections",
response_model=AvailableCollectionsResponse)
def get_available_collections():
logger.info("Getting available collections from Milvus")
try:
if not connections.has_connection("default"):
connections.connect("default", host="milvus", port="19530")
collections = utility.list_collections()
logger.info(f"Found {len(collections)} collections")
return AvailableCollectionsResponse(collections=collections)
except Exception as e:
logger.error(f"Error getting collections from Milvus: {str(e)}")
raise HTTPException(status_code=500,
detail=f"Error getting collections from Milvus: {str(e)}")

View file

View file

@ -0,0 +1,43 @@
import json
import requests
from app.config import ollama_url, embedding_model_name, logger
def generate_embeddings(content):
# Convert content to string if it's not already
if not isinstance(content, str):
try:
content = json.dumps(content)
except Exception as e:
logger.error(
f"Error converting content to string: {str(e)}. Defaulting to string.")
content = str(content)
logger.info(
f"Generating embeddings for content: {content[:100]}...") # Log first 100 chars
try:
response = requests.post(f"{ollama_url}/api/embed", json={
"model": embedding_model_name,
"input": content
})
response.raise_for_status() # Raise an exception for bad status codes
embeddings_list = response.json().get('embeddings') or []
embeddings = embeddings_list[0] if embeddings_list else None
if embeddings:
logger.info(
f"Successfully generated embeddings of length {len(embeddings)}")
return embeddings
else:
raise ValueError("No embeddings found in response")
except requests.RequestException as e:
logger.error(f"Error making request to Ollama API: {str(e)}")
logger.error(
f"Response content: {e.response.text if e.response else 'No response'}")
raise
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON response: {response.text}")
raise
except Exception as e:
logger.error(f"Unexpected error generating embeddings: {str(e)}")
raise
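
For reference, snowflake-arctic-embed2 returns 1024-dimensional vectors, matching the dim=1024 FLOAT_VECTOR field declared in create_collection above. A minimal usage sketch (the sample text is an illustrative assumption):

embedding = generate_embeddings("Sample post content")
assert len(embedding) == 1024  # must match the collection schema's embedding dim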

View file

@ -0,0 +1,54 @@
from typing import Union
from fastapi import HTTPException
from app.config import logger, minio_bucket_name, minio_client
def read_content_from_minio(filename: str) -> Union[str, bytes]:
"""
Read content from MinIO storage based on the request filename.
Args:
request: The conversion request containing the filename
Returns:
The file content as string (for text files) or bytes (for binary files)
Raises:
HTTPException: If the file cannot be read or doesn't exist
:param filename:
"""
# Check if filename exists
if not filename:
logger.error("Filename is empty or invalid")
raise HTTPException(
status_code=400, detail="Filename is required"
)
# Read file from MinIO
try:
logger.info(
f"Reading file '{filename}' from MinIO bucket '{minio_bucket_name}'")
with minio_client.get_object(
bucket_name=minio_bucket_name, object_name=filename
) as response:
content_type = response.headers.get("content-type", "")
logger.debug(f"File content type: {content_type}")
if content_type.startswith("text/"):
# Read as text (UTF-8)
content = response.read().decode("utf-8")
logger.debug(f"Read {len(content)} characters from text file")
else:
# Read as binary
content = response.read()
logger.debug(f"Read {len(content)} bytes from binary file")
return content
except Exception as e:
error_msg = f"Error reading file '{filename}' from MinIO: {e!s}"
logger.error(error_msg)
raise HTTPException(
status_code=500, detail=error_msg
) from e

View file

@ -7,4 +7,7 @@ minio
python-dotenv
xmltodict
markdownify
chardet
chardet
pymilvus
requests
tqdm

View file

@ -13,6 +13,11 @@ services:
- MINIO_SECURE=${MINIO_SECURE}
depends_on:
- "milvus"
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "5"
frontend:
build:
context: ./frontend

import_requests_test.http (new file, 49 lines)
View file

@ -0,0 +1,49 @@
@baseUrl = http://localhost:8080
### Import LinkedIn Comments
POST {{baseUrl}}/import
Content-Type: application/json
{
"source_type": "linkedin_comments",
"object_name": "output/linkedin_comments_2025-05-21T18-53-39.438179+00-00.json"
}
### Import LinkedIn Shares
POST {{baseUrl}}/import
Content-Type: application/json
{
"source_type": "linkedin_shares",
"object_name": "output/linkedin_shares_2025-05-21T18-53-39.700335+00-00.json"
}
### Import WordPress Posts
POST {{baseUrl}}/import
Content-Type: application/json
{
"source_type": "wordpress",
"object_name": "output/wordpress_2025-05-21T18-53-40.593642+00-00.json"
}
### Import Facebook Business Posts
POST {{baseUrl}}/import
Content-Type: application/json
{
"source_type": "facebook_business_posts",
"object_name": "output/facebook_business_posts_2025-05-21T18-53-40.643167+00-00.json"
}
### Import Instagram Posts
POST {{baseUrl}}/import
Content-Type: application/json
{
"source_type": "instagram_posts",
"object_name": "output/instagram_posts_2025-05-21T18-53-40.681279+00-00.json"
}
### Get Available Collections
GET {{baseUrl}}/import/available_collections

View file

@ -13,7 +13,7 @@ if [ ! -f "$FILE_PATH" ]; then
fi
# Create the bucket if it doesn't exist
mc alias set $MINIO_ALIAS http://localhost:9000
mc alias set $MINIO_ALIAS http://localhost:9000 "minioadmin" "minioadmin"
mc mb "$MINIO_ALIAS/$BUCKET_NAME" || true
# Upload the file to the bucket

View file

@ -7,4 +7,7 @@ requests~=2.32.3
streamlit~=1.45.0
xmltodict~=0.14.2
markdownify~=1.1.0
minio~=7.2.15
minio~=7.2.15
chardet~=5.2.0
pymilvus~=2.5.9
tqdm~=4.67.1

test_embedding.http (new file, 21 lines)
View file

@ -0,0 +1,21 @@
### List Available Models
GET http://localhost:11434/api/tags
Content-Type: application/json
### Test nomic-embed-text Embedding
POST http://localhost:11434/api/embed
Content-Type: application/json
{
"model": "nomic-embed-text:latest",
"input": "This is another test sentence to generate an embedding using a different model."
}
### Test snowflake-arctic-embed2 Embedding
POST http://localhost:11434/api/embed
Content-Type: application/json
{
"model": "snowflake-arctic-embed2:latest",
"input": "This is a test sentence to generate an embedding."
}
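
A successful /api/embed call returns roughly this shape (values truncated); note the plural "embeddings" list even for a single input, which is why generate_embeddings above takes element [0]:

{
  "model": "snowflake-arctic-embed2:latest",
  "embeddings": [[0.0123, -0.0456, 0.0789]]
}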