321 lines
11 KiB
Python
321 lines
11 KiB
Python
"""
Fabrique à documents
Copyright (C) 2023 François Pelletier

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
import logging
|
|
from typing import Annotated
|
|
|
|
import jwt
|
|
from fastapi import FastAPI, UploadFile, Depends, HTTPException
|
|
from fastapi.responses import FileResponse
|
|
import pypandoc
|
|
import json
|
|
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from fastapi.testclient import TestClient
|
|
import os
|
|
import shutil
|
|
|
|
from passlib.exc import InvalidTokenError
|
|
from starlette import status
|
|
|
|
from DocumentSpecs import DocumentSpecs
|
|
from FormatParameters import FormatParameters
|
|
from convert_pdf import convert_pdf
|
|
from convert_video import convert_video
|
|
from extract_emojis import replace_emojis
|
|
from list_dir import list_dir
|
|
from models import UserInDB, User, TokenData, Token
|
|
from responses import Styles, Formats, App
|
|
|
|
from passlib.context import CryptContext
|
|
|
|
# Runtime configuration, read once from the environment at import time.
SECRET_KEY = os.getenv("SECRET_KEY")  # key used to sign and verify the JWTs
USERNAME = os.getenv("USERNAME")
PASS_HASH = os.getenv("PASS_HASH")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7  # 7 days

# Single-user store built from the environment.
# The record is keyed by the configured username itself: tokens carry
# ``user.username`` as their "sub" claim and get_current_user() looks that
# value up as a key of this dict, so key and "username" must agree.
fake_users_db = {
    f"{USERNAME}": {
        "username": f"{USERNAME}",
        "hashed_password": f"{PASS_HASH}",
        "disabled": False,
    }
}

# Password hashing context (bcrypt) used by verify_password/get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Extracts the bearer token from incoming requests.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
|
|
|
|
|
|
def verify_password(plain_password, hashed_password):
    """
    Check a clear-text password against a stored hash.

    :param plain_password: password as typed by the user
    :param hashed_password: hash to compare against
    :return: the result of ``pwd_context.verify`` for the two values
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
|
|
|
|
def get_password_hash(password):
    """
    Hash a clear-text password with the module's password context.

    :param password: password to hash
    :return: the value produced by ``pwd_context.hash``
    """
    digest = pwd_context.hash(password)
    return digest
|
|
|
|
|
|
def get_user(db, username: str):
    """
    Look up a user record by name.

    :param db: mapping of user name to a dict of ``UserInDB`` fields
    :param username: key to look up in ``db``
    :return: a ``UserInDB`` built from the stored record, or ``None`` when
        ``username`` is not a key of ``db``
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
|
|
|
|
|
|
def authenticate_user(fake_db, username: str, password: str):
    """
    Validate a username/password pair against the user store.

    :param fake_db: user store passed through to ``get_user``
    :param username: name of the account to authenticate
    :param password: clear-text password to verify
    :return: the user object on success, ``False`` when the user is unknown
        or the password does not match
    """
    candidate = get_user(fake_db, username)
    # The password is only checked once a user record was found.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    Build a signed JWT from the given claims.

    :param data: claims to embed in the token; the dict itself is not modified
    :param expires_delta: token lifetime; when missing or falsy a lifetime of
        15 minutes is used
    :return: the token encoded with ``SECRET_KEY`` and ``ALGORITHM``
    """
    lifetime = expires_delta or timedelta(minutes=15)
    # "exp" is set after the caller's claims so it always wins.
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """
    Resolve the bearer token of the current request to a stored user.

    :param token: encoded JWT supplied by the ``oauth2_scheme`` dependency
    :return: the user returned by ``get_user`` for the token's ``sub`` claim
    :raises HTTPException: 401 when the token cannot be decoded, has no
        ``sub`` claim, or names a user that is not in ``fake_users_db``
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError as err:
        # jwt.decode signals malformed, tampered and expired tokens through
        # PyJWT's own InvalidTokenError hierarchy. The passlib exception of
        # the same name is unrelated and never matched, which let decode
        # failures escape as unhandled errors instead of a 401.
        raise credentials_exception from err
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
|
|
|
|
|
|
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """
    Dependency that only lets enabled accounts through.

    :param current_user: user resolved by ``get_current_user``
    :return: ``current_user`` unchanged when its ``disabled`` flag is falsy
    :raises HTTPException: 400 "Inactive user" when the account is disabled
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
|
|
|
|
|
|
@app.post("/token/")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    """
    Exchange a username/password form for a bearer token.

    :param form_data: OAuth2 password form carrying ``username`` and ``password``
    :return: a ``Token`` whose lifetime is ``ACCESS_TOKEN_EXPIRE_MINUTES``
    :raises HTTPException: 401 when ``authenticate_user`` rejects the credentials
    """
    account = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if account:
        token_lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        signed_token = create_access_token(
            data={"sub": account.username},
            expires_delta=token_lifetime,
        )
        return Token(access_token=signed_token, token_type="bearer")
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
@app.get("/users/me")
async def read_users_me(current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Return the account that owns the request's bearer token.

    :param current_user: active user injected by ``get_current_active_user``
    :return: that same user object
    """
    return current_user
|
|
|
|
|
|
@app.get("/")
async def get_root():
    """
    Unauthenticated root endpoint identifying the application.

    :return: an ``App`` response whose ``app`` field is ``'fabriquedoc'``
    """
    return App(app='fabriquedoc')
|
|
|
|
|
|
@app.get("/styles/")
async def get_styles(current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    List the available styles.

    :param current_user: active user (authentication guard only)
    :return: a ``Styles`` response built from ``list_dir("./styles")``
    """
    available = list_dir("./styles")
    return Styles(styles=available)
|
|
|
|
|
|
@app.get("/formats/{style}/")
async def get_formats(style: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    List the formats available for one style.

    :param style: name of the style directory under ``./styles``
    :param current_user: active user (authentication guard only)
    :return: a ``Formats`` response built from ``list_dir`` on the style directory
    """
    style_dir = f"./styles/{style}/"
    return Formats(formats=list_dir(style_dir))
|
|
|
|
|
|
@app.get("/format_parameters/{style}/{format}/")
async def get_format_parameters(style: str, format: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Return the stored parameters of one format of a style.

    The parameters are read from ``./styles/<style>/format_parameters.json``,
    a JSON object keyed by format name.

    :param style: name of the style directory under ``./styles``
    :param format: key to look up in the style's ``format_parameters.json``
    :param current_user: active user (authentication guard only)
    :return: a ``FormatParameters`` built from the stored entry
    :raises HTTPException: 404 when the style has no parameters file or the
        file has no entry for ``format``
    """
    parameters_path = f"./styles/{style}/format_parameters.json"
    try:
        with open(parameters_path, "r", encoding="utf-8") as f:
            format_data = json.load(f).get(format)
    except FileNotFoundError as err:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"No format parameters found for style '{style}'",
        ) from err
    logging.log(logging.INFO, str(format_data))
    if format_data is None:
        # Without this guard FormatParameters(**None) raises a TypeError and
        # the client receives an opaque 500.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Unknown format '{format}' for style '{style}'",
        )
    # load data from format_data into the FormatParameters object
    return FormatParameters(**format_data)
