"""MinIO storage adapter implementation"""

import uuid
from datetime import timedelta
from io import BytesIO
from typing import Optional

from flask import current_app
from minio import Minio
from minio.error import S3Error

from app.services.storage.base import StorageAdapter


class MinIOStorageAdapter(StorageAdapter):
    """MinIO implementation of StorageAdapter"""

    def __init__(self, minio_client: Optional[Minio] = None):
        """
        Initialize MinIO adapter with client dependency

        Args:
            minio_client: MinIO client instance (optional, defaults to
                extension's client)
        """
        if minio_client is not None:
            self.client = minio_client
        else:
            # Get client from storage extension (dependency injection).
            # NOTE(review): the import is function-local, presumably to
            # avoid a circular import at module load time - confirm before
            # hoisting it to the top of the file.
            from app.services.storage.storage_extension import storage

            self.client = storage.get_client()

    def upload_file(
        self,
        file_data: BytesIO,
        bucket_name: str,
        object_name: str,
        content_type: str,
        length: Optional[int] = None,
    ) -> dict:
        """
        Upload a file to MinIO

        The target bucket is created first if it does not exist yet.

        Args:
            file_data: File data as BytesIO
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO
            content_type: MIME type of the file
            length: File size in bytes (optional, will be determined if
                not provided)

        Returns:
            dict: Upload result with the keys "success", "bucket",
                "object_name", "size", "etag" and "version_id"

        Raises:
            S3Error: If the bucket check or the upload fails (logged, then
                re-raised)
        """
        try:
            # Get file length if not provided
            if length is None:
                file_data.seek(0, 2)  # Seek to end
                length = file_data.tell()
                file_data.seek(0)  # Seek back to start

            # Ensure bucket exists
            self.ensure_bucket_exists(bucket_name)

            # Upload file
            result = self.client.put_object(
                bucket_name,
                object_name,
                file_data,
                length=length,
                content_type=content_type,
            )

            return {
                "success": True,
                "bucket": bucket_name,
                "object_name": object_name,
                "size": length,
                "etag": result.etag,
                # Falls back to None when the result carries no version_id.
                "version_id": getattr(result, "version_id", None),
            }
        except S3Error as e:
            current_app.logger.error(f"MinIO upload error: {e}")
            raise

    def download_file(self, bucket_name: str, object_name: str) -> BytesIO:
        """
        Download a file from MinIO

        The whole object is read into memory before it is returned.

        Args:
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO

        Returns:
            BytesIO: File data

        Raises:
            S3Error: If the object cannot be fetched (logged, then
                re-raised)
        """
        response = None
        try:
            response = self.client.get_object(bucket_name, object_name)
            return BytesIO(response.read())
        except S3Error as e:
            current_app.logger.error(f"MinIO download error: {e}")
            raise
        finally:
            # The MinIO SDK requires the caller to close the response and
            # hand the connection back to the pool; otherwise every
            # download leaks a pooled HTTP connection.
            if response is not None:
                response.close()
                response.release_conn()

    def delete_file(self, bucket_name: str, object_name: str) -> bool:
        """
        Delete a file from MinIO

        Args:
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO

        Returns:
            bool: True if successful, False if MinIO reported an error
                (the error is logged, not raised)
        """
        try:
            self.client.remove_object(bucket_name, object_name)
            return True
        except S3Error as e:
            current_app.logger.error(f"MinIO delete error: {e}")
            return False

    def generate_presigned_url(
        self, bucket_name: str, object_name: str, expires_in: int = 3600
    ) -> str:
        """
        Generate a presigned URL for file access

        Args:
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO
            expires_in: URL expiration time in seconds (a
                ``datetime.timedelta`` is also accepted and used as-is)

        Returns:
            str: Presigned URL

        Raises:
            S3Error: If MinIO rejects the request (logged, then re-raised)
        """
        # The MinIO client expects the expiry as a timedelta, not as a
        # number of seconds, so convert before calling it.
        if isinstance(expires_in, timedelta):
            expires = expires_in
        else:
            expires = timedelta(seconds=expires_in)

        try:
            return self.client.presigned_get_object(
                bucket_name, object_name, expires=expires
            )
        except S3Error as e:
            current_app.logger.error(f"MinIO presigned URL error: {e}")
            raise

    def file_exists(self, bucket_name: str, object_name: str) -> bool:
        """
        Check if a file exists in MinIO

        Args:
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO

        Returns:
            bool: True if file exists. False if the object is missing or
                if the lookup failed for any other S3 reason (the latter
                is logged)
        """
        try:
            self.client.stat_object(bucket_name, object_name)
            return True
        except S3Error as e:
            # A missing key is the expected "not found" answer; anything
            # else is unexpected and worth a log entry.
            if e.code == "NoSuchKey":
                return False
            current_app.logger.error(f"MinIO check exists error: {e}")
            return False

    def get_file_info(self, bucket_name: str, object_name: str) -> dict:
        """
        Get file metadata from MinIO

        Args:
            bucket_name: Name of bucket
            object_name: Name of the object in MinIO

        Returns:
            dict: File metadata with the keys "size", "last_modified",
                "etag" and "content_type"

        Raises:
            S3Error: If the object cannot be inspected (logged, then
                re-raised)
        """
        try:
            stat = self.client.stat_object(bucket_name, object_name)
            return {
                "size": stat.size,
                "last_modified": stat.last_modified,
                "etag": stat.etag,
                "content_type": stat.content_type,
            }
        except S3Error as e:
            current_app.logger.error(f"MinIO get file info error: {e}")
            raise

    def ensure_bucket_exists(self, bucket_name: str) -> None:
        """
        Ensure a bucket exists, create if it doesn't

        Args:
            bucket_name: Name of the bucket

        Raises:
            S3Error: If the existence check or the creation fails (logged,
                then re-raised)
        """
        try:
            if not self.client.bucket_exists(bucket_name):
                self.client.make_bucket(bucket_name)
                current_app.logger.info(f"Created bucket: {bucket_name}")
        except S3Error as e:
            current_app.logger.error(f"MinIO ensure bucket error: {e}")
            raise

    def generate_unique_object_name(self, original_filename: str) -> str:
        """
        Generate a unique object name for a file

        The name is a random UUID4 hex string; the lower-cased text after
        the last "." of the original filename is kept as the extension.

        Args:
            original_filename: Original filename

        Returns:
            str: Unique object name ("<hex>.<ext>", or just "<hex>" when
                the filename has no extension)
        """
        # Extract file extension (everything after the last dot).
        _, dot, suffix = original_filename.rpartition(".")
        ext = suffix.lower() if dot else ""

        # Generate unique filename with UUID
        unique_name = uuid.uuid4().hex

        return f"{unique_name}.{ext}" if ext else unique_name