kanban-app/backend/app/celery/tasks/example_tasks.py

198 lines
5 KiB
Python
Raw Normal View History

2026-02-21 18:38:19 +00:00
"""
Example Celery tasks for the Crafting Shop application.
These tasks demonstrate various Celery features and best practices.
"""
import logging
import time
from datetime import datetime
from typing import Optional

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
# Get logger for this module
logger = logging.getLogger(__name__)
@shared_task(bind=True, name="tasks.print_hello")
def print_hello(self, name: str = "World") -> str:
    """
    Build a greeting, echo it to stdout and the module log, and return it.

    Args:
        name: Who to greet (default: "World").

    Returns:
        The greeting text, which embeds the id of the current task request.
    """
    task_id = self.request.id
    greeting = f"Hello {name} from Celery! Task ID: {task_id}"

    # Emit on both channels: stdout for quick demos, the logger for workers.
    print(greeting)
    logger.info(greeting)

    return greeting
@shared_task(
    bind=True,
    name="tasks.divide_numbers",
    autoretry_for=(ZeroDivisionError,),
    retry_backoff=True,
    retry_backoff_max=60,
    retry_jitter=True,
    max_retries=3,
)
def divide_numbers(self, x: float, y: float) -> float:
    """
    Divide ``x`` by ``y``, demonstrating Celery's automatic retry handling.

    The decorator retries the task on ``ZeroDivisionError`` with jittered
    exponential backoff (capped at 60 seconds) for up to ``max_retries``
    retries. Once the retry budget is used up the error propagates and the
    task fails.

    Args:
        x: Numerator
        y: Denominator

    Returns:
        Result of division

    Raises:
        ZeroDivisionError: If y is zero (triggers a retry while retries remain)
    """
    retries_so_far = self.request.retries
    logger.info(f"Dividing {x} by {y} (attempt {retries_so_far + 1})")

    if y == 0:
        # Only announce a retry when one will actually happen: on the final
        # attempt the retry budget is spent and the exception is re-raised.
        # max_retries of None means "retry forever".
        retry_limit = self.max_retries
        if retry_limit is None or retries_so_far < retry_limit:
            logger.warning("Division by zero detected, retrying...")
        else:
            logger.warning("Division by zero detected, no retries left")
        raise ZeroDivisionError("Cannot divide by zero")

    result = x / y
    logger.info(f"Division result: {result}")
    return result
@shared_task(
    bind=True,
    name="tasks.send_daily_report",
    ignore_result=False,
)
def send_daily_report(self) -> dict:
    """
    Produce a placeholder daily report.

    Nothing is actually sent: the task sleeps for two seconds to stand in
    for real work, then returns hard-coded sample metrics.

    Returns:
        Dictionary holding the generation timestamp (ISO 8601 string), the
        id of the current task request, the report type, a status flag and
        the sample metrics.
    """
    logger.info("Starting daily report generation...")

    # Stand-in for the real report-building work
    time.sleep(2)

    generated_at = datetime.now().isoformat()
    sample_metrics = {
        "total_products": 150,
        "total_orders": 42,
        "total_users": 89,
        "revenue": 12500.75,
    }
    report_data = {
        "date": generated_at,
        "task_id": self.request.id,
        "report_type": "daily",
        "status": "generated",
        "metrics": sample_metrics,
    }

    logger.info(f"Daily report generated: {report_data}")
    print(f"📊 Daily Report Generated at {generated_at}")
    return report_data
@shared_task(
    bind=True,
    name="tasks.update_product_statistics",
    queue="stats",
    priority=5,
)
def update_product_statistics(self, product_id: Optional[int] = None) -> dict:
    """
    Simulate refreshing product statistics.

    The decorator assigns the task to the "stats" queue with priority 5.
    No real data is touched: the task sleeps for one second and returns
    hard-coded sample figures.

    Args:
        product_id: Optional specific product ID to update.
            If None, the result describes an update of all products.

    Returns:
        Dictionary with update results. The single-product form also
        carries the ``product_id`` and a ``new_stats`` mapping.
    """
    logger.info(f"Updating product statistics for product_id={product_id}")

    # Simulate database work
    time.sleep(1)

    if product_id is None:
        # Update all products
        result = {
            "task": "update_all_product_stats",
            "status": "completed",
            "products_updated": 150,
            "timestamp": datetime.now().isoformat(),
        }
    else:
        # Update specific product
        result = {
            "task": "update_single_product_stats",
            "product_id": product_id,
            "status": "completed",
            "timestamp": datetime.now().isoformat(),
            "new_stats": {
                "views": 125,
                "purchases": 15,
                "rating": 4.5,
            },
        }

    logger.info(f"Product statistics updated: {result}")
    return result
@shared_task(
    bind=True,
    name="tasks.long_running_task",
    time_limit=300,  # 5 minutes
    soft_time_limit=240,  # 4 minutes
)
def long_running_task(self, iterations: int = 10) -> dict:
    """
    Simulate a long-running task with progress tracking.

    Each iteration sleeps for one second and then publishes a custom
    "PROGRESS" state with the current count, the total and a percentage.

    Args:
        iterations: Number of iterations to simulate

    Returns:
        Dictionary with results. If an abort is detected the dictionary
        has status "aborted" and the number of iterations completed so far;
        otherwise it has status "completed", the iteration count, the list
        of per-iteration labels and a completion timestamp.
    """
    logger.info(f"Starting long-running task with {iterations} iterations")

    # is_aborted() is provided by celery.contrib.abortable.AbortableTask, not
    # by the default Task base class. This decorator does not set such a
    # base, so look the method up defensively instead of failing with
    # AttributeError on the first iteration.
    abort_check = getattr(self, "is_aborted", None)

    results = []
    for i in range(iterations):
        # Check if the task has been aborted (only possible on abortable tasks)
        if callable(abort_check) and abort_check():
            logger.warning("Task was aborted")
            return {"status": "aborted", "completed_iterations": i}

        # Simulate work
        time.sleep(1)

        # Update progress
        progress = (i + 1) / iterations * 100
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": iterations, "progress": progress},
        )

        results.append(f"iteration_{i + 1}")
        logger.info(f"Completed iteration {i + 1}/{iterations}")

    final_result = {
        "status": "completed",
        "iterations": iterations,
        "results": results,
        "completed_at": datetime.now().isoformat(),
    }
    logger.info(f"Long-running task completed: {final_result}")
    return final_result