add celery, worker and scheduler
This commit is contained in:
parent
b883898ed8
commit
b4387dd808
16 changed files with 722 additions and 16 deletions
|
|
@ -13,9 +13,13 @@ POSTGRES_DB=crafting_shop
|
|||
GRAFANA_USER=admin
|
||||
GRAFANA_PASSWORD=change-this-password-in-production
|
||||
|
||||
# Celery Configuration
|
||||
CELERY_BROKER_URL=redis://redis:6379/0
|
||||
CELERY_RESULT_BACKEND=redis://redis:6379/0
|
||||
|
||||
# Optional: External Services
|
||||
# REDIS_URL=redis://localhost:6379/0
|
||||
# SMTP_HOST=smtp.gmail.com
|
||||
# SMTP_PORT=587
|
||||
# SMTP_USER=your-email@gmail.com
|
||||
# SMTP_PASSWORD=your-smtp-password
|
||||
# SMTP_PASSWORD=your-smtp-password
|
||||
|
|
|
|||
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -79,4 +79,6 @@ htmlcov/
|
|||
# Temporary files
|
||||
*.tmp
|
||||
*.temp
|
||||
.cache/
|
||||
.cache/
|
||||
|
||||
celerybeat-schedule
|
||||
32
Makefile
32
Makefile
|
|
@ -1,4 +1,4 @@
|
|||
.PHONY: help install dev-services dev-stop-services dev-backend dev-frontend dev build test lint clean up down restart logs
|
||||
.PHONY: help install dev-services dev-stop-services dev-backend dev-frontend dev build test lint clean up down restart logs celery-worker celery-beat celery-flower celery-shell
|
||||
|
||||
help: ## Show this help message
|
||||
@echo "Available commands:"
|
||||
|
|
@ -21,7 +21,7 @@ dev-stop-services: ## Stop development services
|
|||
|
||||
dev-backend: ## Start backend server locally
|
||||
@echo "Starting backend server..."
|
||||
cd backend && source venv/bin/activate && flask run
|
||||
cd backend && . venv/bin/activate && flask run
|
||||
|
||||
dev-frontend: ## Start frontend server locally
|
||||
@echo "Starting frontend server..."
|
||||
|
|
@ -106,4 +106,30 @@ backup: ## Backup database
|
|||
docker exec crafting-shop-postgres pg_dump -U crafting crafting_shop > backup.sql
|
||||
|
||||
restore: ## Restore database (usage: make restore FILE=backup.sql)
|
||||
docker exec -i crafting-shop-postgres psql -U crafting crafting_shop < $(FILE)
|
||||
docker exec -i crafting-shop-postgres psql -U crafting crafting_shop < $(FILE)
|
||||
|
||||
# Celery Commands
|
||||
celery-worker: ## Start Celery worker locally
|
||||
@echo "Starting Celery worker..."
|
||||
cd backend && . venv/bin/activate && celery -A celery_worker worker --loglevel=info --concurrency=4
|
||||
|
||||
celery-beat: ## Start Celery Beat scheduler locally
|
||||
@echo "Starting Celery Beat scheduler..."
|
||||
cd backend && . venv/bin/activate && celery -A celery_worker beat --loglevel=info
|
||||
|
||||
celery-flower: ## Start Flower monitoring locally
|
||||
@echo "Starting Flower monitoring..."
|
||||
cd backend && . venv/bin/activate && celery -A celery_worker flower --port=5555
|
||||
|
||||
celery-shell: ## Open Celery shell for task inspection
|
||||
@echo "Opening Celery shell..."
|
||||
cd backend && . venv/bin/activate && celery -A celery_worker shell
|
||||
|
||||
logs-celery: ## Show Celery worker logs
|
||||
docker compose logs -f celery_worker
|
||||
|
||||
logs-celery-beat: ## Show Celery Beat logs
|
||||
docker compose logs -f celery_beat
|
||||
|
||||
logs-flower: ## Show Flower logs
|
||||
docker compose logs -f flower
|
||||
|
|
|
|||
|
|
@ -5,3 +5,7 @@ CORS_ORIGINS=*
|
|||
DEV_DATABASE_URL=postgresql://crafting:devpassword@localhost:5432/crafting_shop
|
||||
DATABASE_URL=postgresql://user:password@localhost/proddb
|
||||
TEST_DATABASE_URL=sqlite:///test.db
|
||||
|
||||
# Celery Configuration
|
||||
CELERY_BROKER_URL=redis://localhost:6379/0
|
||||
CELERY_RESULT_BACKEND=redis://localhost:6379/0
|
||||
|
|
|
|||
|
|
@ -1,16 +1,17 @@
|
|||
import json
|
||||
from flask import Flask, jsonify
|
||||
from flask_cors import CORS
|
||||
from flask_jwt_extended import JWTManager
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_migrate import Migrate
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
# Create extensions but don't initialize them yet
|
||||
db = SQLAlchemy()
|
||||
migrate = Migrate()
|
||||
jwt = JWTManager()
|
||||
cors = CORS()
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
def create_app(config_name=None):
|
||||
"""Application factory pattern"""
|
||||
|
|
@ -18,32 +19,41 @@ def create_app(config_name=None):
|
|||
|
||||
# Load configuration
|
||||
if config_name is None:
|
||||
config_name = os.environ.get('FLASK_ENV', 'development')
|
||||
config_name = os.environ.get("FLASK_ENV", "development")
|
||||
|
||||
from app.config import config_by_name
|
||||
app.config.from_object(config_by_name[config_name])
|
||||
|
||||
print('----------------------------------------------------------')
|
||||
print(F'------------------ENVIRONMENT: {config_name}-------------------------------------')
|
||||
# print(F'------------------CONFIG: {app.config}-------------------------------------')
|
||||
# print(json.dumps(dict(app.config), indent=2, default=str))
|
||||
print('----------------------------------------------------------')
|
||||
# Initialize extensions with app
|
||||
db.init_app(app)
|
||||
migrate.init_app(app, db)
|
||||
jwt.init_app(app)
|
||||
cors.init_app(app, resources={r"/api/*": {"origins": app.config.get('CORS_ORIGINS', '*')}})
|
||||
cors.init_app(app, resources={r"/api/*": {"origins": app.config.get("CORS_ORIGINS", "*")}})
|
||||
|
||||
# Initialize Celery
|
||||
from app.celery import init_celery
|
||||
init_celery(app)
|
||||
|
||||
# Import models (required for migrations)
|
||||
from app.models import user, product, order
|
||||
|
||||
# Register blueprints
|
||||
from app.routes import api_bp, health_bp
|
||||
app.register_blueprint(api_bp, url_prefix='/api')
|
||||
app.register_blueprint(api_bp, url_prefix="/api")
|
||||
app.register_blueprint(health_bp)
|
||||
|
||||
# Global error handlers
|
||||
@app.errorhandler(404)
|
||||
def not_found(error):
|
||||
return jsonify({'error': 'Not found'}), 404
|
||||
return jsonify({"error": "Not found"}), 404
|
||||
|
||||
@app.errorhandler(500)
|
||||
def internal_error(error):
|
||||
return jsonify({'error': 'Internal server error'}), 500
|
||||
return jsonify({"error": "Internal server error"}), 500
|
||||
|
||||
return app
|
||||
return app
|
||||
|
|
|
|||
73
backend/app/celery/__init__.py
Normal file
73
backend/app/celery/__init__.py
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
"""
|
||||
Celery application factory for the Crafting Shop application.
|
||||
Follows the same application factory pattern as Flask.
|
||||
"""
|
||||
from celery import Celery
|
||||
from flask import Flask
|
||||
|
||||
|
||||
def make_celery(app: Flask) -> Celery:
    """
    Build a Celery application wired to the given Flask app.

    Broker and result-backend URLs are read from ``app.config["CELERY"]``,
    and every task runs inside a Flask application context so tasks can use
    Flask extensions (db, etc.).

    Args:
        app: Flask application instance

    Returns:
        Configured Celery application instance.
    """
    celery_cfg = app.config["CELERY"]

    celery_app = Celery(
        app.import_name,
        broker=celery_cfg["broker_url"],
        backend=celery_cfg["result_backend"]
    )

    # Apply the remaining Celery settings from the Flask config dict.
    celery_app.conf.update(celery_cfg)

    class ContextTask(celery_app.Task):
        """Task base class that executes the task body inside app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask

    # Locate task modules under app.celery.tasks.
    celery_app.autodiscover_tasks(['app.celery.tasks'])

    # Install the periodic (Beat) schedule on this instance.
    from .beat_schedule import configure_beat_schedule
    configure_beat_schedule(celery_app)

    # Importing the module registers its @shared_task functions with Celery.
    from .tasks import example_tasks

    print(f"✅ Celery configured with broker: {celery_app.conf.broker_url}")
    print(f"✅ Celery configured with backend: {celery_app.conf.result_backend}")
    print(f"✅ Beat schedule configured with {len(celery_app.conf.beat_schedule)} tasks")

    return celery_app
|
||||
|
||||
|
||||
# Module-level Celery instance; remains None until init_celery() runs.
celery = None


def init_celery(app: Flask) -> Celery:
    """
    Create the shared Celery instance for *app* and publish it module-wide.

    Call this from create_app() once the Flask app exists; afterwards other
    modules can simply do ``from app.celery import celery``.

    Args:
        app: Flask application instance

    Returns:
        Configured Celery application instance
    """
    global celery
    celery = make_celery(app)
    return celery
|
||||
99
backend/app/celery/beat_schedule.py
Normal file
99
backend/app/celery/beat_schedule.py
Normal file
|
|
@ -0,0 +1,99 @@
|
|||
"""
|
||||
Celery Beat schedule configuration for periodic tasks.
|
||||
This defines when scheduled tasks should run.
|
||||
"""
|
||||
from celery.schedules import crontab
|
||||
|
||||
|
||||
# Celery Beat schedule configuration
|
||||
# Celery Beat schedule configuration.
# Keys are human-readable schedule names; each entry names a registered
# task, its timing (a crontab or a plain interval in seconds), optional
# positional args, and the queue the scheduled run is routed to.
beat_schedule = {
    # Run every minute (for testing/demo)
    "print-hello-every-minute": {
        "task": "tasks.print_hello",
        "schedule": crontab(minute="*"),  # Every minute
        "args": ("Celery Beat",),
        "options": {"queue": "default"},
    },

    # Run daily at 9:00 AM
    "send-daily-report": {
        "task": "tasks.send_daily_report",
        "schedule": crontab(hour=9, minute=0),  # 9:00 AM daily
        "options": {"queue": "reports"},
    },

    # Run every hour at minute 0
    "update-product-stats-hourly": {
        "task": "tasks.update_product_statistics",
        "schedule": crontab(minute=0),  # Every hour at minute 0
        "args": (None,),  # None means: update all products
        "options": {"queue": "stats"},
    },

    # Run every Monday at 8:00 AM
    "weekly-maintenance": {
        "task": "tasks.long_running_task",
        "schedule": crontab(hour=8, minute=0, day_of_week=1),  # Monday 8:00 AM
        "args": (5,),  # 5 iterations
        "options": {"queue": "maintenance"},
    },

    # Run every 5 minutes (for monitoring/heartbeat).
    # A bare float is interpreted by Celery as an interval in seconds.
    "heartbeat-check": {
        "task": "tasks.print_hello",
        "schedule": 300.0,  # Every 300 seconds (5 minutes)
        "args": ("Heartbeat",),
        "options": {"queue": "monitoring"},
    },
}
|
||||
|
||||
|
||||
def configure_beat_schedule(celery_app):
    """
    Configure Celery Beat schedule on the Celery app.

    Applies the module-level ``beat_schedule``, pins the timezone to UTC,
    and installs per-task routing plus the queue declarations those routes
    refer to.

    Args:
        celery_app: Celery application instance
    """
    celery_app.conf.beat_schedule = beat_schedule

    # Configure timezone — keep Beat and workers on UTC so crontab entries
    # fire at predictable wall-clock times regardless of host timezone.
    celery_app.conf.timezone = "UTC"
    celery_app.conf.enable_utc = True

    # Configure task routes for scheduled tasks (task name -> target queue).
    celery_app.conf.task_routes = {
        "tasks.print_hello": {"queue": "default"},
        "tasks.send_daily_report": {"queue": "reports"},
        "tasks.update_product_statistics": {"queue": "stats"},
        "tasks.long_running_task": {"queue": "maintenance"},
    }

    # Configure queues (queue name -> declaration options).
    celery_app.conf.task_queues = {
        "default": {
            "exchange": "default",
            "exchange_type": "direct",
            "routing_key": "default",
        },
        "reports": {
            "exchange": "reports",
            "exchange_type": "direct",
            "routing_key": "reports",
        },
        "stats": {
            "exchange": "stats",
            "exchange_type": "direct",
            "routing_key": "stats",
        },
        "maintenance": {
            "exchange": "maintenance",
            "exchange_type": "direct",
            "routing_key": "maintenance",
        },
        "monitoring": {
            "exchange": "monitoring",
            "exchange_type": "direct",
            "routing_key": "monitoring",
        },
    }
|
||||
15
backend/app/celery/tasks/__init__.py
Normal file
15
backend/app/celery/tasks/__init__.py
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
"""
|
||||
Celery tasks for the Crafting Shop application.
|
||||
Tasks are organized by domain/functionality.
|
||||
"""
|
||||
|
||||
# Import all task modules here to ensure they're registered with Celery
|
||||
from . import example_tasks
|
||||
|
||||
# Re-export tasks for easier imports
|
||||
from .example_tasks import (
|
||||
print_hello,
|
||||
divide_numbers,
|
||||
send_daily_report,
|
||||
update_product_statistics,
|
||||
)
|
||||
198
backend/app/celery/tasks/example_tasks.py
Normal file
198
backend/app/celery/tasks/example_tasks.py
Normal file
|
|
@ -0,0 +1,198 @@
|
|||
"""
|
||||
Example Celery tasks for the Crafting Shop application.
|
||||
These tasks demonstrate various Celery features and best practices.
|
||||
"""
|
||||
import time
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from celery import shared_task
|
||||
from celery.exceptions import MaxRetriesExceededError
|
||||
|
||||
# Get logger for this module
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(bind=True, name="tasks.print_hello")
def print_hello(self, name: str = "World") -> str:
    """
    Emit a greeting that includes the current task id.

    Args:
        name: Name to greet (default: "World")

    Returns:
        Greeting message
    """
    greeting = f"Hello {name} from Celery! Task ID: {self.request.id}"
    print(greeting)
    logger.info(greeting)
    return greeting
|
||||
|
||||
|
||||
@shared_task(
    bind=True,
    name="tasks.divide_numbers",
    autoretry_for=(ZeroDivisionError,),
    retry_backoff=True,
    retry_backoff_max=60,
    retry_jitter=True,
    max_retries=3
)
def divide_numbers(self, x: float, y: float) -> float:
    """
    Task that demonstrates error handling and retry logic.

    Args:
        x: Numerator
        y: Denominator

    Returns:
        Result of division

    Raises:
        ZeroDivisionError: If y is zero (will trigger retry via
            ``autoretry_for`` with exponential backoff, up to 3 retries)
    """
    # Lazy %-style args: the message is only formatted if the log level
    # is enabled (the original used f-strings, including one with no
    # placeholders at all).
    logger.info("Dividing %s by %s (attempt %s)", x, y, self.request.retries + 1)

    if y == 0:
        # Raise explicitly so the autoretry machinery kicks in with a
        # clear warning in the log first.
        logger.warning("Division by zero detected, retrying...")
        raise ZeroDivisionError("Cannot divide by zero")

    result = x / y
    logger.info("Division result: %s", result)
    return result
|
||||
|
||||
|
||||
@shared_task(
    bind=True,
    name="tasks.send_daily_report",
    ignore_result=False
)
def send_daily_report(self) -> dict:
    """
    Simulates sending a daily report.
    This task would typically send emails, generate reports, etc.

    Returns:
        Dictionary with report details
    """
    logger.info("Starting daily report generation...")

    # Simulate the time it takes to build the report.
    time.sleep(2)

    report = {
        "date": datetime.now().isoformat(),
        "task_id": self.request.id,
        "report_type": "daily",
        "status": "generated",
        "metrics": {
            "total_products": 150,
            "total_orders": 42,
            "total_users": 89,
            "revenue": 12500.75
        }
    }

    logger.info(f"Daily report generated: {report}")
    print(f"📊 Daily Report Generated at {report['date']}")

    return report
|
||||
|
||||
|
||||
@shared_task(
    bind=True,
    name="tasks.update_product_statistics",
    queue="stats",
    priority=5
)
def update_product_statistics(self, product_id: int = None) -> dict:
    """
    Simulates updating product statistics.
    Demonstrates task routing to a specific queue.

    Args:
        product_id: Optional specific product ID to update.
            If None, updates all products.

    Returns:
        Dictionary with update results
    """
    logger.info(f"Updating product statistics for product_id={product_id}")

    # Simulate database work
    time.sleep(1)

    if product_id is None:
        # Bulk path: refresh stats for the whole catalogue.
        stats = {
            "task": "update_all_product_stats",
            "status": "completed",
            "products_updated": 150,
            "timestamp": datetime.now().isoformat()
        }
    else:
        # Single-product path: refresh just the requested product.
        stats = {
            "task": "update_single_product_stats",
            "product_id": product_id,
            "status": "completed",
            "timestamp": datetime.now().isoformat(),
            "new_stats": {
                "views": 125,
                "purchases": 15,
                "rating": 4.5
            }
        }

    logger.info(f"Product statistics updated: {stats}")
    return stats
|
||||
|
||||
|
||||
@shared_task(
    bind=True,
    name="tasks.long_running_task",
    time_limit=300,  # hard kill after 5 minutes
    soft_time_limit=240  # SoftTimeLimitExceeded raised after 4 minutes
)
def long_running_task(self, iterations: int = 10) -> dict:
    """
    Simulates a long-running task with progress tracking.

    Progress is published via ``update_state(state="PROGRESS")`` so callers
    polling the task id can render a progress bar.

    Args:
        iterations: Number of iterations to simulate (1 second each)

    Returns:
        Dictionary with results
    """
    logger.info(f"Starting long-running task with {iterations} iterations")

    # FIX: plain @shared_task tasks have no is_aborted() method — it only
    # exists on celery.contrib.abortable.AbortableTask — so the original
    # self.is_aborted() call raised AttributeError on the first iteration.
    # Resolve it once via getattr: the abort check runs when available and
    # is skipped otherwise. To make this task truly abortable, pass
    # base=AbortableTask to the decorator.
    abort_check = getattr(self, "is_aborted", None)

    results = []
    for i in range(iterations):
        # Bail out early if the task has been aborted by a client.
        if abort_check is not None and abort_check():
            logger.warning("Task was aborted")
            return {"status": "aborted", "completed_iterations": i}

        # Simulate work
        time.sleep(1)

        # Publish progress so pollers can see how far along we are.
        progress = (i + 1) / iterations * 100
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": iterations, "progress": progress}
        )

        results.append(f"iteration_{i + 1}")
        logger.info(f"Completed iteration {i + 1}/{iterations}")

    final_result = {
        "status": "completed",
        "iterations": iterations,
        "results": results,
        "completed_at": datetime.now().isoformat()
    }

    logger.info(f"Long-running task completed: {final_result}")
    return final_result
|
||||
|
|
@ -10,6 +10,23 @@ class Config:
|
|||
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
|
||||
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
|
||||
CORS_ORIGINS = os.environ.get("CORS_ORIGINS", "*")
|
||||
|
||||
# Celery Configuration
|
||||
CELERY = {
|
||||
"broker_url": os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"),
|
||||
"result_backend": os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0"),
|
||||
"task_serializer": "json",
|
||||
"result_serializer": "json",
|
||||
"accept_content": ["json"],
|
||||
"timezone": "UTC",
|
||||
"enable_utc": True,
|
||||
"task_track_started": True,
|
||||
"task_time_limit": 30 * 60, # 30 minutes
|
||||
"task_soft_time_limit": 25 * 60, # 25 minutes
|
||||
"worker_prefetch_multiplier": 1,
|
||||
"worker_max_tasks_per_child": 100,
|
||||
"broker_connection_retry_on_startup": True,
|
||||
}
|
||||
|
||||
|
||||
class DevelopmentConfig(Config):
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ from flask import Blueprint, request, jsonify
|
|||
from flask_jwt_extended import jwt_required, get_jwt_identity, create_access_token, create_refresh_token
|
||||
from app import db
|
||||
from app.models import User, Product, OrderItem, Order
|
||||
from app.celery import celery
|
||||
|
||||
api_bp = Blueprint("api", __name__)
|
||||
|
||||
|
|
@ -231,4 +232,137 @@ def get_order(order_id):
|
|||
if not user or not user.is_admin:
|
||||
return jsonify({"error": "Access denied"}), 403
|
||||
|
||||
return jsonify(order.to_dict()), 200
|
||||
return jsonify(order.to_dict()), 200
|
||||
|
||||
|
||||
# Celery Task Routes
|
||||
@api_bp.route("/tasks/hello", methods=["POST"])
@jwt_required()
def trigger_hello_task():
    """Trigger the hello task"""
    payload = request.get_json() or {}
    name = payload.get("name", "World")

    # Dispatch by registered name; the worker resolves the implementation.
    task = celery.send_task("tasks.print_hello", args=[name])

    body = {
        "message": "Hello task triggered",
        "task_id": task.id,
        "status": "pending"
    }
    return jsonify(body), 202
|
||||
|
||||
|
||||
@api_bp.route("/tasks/divide", methods=["POST"])
@jwt_required()
def trigger_divide_task():
    """Trigger the divide numbers task"""
    payload = request.get_json() or {}
    x = payload.get("x", 10)
    y = payload.get("y", 2)

    task = celery.send_task("tasks.divide_numbers", args=[x, y])

    body = {
        "message": "Divide task triggered",
        "task_id": task.id,
        "operation": f"{x} / {y}",
        "status": "pending"
    }
    return jsonify(body), 202
|
||||
|
||||
|
||||
@api_bp.route("/tasks/report", methods=["POST"])
@jwt_required()
def trigger_report_task():
    """Trigger the daily report task"""
    # No request body needed; the task takes no arguments.
    task = celery.send_task("tasks.send_daily_report")

    body = {
        "message": "Daily report task triggered",
        "task_id": task.id,
        "status": "pending"
    }
    return jsonify(body), 202
|
||||
|
||||
|
||||
@api_bp.route("/tasks/stats", methods=["POST"])
@jwt_required()
def trigger_stats_task():
    """Trigger product statistics update task.

    The JSON body may contain ``{"product_id": <int>}``; omitting it (or
    sending null) updates all products.
    """
    data = request.get_json() or {}
    product_id = data.get("product_id")

    # Both branches of the original sent the same task with product_id as
    # the only argument, so dispatch once. Use an explicit None check:
    # with a truthiness test a falsy-but-valid id (e.g. 0) was silently
    # treated as "update all products".
    task = celery.send_task("tasks.update_product_statistics", args=[product_id])

    if product_id is not None:
        message = f"Product statistics update triggered for product {product_id}"
    else:
        message = "Product statistics update triggered for all products"

    return jsonify({
        "message": message,
        "task_id": task.id,
        "status": "pending"
    }), 202
|
||||
|
||||
|
||||
@api_bp.route("/tasks/long-running", methods=["POST"])
@jwt_required()
def trigger_long_running_task():
    """Trigger a long-running task"""
    payload = request.get_json() or {}
    iterations = payload.get("iterations", 10)

    task = celery.send_task("tasks.long_running_task", args=[iterations])

    body = {
        "message": f"Long-running task triggered with {iterations} iterations",
        "task_id": task.id,
        "status": "pending"
    }
    return jsonify(body), 202
|
||||
|
||||
|
||||
@api_bp.route("/tasks/<task_id>", methods=["GET"])
@jwt_required()
def get_task_status(task_id):
    """Get the status of a Celery task"""
    outcome = celery.AsyncResult(task_id)

    payload = {
        "task_id": task_id,
        "status": outcome.status,
        "ready": outcome.ready()
    }

    # Once the task has finished, attach the result — or, on failure,
    # the stringified exception and its traceback.
    if outcome.ready():
        if outcome.successful():
            payload["result"] = outcome.result
        else:
            payload["error"] = str(outcome.result)
            payload["traceback"] = outcome.traceback

    return jsonify(payload), 200
|
||||
|
||||
|
||||
@api_bp.route("/tasks/health", methods=["GET"])
def celery_health():
    """Check Celery health"""
    try:
        # Ask connected workers for their stats; an empty/None reply
        # means no worker is currently reachable via the broker.
        stats = celery.control.inspect().stats()
    except Exception as e:
        # Boundary handler for a health endpoint: report the failure
        # (e.g. broker unreachable) instead of crashing the request.
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500

    if stats:
        return jsonify({
            "status": "healthy",
            "workers": len(stats),
            "workers_info": stats
        }), 200

    return jsonify({
        "status": "unhealthy",
        "message": "No workers available"
    }), 503
|
||||
|
|
|
|||
23
backend/celery_worker.py
Normal file
23
backend/celery_worker.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
"""
Celery worker entry point.
This script is used to start Celery workers and provides the proper Flask context.
"""
import os

# Build the Flask app first: create_app() calls init_celery(), which
# populates the shared Celery instance. Kept off module exports to avoid
# clashing with Celery's own app discovery.
from app import create_app

_flask_app = create_app(os.getenv("FLASK_ENV", "development"))

# The global instance was configured above by create_app() -> init_celery().
from app.celery import celery

# Workers started with `celery -A celery_worker ...` pick up this instance;
# its ContextTask base gives tasks access to Flask extensions (db, etc.).

if __name__ == "__main__":
    # Running the module directly just prints a configuration summary.
    print("Celery worker entry point initialized")
    print(f"Flask app created: {_flask_app.name}")
    print(f"Celery broker: {celery.conf.broker_url}")
    print(f"Celery backend: {celery.conf.result_backend}")
|
||||
|
|
@ -9,6 +9,30 @@ class Config:
|
|||
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key-change-in-production'
|
||||
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
|
||||
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
|
||||
|
||||
# Celery Configuration
|
||||
CELERY = {
|
||||
"broker_url": os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
|
||||
"result_backend": os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
|
||||
"task_serializer": "json",
|
||||
"result_serializer": "json",
|
||||
"accept_content": ["json"],
|
||||
"timezone": "UTC",
|
||||
"enable_utc": True,
|
||||
"task_track_started": True,
|
||||
"task_time_limit": 30 * 60, # 30 minutes
|
||||
"task_soft_time_limit": 25 * 60, # 25 minutes
|
||||
"task_acks_late": True, # Acknowledge after task completion
|
||||
"task_reject_on_worker_lost": True, # Re-queue if worker dies
|
||||
"worker_prefetch_multiplier": 1, # Process one task at a time
|
||||
"worker_max_tasks_per_child": 100, # Restart worker after 100 tasks
|
||||
"broker_connection_retry_on_startup": True,
|
||||
"broker_connection_max_retries": 5,
|
||||
"result_expires": 3600, # Results expire in 1 hour
|
||||
"task_default_queue": "default",
|
||||
"task_default_exchange": "default",
|
||||
"task_default_routing_key": "default",
|
||||
}
|
||||
|
||||
|
||||
class DevelopmentConfig(Config):
|
||||
|
|
|
|||
|
|
@ -6,4 +6,5 @@ Flask-JWT-Extended==4.5.3
|
|||
psycopg2-binary==2.9.9
|
||||
python-dotenv==1.0.0
|
||||
Werkzeug==3.0.1
|
||||
SQLAlchemy==2.0.23
|
||||
SQLAlchemy==2.0.23
|
||||
celery[redis]==5.3.6
|
||||
|
|
|
|||
|
|
@ -7,4 +7,7 @@ black==23.12.1
|
|||
flake8==7.0.0
|
||||
isort==5.13.2
|
||||
mypy==1.7.1
|
||||
faker==20.1.0
|
||||
faker==20.1.0
|
||||
|
||||
# Celery monitoring
|
||||
flower==2.0.1
|
||||
|
|
|
|||
|
|
@ -55,6 +55,78 @@ services:
|
|||
- crafting-shop-network
|
||||
restart: unless-stopped
|
||||
|
||||
celery_worker:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
container_name: crafting-shop-celery-worker
|
||||
command: celery -A celery_worker worker --loglevel=info --concurrency=4
|
||||
environment:
|
||||
- FLASK_ENV=${FLASK_ENV:-prod}
|
||||
- SECRET_KEY=${SECRET_KEY}
|
||||
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
|
||||
- DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
|
||||
- CELERY_BROKER_URL=redis://redis:6379/0
|
||||
- CELERY_RESULT_BACKEND=redis://redis:6379/0
|
||||
depends_on:
|
||||
- redis
|
||||
- postgres
|
||||
- backend
|
||||
networks:
|
||||
- crafting-shop-network
|
||||
restart: unless-stopped
|
||||
healthcheck:
|
||||
test: ["CMD", "celery", "-A", "celery_worker", "inspect", "ping", "-d", "celery@$$HOSTNAME"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
start_period: 40s
|
||||
|
||||
celery_beat:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
container_name: crafting-shop-celery-beat
|
||||
command: celery -A celery_worker beat --loglevel=info
|
||||
environment:
|
||||
- FLASK_ENV=${FLASK_ENV:-prod}
|
||||
- SECRET_KEY=${SECRET_KEY}
|
||||
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
|
||||
- DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
|
||||
- CELERY_BROKER_URL=redis://redis:6379/0
|
||||
- CELERY_RESULT_BACKEND=redis://redis:6379/0
|
||||
depends_on:
|
||||
- redis
|
||||
- postgres
|
||||
- backend
|
||||
networks:
|
||||
- crafting-shop-network
|
||||
restart: unless-stopped
|
||||
volumes:
|
||||
- celery-beat-data:/app/celerybeat
|
||||
|
||||
flower:
|
||||
build:
|
||||
context: ./backend
|
||||
dockerfile: Dockerfile
|
||||
container_name: crafting-shop-flower
|
||||
command: celery -A celery_worker flower --port=5555
|
||||
ports:
|
||||
- "5555:5555"
|
||||
environment:
|
||||
- FLASK_ENV=${FLASK_ENV:-prod}
|
||||
- SECRET_KEY=${SECRET_KEY}
|
||||
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
|
||||
- DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
|
||||
- CELERY_BROKER_URL=redis://redis:6379/0
|
||||
- CELERY_RESULT_BACKEND=redis://redis:6379/0
|
||||
depends_on:
|
||||
- redis
|
||||
- celery_worker
|
||||
networks:
|
||||
- crafting-shop-network
|
||||
restart: unless-stopped
|
||||
|
||||
prometheus:
|
||||
image: prom/prometheus:latest
|
||||
container_name: crafting-shop-prometheus
|
||||
|
|
@ -90,6 +162,7 @@ volumes:
|
|||
prometheus-data:
|
||||
grafana-data:
|
||||
backend-data:
|
||||
celery-beat-data:
|
||||
|
||||
networks:
|
||||
crafting-shop-network:
|
||||
|
|
|
|||
Loading…
Reference in a new issue