265 lines
7.8 KiB
Python
265 lines
7.8 KiB
Python
|
|
"""File processor classes for different file types"""
|
||
|
|
|
||
|
|
from abc import ABC, abstractmethod
|
||
|
|
from io import BytesIO
|
||
|
|
from typing import Optional, Tuple
|
||
|
|
|
||
|
|
from flask import current_app
|
||
|
|
from PIL import Image
|
||
|
|
|
||
|
|
|
||
|
|
class FileProcessor(ABC):
    """Interface shared by all concrete file processors.

    A subclass has to override ``validate``, ``process`` and
    ``get_file_type`` before it can be instantiated.
    """

    @abstractmethod
    def validate(self, file_data: BytesIO, file_size: int, mime_type: str) -> bool:
        """Check whether the given file is acceptable.

        Args:
            file_data: File contents as a BytesIO stream
            file_size: Size of the file in bytes
            mime_type: MIME type of the file

        Returns:
            bool: True if the file is valid
        """

    @abstractmethod
    def process(self, file_data: BytesIO, mime_type: str) -> dict:
        """Process the file and report what was extracted.

        Args:
            file_data: File contents as a BytesIO stream
            mime_type: MIME type of the file

        Returns:
            dict: Processing results and metadata
        """

    @abstractmethod
    def get_file_type(self) -> str:
        """Return the file type category (image, pdf, document, etc.)."""
|
||
|
|
|
||
|
|
|
||
|
|
class ImageProcessor(FileProcessor):
    """Processor for image files.

    Validates an upload against the configured size limit and MIME allow-list,
    checks that Pillow can parse the data, and extracts basic metadata plus a
    JPEG thumbnail.
    """

    def __init__(self):
        # MIME type -> short format name. The methods of this class do not
        # consult this mapping; validate() relies on the application's
        # ALLOWED_IMAGE_TYPES setting instead.
        self.supported_formats = {
            "image/jpeg": "jpeg",
            "image/png": "png",
            "image/gif": "gif",
            "image/webp": "webp",
            "image/svg+xml": "svg+xml",
        }

    def validate(self, file_data: BytesIO, file_size: int, mime_type: str) -> bool:
        """Validate image file.

        Args:
            file_data: File data as BytesIO
            file_size: Size of the file in bytes
            mime_type: MIME type of the file

        Returns:
            bool: True if the size is within ``MAX_UPLOAD_SIZE_IMAGE``, the
            MIME type is listed in ``ALLOWED_IMAGE_TYPES`` and Pillow verifies
            the data; False otherwise (the reason is logged).
        """
        # Check file size
        max_size = current_app.config["MAX_UPLOAD_SIZE_IMAGE"]
        if file_size > max_size:
            # Both parts are f-strings so the configured limit is actually
            # interpolated into the log message.
            current_app.logger.error(
                f"Image size {file_size} exceeds "
                f"maximum {max_size}"
            )
            return False

        # Check MIME type
        if mime_type not in current_app.config["ALLOWED_IMAGE_TYPES"]:
            current_app.logger.error(f"Unsupported image MIME type: {mime_type}")
            return False

        # Verify it's actually an image
        try:
            file_data.seek(0)
            img = Image.open(file_data)
            img.verify()
            # Rewind so later readers start from the beginning of the stream.
            file_data.seek(0)
            return True
        except Exception as e:
            current_app.logger.error(f"Image validation failed: {e}")
            return False

    def process(self, file_data: BytesIO, mime_type: str) -> dict:
        """Process image and extract metadata.

        Args:
            file_data: File data as BytesIO
            mime_type: MIME type of the file (not used by this method)

        Returns:
            dict: ``{"metadata": {...}, "thumbnail_data": BytesIO or None}``
            where ``metadata`` holds width, height, format, mode and
            ``file_type`` ("image").
        """
        file_data.seek(0)
        img = Image.open(file_data)

        metadata = {
            "width": img.width,
            "height": img.height,
            "format": img.format,
            "mode": img.mode,
            "file_type": "image",
        }

        # Generate thumbnail (None if generation fails)
        thumbnail_data = self._generate_thumbnail(img)

        # Leave the stream rewound for whoever consumes it next.
        file_data.seek(0)
        return {
            "metadata": metadata,
            "thumbnail_data": thumbnail_data,
        }

    def _generate_thumbnail(
        self, img: Image.Image, size: Tuple[int, int] = (200, 200)
    ) -> Optional[BytesIO]:
        """
        Generate thumbnail for image

        Args:
            img: PIL Image object
            size: Thumbnail size (width, height)

        Returns:
            BytesIO: JPEG thumbnail data positioned at offset 0, or None if
            any step fails (the error is logged)
        """
        try:
            # Work on a copy so the caller's image is left untouched
            img_copy = img.copy()
            img_copy.thumbnail(size, Image.Resampling.LANCZOS)

            # Convert RGBA to RGB if necessary (JPEG doesn't support transparency)
            if img_copy.mode in ("RGBA", "LA", "P"):
                # Create white background
                background = Image.new("RGB", img_copy.size, (255, 255, 255))
                if img_copy.mode == "P":
                    img_copy = img_copy.convert("RGBA")
                # At this point the mode is RGBA or LA, so the last band is
                # the alpha channel; use it as the paste mask.
                background.paste(img_copy, mask=img_copy.split()[-1])
                img_copy = background
            elif img_copy.mode != "RGB":
                img_copy = img_copy.convert("RGB")

            # Save to BytesIO
            thumb_data = BytesIO()
            img_copy.save(thumb_data, format="JPEG", quality=85)
            thumb_data.seek(0)

            return thumb_data
        except Exception as e:
            current_app.logger.error(f"Thumbnail generation failed: {e}")
            return None

    def get_file_type(self) -> str:
        """Return the file type category handled by this processor."""
        return "image"
