159 lines
6.7 KiB
Python
159 lines
6.7 KiB
Python
|
import logging
|
||
|
import os
|
||
|
import shutil
|
||
|
import zipfile
|
||
|
from datetime import datetime
|
||
|
from typing import Annotated
|
||
|
|
||
|
import pypandoc
|
||
|
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
|
||
|
from starlette.responses import FileResponse
|
||
|
|
||
|
from DocumentSpecs import DocumentSpecs
|
||
|
from convert_pdf import convert_pdf
|
||
|
from convert_video import convert_video
|
||
|
from extract_emojis import replace_emojis
|
||
|
from authentication import get_current_active_user
|
||
|
from models import User
|
||
|
|
||
|
router = APIRouter()
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
def cleanup_task(output_dir: str):
|
||
|
logger.info(f"Cleaning up temporary directory: {output_dir}")
|
||
|
shutil.rmtree(output_dir)
|
||
|
logger.info("Cleanup complete")
|
||
|
|
||
|
@router.post("/")
|
||
|
async def generer(specs: DocumentSpecs, background_tasks: BackgroundTasks, current_user: Annotated[User, Depends(get_current_active_user)]):
|
||
|
logger.info(f"Starting document generation for user: {current_user.username}")
|
||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
base_name = f"{specs.style}-{specs.format}-{timestamp}"
|
||
|
output_dir = f"./out/{base_name}"
|
||
|
os.makedirs(output_dir, exist_ok=True)
|
||
|
logger.info(f"Created output directory: {output_dir}")
|
||
|
|
||
|
header_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/header.tex'
|
||
|
cover_file = f'{os.getcwd()}/styles/{specs.style}/{specs.format}/cover.tex'
|
||
|
logger.debug(f"Header file: {header_file}, Cover file: {cover_file}")
|
||
|
|
||
|
filters = ['latex-emoji.lua', 'centered.lua']
|
||
|
pdoc_args = [
|
||
|
f'--include-in-header={header_file}',
|
||
|
f'--include-after-body={cover_file}',
|
||
|
'--listings',
|
||
|
'--dpi=300',
|
||
|
f'--pdf-engine={specs.pdfengine}',
|
||
|
f'--resource-path={os.getcwd()}/resources/',
|
||
|
'-V', f'linkcolor={specs.linkcolor}',
|
||
|
'-V', f'fontsize={specs.fontsize}pt',
|
||
|
'-V', f'geometry:paperwidth={round(specs.paperwidth, -1) / 300}in',
|
||
|
'-V', f'geometry:paperheight={round(specs.paperheight, -1) / 300}in',
|
||
|
'-V', f'geometry:left={specs.margin / 300}in',
|
||
|
'-V', f'geometry:right={specs.margin / 300}in',
|
||
|
'-V', f'geometry:top={specs.vmargin / 300}in',
|
||
|
'-V', f'geometry:bottom={specs.vmargin / 300}in'
|
||
|
]
|
||
|
logger.debug(f"Pandoc arguments: {pdoc_args}")
|
||
|
|
||
|
pdf_file_path = f"{output_dir}/{base_name}.pdf"
|
||
|
markdown_file_path = f"{output_dir}/{base_name}.md"
|
||
|
latex_file_path = f"{output_dir}/{base_name}.tex"
|
||
|
images_path = f"{output_dir}/images"
|
||
|
video_file_path = f"{output_dir}/{base_name}.mp4"
|
||
|
|
||
|
try:
|
||
|
logger.info(f"Current working directory: {os.getcwd()}")
|
||
|
|
||
|
text_to_convert = replace_emojis(specs.content)
|
||
|
logger.debug("Emojis replaced in content")
|
||
|
|
||
|
# Save Markdown content
|
||
|
with open(markdown_file_path, 'w', encoding='utf-8') as md_file:
|
||
|
md_file.write(text_to_convert)
|
||
|
logger.info(f"Markdown file saved: {markdown_file_path}")
|
||
|
|
||
|
# Generate PDF and LaTeX
|
||
|
logger.info("Generating PDF...")
|
||
|
pypandoc.convert_text(source=text_to_convert,
|
||
|
to='pdf',
|
||
|
format='markdown+implicit_figures+smart+emoji',
|
||
|
encoding='utf-8',
|
||
|
extra_args=pdoc_args,
|
||
|
filters=filters,
|
||
|
cworkdir=os.getcwd(),
|
||
|
outputfile=pdf_file_path
|
||
|
)
|
||
|
logger.info(f"PDF generated: {pdf_file_path}")
|
||
|
|
||
|
logger.info("Generating LaTeX...")
|
||
|
pypandoc.convert_text(source=text_to_convert,
|
||
|
to='latex',
|
||
|
format='markdown+implicit_figures+smart+emoji',
|
||
|
encoding='utf-8',
|
||
|
extra_args=pdoc_args,
|
||
|
filters=filters,
|
||
|
cworkdir=os.getcwd(),
|
||
|
outputfile=latex_file_path
|
||
|
)
|
||
|
logger.info(f"LaTeX file generated: {latex_file_path}")
|
||
|
|
||
|
# Generate JPG images
|
||
|
os.makedirs(images_path, exist_ok=True)
|
||
|
logger.info(f"Converting PDF to JPG images in {images_path}")
|
||
|
convert_pdf(pdf_file_path, "jpg", images_path, resolution=300)
|
||
|
logger.info("JPG images generated")
|
||
|
|
||
|
# Generate MP4 video
|
||
|
logger.info("Generating MP4 video...")
|
||
|
try:
|
||
|
success = convert_video(
|
||
|
images_path=images_path,
|
||
|
output_path=video_file_path,
|
||
|
width=specs.paperwidth,
|
||
|
height=specs.paperheight,
|
||
|
fps=specs.fps,
|
||
|
stilltime=specs.stilltime
|
||
|
)
|
||
|
if success:
|
||
|
logger.info(f"MP4 video generated: {video_file_path}")
|
||
|
else:
|
||
|
logger.error(f"Failed to generate MP4 video: {video_file_path}")
|
||
|
raise Exception("Video generation failed")
|
||
|
except Exception as e:
|
||
|
logger.exception(f"Error during video generation: {str(e)}")
|
||
|
raise HTTPException(status_code=500, detail=f"Video generation failed: {str(e)}")
|
||
|
|
||
|
# Create ZIP file
|
||
|
zip_file_path = f"{output_dir}/{base_name}.zip"
|
||
|
logger.info(f"Creating ZIP file: {zip_file_path}")
|
||
|
with zipfile.ZipFile(zip_file_path, 'w') as zipf:
|
||
|
zipf.write(pdf_file_path, os.path.basename(pdf_file_path))
|
||
|
zipf.write(markdown_file_path, os.path.basename(markdown_file_path))
|
||
|
zipf.write(latex_file_path, os.path.basename(latex_file_path))
|
||
|
zipf.write(video_file_path, os.path.basename(video_file_path))
|
||
|
for root, _, files in os.walk(images_path):
|
||
|
for file in files:
|
||
|
zipf.write(os.path.join(root, file),
|
||
|
os.path.join(f"{base_name}_images", file))
|
||
|
logger.info("ZIP file created successfully")
|
||
|
|
||
|
logger.info(f"Returning FileResponse for {zip_file_path}")
|
||
|
return FileResponse(zip_file_path, filename=f"{base_name}.zip")
|
||
|
|
||
|
except Exception as e:
|
||
|
logger.exception(f"Error during document generation: {str(e)}")
|
||
|
raise HTTPException(status_code=500, detail=str(e))
|
||
|
|
||
|
finally:
|
||
|
# Schedule the cleanup task to run in the background after the response is sent
|
||
|
background_tasks.add_task(cleanup_task, output_dir)
|
||
|
|
||
|
@router.post("/cleanup/{base_name}")
|
||
|
async def cleanup(base_name: str, current_user: Annotated[User, Depends(get_current_active_user)]):
|
||
|
output_dir = f"./out/{base_name}"
|
||
|
if os.path.exists(output_dir):
|
||
|
cleanup_task(output_dir)
|
||
|
return {"message": f"Cleanup for {base_name} completed successfully"}
|
||
|
else:
|
||
|
raise HTTPException(status_code=404, detail=f"Directory for {base_name} not found")
|