|
|
|
|
|
|
@app.get("/images/")
async def get_images(current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    List the image files stored in ``./resources/images``.

    :param current_user: active user (authentication guard only)
    :return: ``{"images": [...]}`` with the names of the regular files in the
        folder, sorted; sub-directories are left out
    """
    names = sorted(
        entry
        for entry in os.listdir("./resources/images")
        if os.path.isfile(os.path.join("./resources/images", entry))
    )
    return {"images": names}
|
|
|
|
|
|
@app.get("/images/{nom_image}")
async def get_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Serve one image from the images folder.

    :param nom_image: file name inside ``./resources/images``
    :param current_user: active user (authentication guard only)
    :return: a ``FileResponse`` for the requested file
    """
    image_path = f"./resources/images/{nom_image}"
    return FileResponse(image_path)
|
|
|
|
|
|
@app.post("/images/")
async def ajouter_image(file: UploadFile, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Add an image to the images folder.

    The file is stored under ``<cwd>/resources/images`` using only the final
    path component of the uploaded file name.

    :param file: uploaded file; its content is written to disk as-is
    :param current_user: active user (authentication guard only)
    :return: a ``{"message": ...}`` dict describing success or the error met
    """
    # file.filename is chosen by the client. Keep only its last path component
    # so a name such as "../../main.py" cannot be written outside the folder.
    safe_name = os.path.basename(file.filename or "")
    if safe_name in ("", ".", ".."):
        file.file.close()
        return {"message": "There was an error uploading the file: invalid file name"}
    image_path = f"{os.getcwd()}/resources/images/{safe_name}"
    try:
        contents = file.file.read()
        with open(image_path, 'wb') as f:
            f.write(contents)
    except Exception as e:
        # Errors are reported in the response body, as before.
        return {"message": f"There was an error uploading the file: {e}"}
    finally:
        file.file.close()

    return {"message": "Successfully uploaded all files"}
|
|
|
|
|
|
@app.delete("/images/{nom_image}")
async def supprimer_image(nom_image: str, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Delete an image from the images folder.

    :param nom_image: file name inside ``<cwd>/resources/images``
    :param current_user: active user (authentication guard only)
    :return: a ``{"message": ...}`` dict; it reports the error when the file
        could not be removed and success otherwise
    """
    image_path = f"{os.getcwd()}/resources/images/{nom_image}"
    try:
        os.remove(image_path)
    except (OSError, ValueError) as e:
        return {"message": f"There was an error deleting the file: {e}"}
    # The success message used to sit in a ``finally`` clause, whose ``return``
    # replaced the error return above, so failures were reported as successes.
    return {"message": f"Successfully deleted {nom_image}"}
|
|
|
|
|
|
@app.post("/generer/")
async def generer(specs: DocumentSpecs, current_user: Annotated[User, Depends(get_current_active_user)]):
    """
    Render the submitted content to PDF with pandoc and return the result.

    The content is converted to ``./out/<style>-<format>-<date>-output.pdf``.
    Depending on ``specs.extension`` the response is:

    * ``pdf``: the PDF itself;
    * ``jpg``: a zip archive of the pages produced by ``convert_pdf``;
    * ``mp4``: a video assembled by ``convert_video`` from those pages.

    :param specs: document specification (content, style, format, page
        geometry, output extension and video settings)
    :param current_user: active user (authentication guard only)
    :return: a ``FileResponse`` for the generated file
    :raises HTTPException: 400 for an unsupported ``specs.extension``; 500
        when the PDF generation or the image/video conversion fails
    """
    if specs.extension not in ("pdf", "jpg", "mp4"):
        # Used to run the whole conversion and then answer with a bare ``0``.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported extension: {specs.extension}",
        )

    header_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/header.tex'
    cover_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/cover.tex'
    datef = datetime.now().strftime("%Y-%m-%d")
    os.makedirs("out", exist_ok=True)
    filters = ['latex-emoji.lua', 'centered.lua']
    # NOTE(review): sizes are divided by 300 to match --dpi=300, so the specs
    # presumably express them in pixels; confirm against DocumentSpecs.
    pdoc_args = [
        f'--include-in-header={header_file}',
        f'--include-after-body={cover_file}',
        '--listings',
        '--dpi=300',
        f'--toc-depth={specs.tocdepth}',
        f'--pdf-engine={specs.pdfengine}',
        f'--resource-path={os.getcwd()}/resources/',
        '-V', f'linkcolor={specs.linkcolor}',
        '-V', f'fontsize={specs.fontsize}pt',
        '-V', f'geometry:paperwidth={round(specs.paperwidth * specs.ratio / 100, -1) / 300}in',
        '-V', f'geometry:paperheight={round(specs.paperheight * specs.ratio / 100, -1) / 300}in',
        '-V', f'geometry:left={specs.margin / 300}in',
        '-V', f'geometry:right={specs.margin / 300}in',
        '-V', f'geometry:top={specs.vmargin / 300}in',
        '-V', f'geometry:bottom={specs.vmargin / 300}in'
    ]
    pdf_file_path = f"./out/{specs.style}-{specs.format}-{datef}-output.pdf"
    images_path = f"./out/{specs.style}-{specs.format}-{datef}-images"
    try:
        logging.info("Dossier courant = " + os.getcwd())

        text_to_convert = replace_emojis(specs.content)

        pypandoc.convert_text(source=text_to_convert,
                              to='pdf',
                              format='markdown+implicit_figures+smart+emoji',
                              encoding='utf-8',
                              extra_args=pdoc_args,
                              filters=filters,
                              cworkdir=os.getcwd(),
                              outputfile=pdf_file_path
                              )
    except (RuntimeError, OSError) as err:
        # Stop here: carrying on would serve a missing PDF, or a stale one
        # left by an earlier run with the same style, format and date.
        logging.exception(err)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="PDF generation failed",
        ) from err

    if specs.extension == "pdf":
        return FileResponse(pdf_file_path)

    # jpg and mp4 both start from one JPG image per PDF page.
    filename = os.path.join("out", os.path.splitext(os.path.basename(pdf_file_path))[0])
    os.makedirs(images_path, exist_ok=True)
    try:
        convert_pdf(pdf_file_path,
                    "jpg",
                    images_path,
                    resolution=300)
        if specs.extension == "jpg":
            shutil.make_archive(base_name=filename,
                                format='zip',
                                root_dir=images_path)
            output_extension = "zip"
            # The pages now live in the archive; drop the working folder.
            shutil.rmtree(images_path)
        else:
            output_extension = "mp4"
            convert_video(images_path=images_path,
                          output_path=f"{filename}.{output_extension}",
                          width=specs.paperwidth,
                          height=specs.paperheight,
                          fps=specs.fps,
                          stilltime=specs.stilltime)
    except Exception as e:
        # Top-level boundary for the conversion helpers: log the error and
        # report it instead of answering with a file that was never produced.
        logging.exception(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Conversion to {specs.extension} failed",
        ) from e
    return FileResponse(f"{filename}.{output_extension}")
|
|
|
|
|
|
# Module-level test client bound to the application; used by the test below.
client = TestClient(app)
|
|
|
|
|
|
def test_getroot():
    """Smoke test: a GET on the unauthenticated root route answers HTTP 200."""
    root_reply = client.get("/")
    assert root_reply.status_code == 200
|