|
||
|
|
|
||
|
|
|
||
|
|
class PDFProcessor(FileProcessor):
    """Handles PDF uploads: size, MIME type and header checks plus minimal metadata."""

    def validate(self, file_data: BytesIO, file_size: int, mime_type: str) -> bool:
        """Return True when the upload passes the size, MIME type and header checks.

        Every rejection is logged through the application logger.
        """
        config = current_app.config

        # Size limit
        limit = config["MAX_UPLOAD_SIZE_DOCUMENT"]
        if file_size > limit:
            current_app.logger.error(f"PDF size {file_size} exceeds maximum {limit}")
            return False

        # MIME allow-list
        if mime_type not in config["ALLOWED_DOCUMENT_TYPES"]:
            current_app.logger.error(f"Unsupported document MIME type: {mime_type}")
            return False

        # The first four bytes of the stream must be the "%PDF" marker
        try:
            file_data.seek(0)
            magic = file_data.read(4)
            file_data.seek(0)  # rewind for later readers
            if magic == b"%PDF":
                return True
            current_app.logger.error("Invalid PDF header")
            return False
        except Exception as e:
            current_app.logger.error(f"PDF validation failed: {e}")
            return False

    def process(self, file_data: BytesIO, mime_type: str) -> dict:
        """Rewind the stream and return the basic metadata for a PDF.

        Only the file type is reported; no thumbnail is produced.
        """
        file_data.seek(0)

        # Note: richer extraction would need a PDF library such as PyPDF2
        return {
            "metadata": {"file_type": "pdf"},
            "thumbnail_data": None,  # PDFs don't have thumbnails by default
        }

    def get_file_type(self) -> str:
        """Return the file type category handled by this processor."""
        return "pdf"
|
||
|
|
|
||
|
|
|
||
|
|
class DocumentProcessor(FileProcessor):
    """Fallback processor for other document types (placeholder for future expansion)."""

    def validate(self, file_data: BytesIO, file_size: int, mime_type: str) -> bool:
        """Return True unless the file exceeds the configured document size limit."""
        # Only the size is checked here; type-specific checks can be added later
        oversized = file_size > current_app.config["MAX_UPLOAD_SIZE_DOCUMENT"]
        if oversized:
            current_app.logger.error("Document size exceeds maximum")
        return not oversized

    def process(self, file_data: BytesIO, mime_type: str) -> dict:
        """Return the fixed metadata for a generic document; no thumbnail is produced."""
        metadata = {"file_type": "document"}
        return {"metadata": metadata, "thumbnail_data": None}

    def get_file_type(self) -> str:
        """Return the file type category handled by this processor."""
        return "document"
|
||
|
|
|
||
|
|
|
||
|
|
class ProcessorFactory:
    """Chooses the file processor and file type category for a MIME type."""

    @staticmethod
    def get_processor(mime_type: str) -> FileProcessor:
        """
        Build the processor that handles the given MIME type

        Args:
            mime_type: MIME type of the file

        Returns:
            FileProcessor: A new ImageProcessor for "image/*", a new
            PDFProcessor for "application/pdf", otherwise a new
            DocumentProcessor
        """
        if mime_type.startswith("image/"):
            return ImageProcessor()
        if mime_type == "application/pdf":
            return PDFProcessor()
        # Everything else is treated as a generic document
        return DocumentProcessor()

    @staticmethod
    def get_file_type_from_mime(mime_type: str) -> str:
        """
        Map a MIME type to its file type category

        Args:
            mime_type: MIME type of the file

        Returns:
            str: "image" for "image/*", "pdf" for "application/pdf",
            otherwise "document"
        """
        if mime_type == "application/pdf":
            return "pdf"
        return "image" if mime_type.startswith("image/") else "document"
|