""" Example Celery tasks for the Crafting Shop application. These tasks demonstrate various Celery features and best practices. """ import logging import time from datetime import datetime from celery import shared_task # Get logger for this module logger = logging.getLogger(__name__) @shared_task(bind=True, name="tasks.print_hello") def print_hello(self, name: str = "World") -> str: """ Simple task that prints a greeting. Args: name: Name to greet (default: "World") Returns: Greeting message """ message = f"Hello {name} from Celery! Task ID: {self.request.id}" print(message) logger.info(message) return message @shared_task( bind=True, name="tasks.divide_numbers", autoretry_for=(ZeroDivisionError,), retry_backoff=True, retry_backoff_max=60, retry_jitter=True, max_retries=3, ) def divide_numbers(self, x: float, y: float) -> float: """ Task that demonstrates error handling and retry logic. Args: x: Numerator y: Denominator Returns: Result of division Raises: ZeroDivisionError: If y is zero (will trigger retry) """ logger.info(f"Dividing {x} by {y} (attempt {self.request.retries + 1})") if y == 0: logger.warning("Division by zero detected, retrying...") raise ZeroDivisionError("Cannot divide by zero") result = x / y logger.info(f"Division result: {result}") return result @shared_task(bind=True, name="tasks.send_daily_report", ignore_result=False) def send_daily_report(self) -> dict: """ Simulates sending a daily report. This task would typically send emails, generate reports, etc. Returns: Dictionary with report details """ logger.info("Starting daily report generation...") # Simulate some work time.sleep(2) report_data = { "date": datetime.now().isoformat(), "task_id": self.request.id, "report_type": "daily", "status": "generated", "metrics": { "total_products": 150, "total_orders": 42, "total_users": 89, "revenue": 12500.75, }, } logger.info(f"Daily report generated: {report_data}") print(f"📊 Daily Report Generated at {report_data['date']}") return report_data @shared_task( bind=True, name="tasks.update_product_statistics", queue="stats", priority=5 ) def update_product_statistics(self, product_id: int = None) -> dict: """ Simulates updating product statistics. Demonstrates task routing to a specific queue. Args: product_id: Optional specific product ID to update. If None, updates all products. Returns: Dictionary with update results """ logger.info(f"Updating product statistics for product_id={product_id}") # Simulate database work time.sleep(1) if product_id is None: # Update all products result = { "task": "update_all_product_stats", "status": "completed", "products_updated": 150, "timestamp": datetime.now().isoformat(), } else: # Update specific product result = { "task": "update_single_product_stats", "product_id": product_id, "status": "completed", "timestamp": datetime.now().isoformat(), "new_stats": {"views": 125, "purchases": 15, "rating": 4.5}, } logger.info(f"Product statistics updated: {result}") return result @shared_task( bind=True, name="tasks.long_running_task", time_limit=300, # 5 minutes soft_time_limit=240, # 4 minutes ) def long_running_task(self, iterations: int = 10) -> dict: """ Simulates a long-running task with progress tracking. Args: iterations: Number of iterations to simulate Returns: Dictionary with results """ logger.info(f"Starting long-running task with {iterations} iterations") results = [] for i in range(iterations): # Check if task has been revoked if self.is_aborted(): logger.warning("Task was aborted") return {"status": "aborted", "completed_iterations": i} # Simulate work time.sleep(1) # Update progress progress = (i + 1) / iterations * 100 self.update_state( state="PROGRESS", meta={"current": i + 1, "total": iterations, "progress": progress}, ) results.append(f"iteration_{i + 1}") logger.info(f"Completed iteration {i + 1}/{iterations}") final_result = { "status": "completed", "iterations": iterations, "results": results, "completed_at": datetime.now().isoformat(), } logger.info(f"Long-running task completed: {final_result}") return final_result