Refactoring de la section pour générer les contenus

This commit is contained in:
François Pelletier 2024-11-10 23:28:47 -05:00
parent 7fa9af2f46
commit cd4b937168
9 changed files with 413 additions and 290 deletions

View file

@ -40,6 +40,7 @@ COPY conf ./conf
COPY font ./font
COPY resources ./resources
COPY styles ./styles
COPY routers ./routers
COPY *.py .
COPY *.lua .

74
authentication.py Normal file
View file

@ -0,0 +1,74 @@
from datetime import timedelta, datetime, timezone
from typing import Annotated
import jwt
from fastapi import Depends, HTTPException
from passlib.exc import InvalidTokenError
from starlette import status
from config import SECRET_KEY, ALGORITHM, fake_users_db, pwd_context, oauth2_scheme
from models import UserInDB, User
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = User(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

21
config.py Normal file
View file

@ -0,0 +1,21 @@
import os
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
SECRET_KEY = os.getenv("SECRET_KEY")
USERNAME = os.getenv("USERNAME")
PASS_HASH = os.getenv("PASS_HASH")
ALGORITHM = "HS256"
fake_users_db = {
"francois": {
"username": f"{USERNAME}",
"hashed_password": f"{PASS_HASH}",
"disabled": False,
}
}
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

322
main.py
View file

@ -16,306 +16,62 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from datetime import datetime, timedelta, timezone
import logging
from typing import Annotated
import jwt
from fastapi import FastAPI, UploadFile, Depends, HTTPException
from fastapi.responses import FileResponse
import pypandoc
import json
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import sys
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
import os
import shutil
from fastapi.middleware.cors import CORSMiddleware
from routers import users, images, generer, format_styles
from responses import App
from passlib.exc import InvalidTokenError
from starlette import status
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler(sys.stdout)
]
)
from DocumentSpecs import DocumentSpecs
from FormatParameters import FormatParameters
from convert_pdf import convert_pdf
from convert_video import convert_video
from extract_emojis import replace_emojis
from list_dir import list_dir
from models import UserInDB, User, TokenData, Token
from responses import Styles, Formats, App
logger = logging.getLogger(__name__)
from passlib.context import CryptContext
SECRET_KEY = os.getenv("SECRET_KEY")
USERNAME = os.getenv("USERNAME")
PASS_HASH = os.getenv("PASS_HASH")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
fake_users_db = {
"francois": {
"username": f"{USERNAME}",
"hashed_password": f"{PASS_HASH}",
"disabled": False,
}
}
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token/")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user
app.include_router(users.router)
app.include_router(images.router, prefix="/images", tags=["images"])
app.include_router(generer.router, prefix="/generer", tags=["generer"])
app.include_router(format_styles.router, tags=["format_styles"])
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"Response status: {response.status_code}")
return response
@app.get("/")
async def get_root():
logger.info("Root endpoint accessed")
app = App(app='fabriquedoc')
return app
@app.get("/styles/")
async def get_styles(current_user: Annotated[User, Depends(get_current_active_user)]):
styles = Styles(styles=list_dir("./styles"))
return styles
@app.get("/formats/{style}/")
async def get_formats(style: str, current_user: Annotated[User, Depends(get_current_active_user)]):
formats = Formats(formats=list_dir(f"./styles/{style}/"))
return formats
@app.get("/format_parameters/{style}/{format}/")
async def get_format_parameters(style: str, format: str, current_user: Annotated[User, Depends(get_current_active_user)]):
# open styles/format_parameters.json as a dictionary
with open(f"./styles/{style}/format_parameters.json", "r") as f:
format_data = json.load(f).get(format)
logging.log(logging.INFO, str(format_data))
# load data from format_data into the FormatParameters object
parameters = FormatParameters(**format_data)
return parameters
@app.get("/images/")
async def get_images(current_user: Annotated[User, Depends(get_current_active_user)]):
# list all files in resources/images
files = [f for f in os.listdir("./resources/images") if os.path.isfile(os.path.join("./resources/images", f))]
# sort the files
files.sort()
return {"images": files}
@app.get("/images/{nom_image}")
async def get_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
return FileResponse(f"./resources/images/{nom_image}")
@app.post("/images/")
async def ajouter_image(file: UploadFile, current_user: Annotated[User, Depends(get_current_active_user)]):
"""
Add an image to the images folder.
:param current_user:
:param file:
:return:
"""
image_path = f"{os.getcwd()}/resources/images/{file.filename}"
try:
contents = file.file.read()
with open(image_path, 'wb') as f:
f.write(contents)
except Exception as e:
return {"message": f"There was an error uploading the file: {e}"}
finally:
file.file.close()
return {"message": f"Successfully uploaded all files"}
@app.delete("/images/{nom_image}")
async def supprimer_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
"""
Delete an image from the images folder.
:param current_user:
:param nom_image:
:return:
"""
image_path = f"{os.getcwd()}/resources/images/{nom_image}"
try:
os.remove(image_path)
except Exception as e:
return {"message": f"There was an error deleting the file: {e}"}
finally:
return {"message": f"Successfully deleted {nom_image}"}
@app.post("/generer/")
async def generer(specs: DocumentSpecs, current_user: Annotated[User, Depends(get_current_active_user)]):
header_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/header.tex'
cover_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/cover.tex'
datef = datetime.now().strftime("%Y-%m-%d")
os.makedirs("out", exist_ok=True)
filters = ['latex-emoji.lua', 'centered.lua']
pdoc_args = [
f'--include-in-header={header_file}',
f'--include-after-body={cover_file}',
'--listings',
'--dpi=300',
f'--toc-depth={specs.tocdepth}',
f'--pdf-engine={specs.pdfengine}',
f'--resource-path={os.getcwd()}/resources/',
'-V', f'linkcolor={specs.linkcolor}',
'-V', f'fontsize={specs.fontsize}pt',
'-V', f'geometry:paperwidth={round(specs.paperwidth * specs.ratio / 100, -1) / 300}in',
'-V', f'geometry:paperheight={round(specs.paperheight * specs.ratio / 100, -1) / 300}in',
'-V', f'geometry:left={specs.margin / 300}in',
'-V', f'geometry:right={specs.margin / 300}in',
'-V', f'geometry:top={specs.vmargin / 300}in',
'-V', f'geometry:bottom={specs.vmargin / 300}in'
]
pdf_file_path = f"./out/{specs.style}-{specs.format}-{datef}-output.pdf"
images_path = f"./out/{specs.style}-{specs.format}-{datef}-images"
try:
logging.info("Dossier courant = " + os.getcwd())
text_to_convert = replace_emojis(specs.content)
pypandoc.convert_text(source=text_to_convert,
to='pdf',
format='markdown+implicit_figures+smart+emoji',
encoding='utf-8',
extra_args=pdoc_args,
filters=filters,
cworkdir=os.getcwd(),
outputfile=pdf_file_path
)
except RuntimeError as rerr:
logging.exception(rerr)
except OSError as oerr:
logging.exception(oerr)
if specs.extension in ["jpg", "mp4"]:
filename = os.path.join("out", os.path.splitext(os.path.basename(pdf_file_path))[0])
if not os.path.exists(images_path):
os.mkdir(images_path)
conversion_extension = specs.extension
output_extension = specs.extension
if specs.extension in ["mp4"]:
conversion_extension = "jpg"
try:
convert_pdf(pdf_file_path,
conversion_extension,
images_path,
resolution=300)
if specs.extension in ["jpg"]:
shutil.make_archive(base_name=filename,
format='zip',
root_dir=images_path)
output_extension = "zip"
shutil.rmtree(images_path)
if specs.extension in ["mp4"]:
output_extension = "mp4"
convert_video(images_path=images_path,
output_path=f"{filename}.{output_extension}",
width=specs.paperwidth,
height=specs.paperheight,
fps=specs.fps,
stilltime=specs.stilltime)
except Exception as e:
logging.exception(e)
return FileResponse(f"{filename}.{output_extension}")
elif specs.extension == "pdf":
return FileResponse(pdf_file_path)
else:
return 0
client = TestClient(app)
def test_getroot():
response = client.get("/")
assert response.status_code == 200
if __name__ == "__main__":
import uvicorn
logger.info("Starting the application")
uvicorn.run(app, host="0.0.0.0", port=8000)

