""" Fabrique à documents Copyright (C) 2023 François Pelletier This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . """ from datetime import datetime, timedelta, timezone import logging from typing import Annotated import jwt from fastapi import FastAPI, UploadFile, Depends, HTTPException from fastapi.responses import FileResponse import pypandoc import json from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.testclient import TestClient import os import shutil from passlib.exc import InvalidTokenError from starlette import status from DocumentSpecs import DocumentSpecs from FormatParameters import FormatParameters from convert_pdf import convert_pdf from convert_video import convert_video from extract_emojis import replace_emojis from list_dir import list_dir from models import UserInDB, User, TokenData, Token from responses import Styles, Formats, App from passlib.context import CryptContext SECRET_KEY = os.getenv("SECRET_KEY") USERNAME = os.getenv("USERNAME") PASS_HASH = os.getenv("PASS_HASH") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days fake_users_db = { "francois": { "username": f"{USERNAME}", "hashed_password": f"{PASS_HASH}", "disabled": False, } } pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") app = FastAPI() def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: timedelta | None = None): to_encode = data.copy() if expires_delta: expire = datetime.now(timezone.utc) + expires_delta else: expire = datetime.now(timezone.utc) + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except InvalidTokenError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user( current_user: Annotated[User, Depends(get_current_user)], ): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user @app.post("/token") async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], ) -> Token: user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return Token(access_token=access_token, token_type="bearer") @app.get("/users/me") async def read_users_me( current_user: Annotated[User, Depends(get_current_active_user)], ): return current_user @app.get("/") async def get_root(): app = App(app='fabriquedoc') return app @app.get("/styles/") async def get_styles(current_user: Annotated[User, Depends(get_current_active_user)]): styles = Styles(styles=list_dir("./styles")) return styles @app.get("/formats/{style}/") async def get_formats(style: str, current_user: Annotated[User, Depends(get_current_active_user)]): formats = Formats(formats=list_dir(f"./styles/{style}/")) return formats @app.get("/format_parameters/{style}/{format}/") async def get_format_parameters(style: str, format: str, current_user: Annotated[User, Depends(get_current_active_user)]): # open styles/format_parameters.json as a dictionary with open(f"./styles/{style}/format_parameters.json", "r") as f: format_data = json.load(f).get(format) logging.log(logging.INFO, str(format_data)) # load data from format_data into the FormatParameters object parameters = FormatParameters(**format_data) return parameters @app.get("/images/") async def get_images(current_user: Annotated[User, Depends(get_current_active_user)]): # list all files in resources/images files = [f for f in os.listdir("./resources/images") if os.path.isfile(os.path.join("./resources/images", f))] # sort the files files.sort() return {"images": files} @app.get("/images/{nom_image}") async def get_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]): return FileResponse(f"./resources/images/{nom_image}") @app.post("/images/") async def ajouter_image(file: UploadFile, current_user: Annotated[User, Depends(get_current_active_user)]): """ Add an image to the images folder. :param file: :return: """ image_path = f"{os.getcwd()}/resources/images/{file.filename}" try: contents = file.file.read() with open(image_path, 'wb') as f: f.write(contents) except Exception as e: return {"message": f"There was an error uploading the file: {e}"} finally: file.file.close() return {"message": f"Successfully uploaded all files"} @app.delete("/images/{nom_image}") async def supprimer_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]): """ Delete an image from the images folder. :param nom_image: :return: """ image_path = f"{os.getcwd()}/resources/images/{nom_image}" try: os.remove(image_path) except Exception as e: return {"message": f"There was an error deleting the file: {e}"} finally: return {"message": f"Successfully deleted {nom_image}"} @app.post("/generer/") async def generer(specs: DocumentSpecs, current_user: Annotated[User, Depends(get_current_active_user)]): header_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/header.tex' cover_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/cover.tex' datef = datetime.now().strftime("%Y-%m-%d") os.makedirs("out", exist_ok=True) filters = ['latex-emoji.lua', 'centered.lua'] pdoc_args = [ f'--include-in-header={header_file}', f'--include-after-body={cover_file}', '--listings', '--dpi=300', f'--toc-depth={specs.tocdepth}', f'--pdf-engine={specs.pdfengine}', f'--resource-path={os.getcwd()}/resources/', '-V', f'linkcolor={specs.linkcolor}', '-V', f'fontsize={specs.fontsize}pt', '-V', f'geometry:paperwidth={round(specs.paperwidth * specs.ratio / 100, -1) / 300}in', '-V', f'geometry:paperheight={round(specs.paperheight * specs.ratio / 100, -1) / 300}in', '-V', f'geometry:left={specs.margin / 300}in', '-V', f'geometry:right={specs.margin / 300}in', '-V', f'geometry:top={specs.vmargin / 300}in', '-V', f'geometry:bottom={specs.vmargin / 300}in' ] pdf_file_path = f"./out/{specs.style}-{specs.format}-{datef}-output.pdf" images_path = f"./out/{specs.style}-{specs.format}-{datef}-images" try: logging.info("Dossier courant = " + os.getcwd()) text_to_convert = replace_emojis(specs.content) pypandoc.convert_text(source=text_to_convert, to='pdf', format='markdown+implicit_figures+smart+emoji', encoding='utf-8', extra_args=pdoc_args, filters=filters, cworkdir=os.getcwd(), outputfile=pdf_file_path ) except RuntimeError as rerr: logging.exception(rerr) except OSError as oerr: logging.exception(oerr) if specs.extension in ["jpg", "mp4"]: filename = os.path.join("out", os.path.splitext(os.path.basename(pdf_file_path))[0]) if not os.path.exists(images_path): os.mkdir(images_path) conversion_extension = specs.extension output_extension = specs.extension if specs.extension in ["mp4"]: conversion_extension = "jpg" try: convert_pdf(pdf_file_path, conversion_extension, images_path, resolution=300) if specs.extension in ["jpg"]: shutil.make_archive(base_name=filename, format='zip', root_dir=images_path) output_extension = "zip" shutil.rmtree(images_path) if specs.extension in ["mp4"]: output_extension = "mp4" convert_video(images_path=images_path, output_path=f"{filename}.{output_extension}", width=specs.paperwidth, height=specs.paperheight, fps=specs.fps, stilltime=specs.stilltime) except Exception as e: logging.exception(e) return FileResponse(f"{filename}.{output_extension}") elif specs.extension == "pdf": return FileResponse(pdf_file_path) else: return 0 client = TestClient(app) def test_getroot(): response = client.get("/") assert response.status_code == 200