View file

@ -1,19 +1,14 @@
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str

33
routers/format_styles.py Normal file
View file

@ -0,0 +1,33 @@
import json
import logging
from typing import Annotated
from fastapi import APIRouter, Depends
from FormatParameters import FormatParameters
from list_dir import list_dir
from authentication import get_current_active_user
from models import User
from responses import Styles, Formats
router = APIRouter()
@router.get("/styles/")
async def get_styles(current_user: Annotated[User, Depends(get_current_active_user)]):
styles = Styles(styles=list_dir("../styles"))
return styles
@router.get("/formats/{style}/")
async def get_formats(style: str, current_user: Annotated[User, Depends(get_current_active_user)]):
formats = Formats(formats=list_dir(f"./styles/{style}/"))
return formats
@router.get("/format_parameters/{style}/{format}/")
async def get_format_parameters(style: str, format: str, current_user: Annotated[User, Depends(get_current_active_user)]):
# open styles/format_parameters.json as a dictionary
with open(f"./styles/{style}/format_parameters.json", "r") as f:
format_data = json.load(f).get(format)
logging.info(str(format_data))
# load data from format_data into the FormatParameters object
parameters = FormatParameters(**format_data)
return parameters

150
routers/generer.py Normal file
View file

@ -0,0 +1,150 @@
import logging
import os
import shutil
import zipfile
from datetime import datetime
from typing import Annotated
import pypandoc
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from starlette.responses import FileResponse
from DocumentSpecs import DocumentSpecs
from convert_pdf import convert_pdf
from convert_video import convert_video
from extract_emojis import replace_emojis
from authentication import get_current_active_user
from models import User
router = APIRouter()
logger = logging.getLogger(__name__)
def cleanup_task(output_dir: str):
logger.info(f"Cleaning up temporary directory: {output_dir}")
shutil.rmtree(output_dir)
logger.info("Cleanup complete")
@router.post("/")
async def generer(specs: DocumentSpecs, background_tasks: BackgroundTasks, current_user: Annotated[User, Depends(get_current_active_user)]):
logger.info(f"Starting document generation for user: {current_user.username}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_name = f"{specs.style}-{specs.format}-{timestamp}"
output_dir = f"./out/{base_name}"
os.makedirs(output_dir, exist_ok=True)
logger.info(f"Created output directory: {output_dir}")
header_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/header.tex'
cover_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/cover.tex'
logger.debug(f"Header file: {header_file}, Cover file: {cover_file}")
filters = ['latex-emoji.lua', 'centered.lua']
pdoc_args = [
f'--include-in-header={header_file}',
f'--include-after-body={cover_file}',
'--listings',
'--dpi=300',
f'--toc-depth={specs.tocdepth}',
f'--pdf-engine={specs.pdfengine}',
f'--resource-path={os.getcwd()}/resources/',
'-V', f'linkcolor={specs.linkcolor}',
'-V', f'fontsize={specs.fontsize}pt',
'-V', f'geometry:paperwidth={round(specs.paperwidth * specs.ratio / 100, -1) / 300}in',
'-V', f'geometry:paperheight={round(specs.paperheight * specs.ratio / 100, -1) / 300}in',
'-V', f'geometry:left={specs.margin / 300}in',
'-V', f'geometry:right={specs.margin / 300}in',
'-V', f'geometry:top={specs.vmargin / 300}in',
'-V', f'geometry:bottom={specs.vmargin / 300}in'
]
logger.debug(f"Pandoc arguments: {pdoc_args}")
pdf_file_path = f"{output_dir}/{base_name}.pdf"
markdown_file_path = f"{output_dir}/{base_name}.md"
latex_file_path = f"{output_dir}/{base_name}.tex"
images_path = f"{output_dir}/{base_name}_images"
video_file_path = f"{output_dir}/{base_name}.mp4"
try:
logger.info(f"Current working directory: {os.getcwd()}")
text_to_convert = replace_emojis(specs.content)
logger.debug("Emojis replaced in content")
# Save Markdown content
with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
md_file.write(text_to_convert)
logger.info(f"Markdown file saved: {markdown_file_path}")
# Generate PDF and LaTeX
logger.info("Generating PDF...")
pypandoc.convert_text(source=text_to_convert,
to='pdf',
format='markdown+implicit_figures+smart+emoji',
encoding='utf-8',
extra_args=pdoc_args,
filters=filters,
cworkdir=os.getcwd(),
outputfile=pdf_file_path
)
logger.info(f"PDF generated: {pdf_file_path}")
logger.info("Generating LaTeX...")
pypandoc.convert_text(source=text_to_convert,
to='latex',
format='markdown+implicit_figures+smart+emoji',
encoding='utf-8',
extra_args=pdoc_args,
filters=filters,
cworkdir=os.getcwd(),
outputfile=latex_file_path
)
logger.info(f"LaTeX file generated: {latex_file_path}")
# Generate JPG images
os.makedirs(images_path, exist_ok=True)
logger.info(f"Converting PDF to JPG images in {images_path}")
convert_pdf(pdf_file_path, "jpg", images_path, resolution=300)
logger.info("JPG images generated")
# Generate MP4 video
logger.info("Generating MP4 video...")
convert_video(images_path=images_path,
output_path=video_file_path,
width=specs.paperwidth,
height=specs.paperheight,
fps=specs.fps,
stilltime=specs.stilltime)
logger.info(f"MP4 video generated: {video_file_path}")
# Create ZIP file
zip_file_path = f"{output_dir}/{base_name}.zip"
logger.info(f"Creating ZIP file: {zip_file_path}")
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
zipf.write(pdf_file_path, os.path.basename(pdf_file_path))
zipf.write(markdown_file_path, os.path.basename(markdown_file_path))
zipf.write(latex_file_path, os.path.basename(latex_file_path))
zipf.write(video_file_path, os.path.basename(video_file_path))
for root, _, files in os.walk(images_path):
for file in files:
zipf.write(os.path.join(root, file),
os.path.join(f"{base_name}_images", file))
logger.info("ZIP file created successfully")
logger.info(f"Returning FileResponse for {zip_file_path}")
return FileResponse(zip_file_path, filename=f"{base_name}.zip")
except Exception as e:
logger.exception(f"Error during document generation: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
# Schedule the cleanup task to run in the background after the response is sent
background_tasks.add_task(cleanup_task, output_dir)
@router.post("/cleanup/{base_name}")
async def cleanup(base_name: str, current_user: Annotated[User, Depends(get_current_active_user)]):
output_dir = f"./out/{base_name}"
if os.path.exists(output_dir):
cleanup_task(output_dir)
return {"message": f"Cleanup for {base_name} completed successfully"}
else:
raise HTTPException(status_code=404, detail=f"Directory for {base_name} not found")

58
routers/images.py Normal file
View file

@ -0,0 +1,58 @@
import os
from typing import Annotated
from fastapi import APIRouter, Depends, UploadFile
from starlette.responses import FileResponse
from authentication import get_current_active_user
from models import User
router = APIRouter()
@router.get("/")
async def get_images(current_user: Annotated[User, Depends(get_current_active_user)]):
# list all files in resources/images
files = [f for f in os.listdir("../resources/images") if os.path.isfile(os.path.join("../resources/images", f))]
# sort the files
files.sort()
return {"images": files}
@router.get("/{nom_image}")
async def get_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
return FileResponse(f"./resources/images/{nom_image}")
@router.post("/")
async def ajouter_image(file: UploadFile, current_user: Annotated[User, Depends(get_current_active_user)]):
"""
Add an image to the images folder.
:param current_user:
:param file:
:return:
"""
image_path = f"{os.getcwd()}/resources/images/{file.filename}"
try:
contents = file.file.read()
with open(image_path, 'wb') as f:
f.write(contents)
except Exception as e:
return {"message": f"There was an error uploading the file: {e}"}
finally:
file.file.close()
return {"message": f"Successfully uploaded all files"}
@router.delete("/{nom_image}")
async def supprimer_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
"""
Delete an image from the images folder.
:param current_user:
:param nom_image:
:return:
"""
image_path = f"{os.getcwd()}/resources/images/{nom_image}"
try:
os.remove(image_path)
except Exception as e:
return {"message": f"There was an error deleting the file: {e}"}
finally:
return {"message": f"Successfully deleted {nom_image}"}

35
routers/users.py Normal file
View file

@ -0,0 +1,35 @@
from datetime import timedelta
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from starlette import status
from authentication import authenticate_user, create_access_token, get_current_active_user
from config import ACCESS_TOKEN_EXPIRE_MINUTES, fake_users_db
from models import Token, User
router = APIRouter()
@router.post("/token/")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@router.get("/users/me")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
return current_user