From b4387dd808d9aad8cb2145919583488528865cc8 Mon Sep 17 00:00:00 2001 From: david Date: Sat, 21 Feb 2026 21:38:19 +0300 Subject: [PATCH] add celery, worker and scheduler --- .env.example | 6 +- .gitignore | 4 +- Makefile | 32 +++- backend/.env.example | 4 + backend/app/__init__.py | 26 ++- backend/app/celery/__init__.py | 73 ++++++++ backend/app/celery/beat_schedule.py | 99 +++++++++++ backend/app/celery/tasks/__init__.py | 15 ++ backend/app/celery/tasks/example_tasks.py | 198 ++++++++++++++++++++++ backend/app/config.py | 17 ++ backend/app/routes/api.py | 136 ++++++++++++++- backend/celery_worker.py | 23 +++ backend/config.py | 24 +++ backend/requirements/base.txt | 3 +- backend/requirements/dev.txt | 5 +- docker-compose.yml | 73 ++++++++ 16 files changed, 722 insertions(+), 16 deletions(-) create mode 100644 backend/app/celery/__init__.py create mode 100644 backend/app/celery/beat_schedule.py create mode 100644 backend/app/celery/tasks/__init__.py create mode 100644 backend/app/celery/tasks/example_tasks.py create mode 100644 backend/celery_worker.py diff --git a/.env.example b/.env.example index 31e1bef..57b22e0 100644 --- a/.env.example +++ b/.env.example @@ -13,9 +13,13 @@ POSTGRES_DB=crafting_shop GRAFANA_USER=admin GRAFANA_PASSWORD=change-this-password-in-production +# Celery Configuration +CELERY_BROKER_URL=redis://redis:6379/0 +CELERY_RESULT_BACKEND=redis://redis:6379/0 + # Optional: External Services # REDIS_URL=redis://localhost:6379/0 # SMTP_HOST=smtp.gmail.com # SMTP_PORT=587 # SMTP_USER=your-email@gmail.com -# SMTP_PASSWORD=your-smtp-password \ No newline at end of file +# SMTP_PASSWORD=your-smtp-password diff --git a/.gitignore b/.gitignore index 3323fe9..3dbafaf 100644 --- a/.gitignore +++ b/.gitignore @@ -79,4 +79,6 @@ htmlcov/ # Temporary files *.tmp *.temp -.cache/ \ No newline at end of file +.cache/ + +celerybeat-schedule \ No newline at end of file diff --git a/Makefile b/Makefile index 59befb4..3c5aed0 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help install dev-services dev-stop-services dev-backend dev-frontend dev build test lint clean up down restart logs +.PHONY: help install dev-services dev-stop-services dev-backend dev-frontend dev build test lint clean up down restart logs celery-worker celery-beat celery-flower celery-shell help: ## Show this help message @echo "Available commands:" @@ -21,7 +21,7 @@ dev-stop-services: ## Stop development services dev-backend: ## Start backend server locally @echo "Starting backend server..." - cd backend && source venv/bin/activate && flask run + cd backend && . venv/bin/activate && flask run dev-frontend: ## Start frontend server locally @echo "Starting frontend server..." @@ -106,4 +106,30 @@ backup: ## Backup database docker exec crafting-shop-postgres pg_dump -U crafting crafting_shop > backup.sql restore: ## Restore database (usage: make restore FILE=backup.sql) - docker exec -i crafting-shop-postgres psql -U crafting crafting_shop < $(FILE) \ No newline at end of file + docker exec -i crafting-shop-postgres psql -U crafting crafting_shop < $(FILE) + +# Celery Commands +celery-worker: ## Start Celery worker locally + @echo "Starting Celery worker..." + cd backend && . venv/bin/activate && celery -A celery_worker worker --loglevel=info --concurrency=4 + +celery-beat: ## Start Celery Beat scheduler locally + @echo "Starting Celery Beat scheduler..." + cd backend && . venv/bin/activate && celery -A celery_worker beat --loglevel=info + +celery-flower: ## Start Flower monitoring locally + @echo "Starting Flower monitoring..." + cd backend && . venv/bin/activate && celery -A celery_worker flower --port=5555 + +celery-shell: ## Open Celery shell for task inspection + @echo "Opening Celery shell..." + cd backend && . venv/bin/activate && celery -A celery_worker shell + +logs-celery: ## Show Celery worker logs + docker compose logs -f celery_worker + +logs-celery-beat: ## Show Celery Beat logs + docker compose logs -f celery_beat + +logs-flower: ## Show Flower logs + docker compose logs -f flower diff --git a/backend/.env.example b/backend/.env.example index e563d6e..916ff93 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -5,3 +5,7 @@ CORS_ORIGINS=* DEV_DATABASE_URL=postgresql://crafting:devpassword@localhost:5432/crafting_shop DATABASE_URL=postgresql://user:password@localhost/proddb TEST_DATABASE_URL=sqlite:///test.db + +# Celery Configuration +CELERY_BROKER_URL=redis://localhost:6379/0 +CELERY_RESULT_BACKEND=redis://localhost:6379/0 diff --git a/backend/app/__init__.py b/backend/app/__init__.py index c281e5b..abf70d4 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -1,16 +1,17 @@ +import json from flask import Flask, jsonify from flask_cors import CORS from flask_jwt_extended import JWTManager from flask_sqlalchemy import SQLAlchemy from flask_migrate import Migrate import os - +from dotenv import load_dotenv # Create extensions but don't initialize them yet db = SQLAlchemy() migrate = Migrate() jwt = JWTManager() cors = CORS() - +load_dotenv(override=True) def create_app(config_name=None): """Application factory pattern""" @@ -18,32 +19,41 @@ def create_app(config_name=None): # Load configuration if config_name is None: - config_name = os.environ.get('FLASK_ENV', 'development') + config_name = os.environ.get("FLASK_ENV", "development") from app.config import config_by_name app.config.from_object(config_by_name[config_name]) + print('----------------------------------------------------------') + print(F'------------------ENVIRONMENT: {config_name}-------------------------------------') + # print(F'------------------CONFIG: {app.config}-------------------------------------') + # print(json.dumps(dict(app.config), indent=2, default=str)) + print('----------------------------------------------------------') # Initialize extensions with app db.init_app(app) migrate.init_app(app, db) jwt.init_app(app) - cors.init_app(app, resources={r"/api/*": {"origins": app.config.get('CORS_ORIGINS', '*')}}) + cors.init_app(app, resources={r"/api/*": {"origins": app.config.get("CORS_ORIGINS", "*")}}) + + # Initialize Celery + from app.celery import init_celery + init_celery(app) # Import models (required for migrations) from app.models import user, product, order # Register blueprints from app.routes import api_bp, health_bp - app.register_blueprint(api_bp, url_prefix='/api') + app.register_blueprint(api_bp, url_prefix="/api") app.register_blueprint(health_bp) # Global error handlers @app.errorhandler(404) def not_found(error): - return jsonify({'error': 'Not found'}), 404 + return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def internal_error(error): - return jsonify({'error': 'Internal server error'}), 500 + return jsonify({"error": "Internal server error"}), 500 - return app \ No newline at end of file + return app diff --git a/backend/app/celery/__init__.py b/backend/app/celery/__init__.py new file mode 100644 index 0000000..7e4589a --- /dev/null +++ b/backend/app/celery/__init__.py @@ -0,0 +1,73 @@ +""" +Celery application factory for the Crafting Shop application. +Follows the same application factory pattern as Flask. +""" +from celery import Celery +from flask import Flask + + +def make_celery(app: Flask) -> Celery: + """ + Create and configure a Celery application with Flask context. + + Args: + app: Flask application instance + + Returns: + Configured Celery application instance. + """ + # Create Celery app with Flask app name + celery_app = Celery( + app.import_name, + broker=app.config["CELERY"]["broker_url"], + backend=app.config["CELERY"]["result_backend"] + ) + + # Update configuration from Flask config + celery_app.conf.update(app.config["CELERY"]) + + # Set up Flask application context for tasks + # This ensures tasks have access to Flask extensions (db, etc.) + class ContextTask(celery_app.Task): + """Celery task that runs within Flask application context.""" + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app.Task = ContextTask + + # Auto-discover tasks in the tasks module + celery_app.autodiscover_tasks(['app.celery.tasks']) + + # Configure Beat schedule + from .beat_schedule import configure_beat_schedule + configure_beat_schedule(celery_app) + + # Import tasks to ensure they're registered + from .tasks import example_tasks + + print(f"✅ Celery configured with broker: {celery_app.conf.broker_url}") + print(f"✅ Celery configured with backend: {celery_app.conf.result_backend}") + print(f"✅ Beat schedule configured with {len(celery_app.conf.beat_schedule)} tasks") + + return celery_app + + +# Global Celery instance +celery = None + + +def init_celery(app: Flask) -> Celery: + """ + Initialize the global celery instance with Flask app. + This should be called in create_app() after Flask app is created. + + Args: + app: Flask application instance + + Returns: + Configured Celery application instance + """ + global celery + celery = make_celery(app) + return celery diff --git a/backend/app/celery/beat_schedule.py b/backend/app/celery/beat_schedule.py new file mode 100644 index 0000000..620bbc4 --- /dev/null +++ b/backend/app/celery/beat_schedule.py @@ -0,0 +1,99 @@ +""" +Celery Beat schedule configuration for periodic tasks. +This defines when scheduled tasks should run. +""" +from celery.schedules import crontab + + +# Celery Beat schedule configuration +beat_schedule = { + # Run every minute (for testing/demo) + "print-hello-every-minute": { + "task": "tasks.print_hello", + "schedule": crontab(minute="*"), # Every minute + "args": ("Celery Beat",), + "options": {"queue": "default"}, + }, + + # Run daily at 9:00 AM + "send-daily-report": { + "task": "tasks.send_daily_report", + "schedule": crontab(hour=9, minute=0), # 9:00 AM daily + "options": {"queue": "reports"}, + }, + + # Run every hour at minute 0 + "update-product-stats-hourly": { + "task": "tasks.update_product_statistics", + "schedule": crontab(minute=0), # Every hour at minute 0 + "args": (None,), # Update all products + "options": {"queue": "stats"}, + }, + + # Run every Monday at 8:00 AM + "weekly-maintenance": { + "task": "tasks.long_running_task", + "schedule": crontab(hour=8, minute=0, day_of_week=1), # Monday 8:00 AM + "args": (5,), # 5 iterations + "options": {"queue": "maintenance"}, + }, + + # Run every 5 minutes (for monitoring/heartbeat) + "heartbeat-check": { + "task": "tasks.print_hello", + "schedule": 300.0, # Every 300 seconds (5 minutes) + "args": ("Heartbeat",), + "options": {"queue": "monitoring"}, + }, +} + + +def configure_beat_schedule(celery_app): + """ + Configure Celery Beat schedule on the Celery app. + + Args: + celery_app: Celery application instance + """ + celery_app.conf.beat_schedule = beat_schedule + + # Configure timezone + celery_app.conf.timezone = "UTC" + celery_app.conf.enable_utc = True + + # Configure task routes for scheduled tasks + celery_app.conf.task_routes = { + "tasks.print_hello": {"queue": "default"}, + "tasks.send_daily_report": {"queue": "reports"}, + "tasks.update_product_statistics": {"queue": "stats"}, + "tasks.long_running_task": {"queue": "maintenance"}, + } + + # Configure queues + celery_app.conf.task_queues = { + "default": { + "exchange": "default", + "exchange_type": "direct", + "routing_key": "default", + }, + "reports": { + "exchange": "reports", + "exchange_type": "direct", + "routing_key": "reports", + }, + "stats": { + "exchange": "stats", + "exchange_type": "direct", + "routing_key": "stats", + }, + "maintenance": { + "exchange": "maintenance", + "exchange_type": "direct", + "routing_key": "maintenance", + }, + "monitoring": { + "exchange": "monitoring", + "exchange_type": "direct", + "routing_key": "monitoring", + }, + } \ No newline at end of file diff --git a/backend/app/celery/tasks/__init__.py b/backend/app/celery/tasks/__init__.py new file mode 100644 index 0000000..febf25b --- /dev/null +++ b/backend/app/celery/tasks/__init__.py @@ -0,0 +1,15 @@ +""" +Celery tasks for the Crafting Shop application. +Tasks are organized by domain/functionality. +""" + +# Import all task modules here to ensure they're registered with Celery +from . import example_tasks + +# Re-export tasks for easier imports +from .example_tasks import ( + print_hello, + divide_numbers, + send_daily_report, + update_product_statistics, +) \ No newline at end of file diff --git a/backend/app/celery/tasks/example_tasks.py b/backend/app/celery/tasks/example_tasks.py new file mode 100644 index 0000000..30bb841 --- /dev/null +++ b/backend/app/celery/tasks/example_tasks.py @@ -0,0 +1,198 @@ +""" +Example Celery tasks for the Crafting Shop application. +These tasks demonstrate various Celery features and best practices. +""" +import time +import logging +from datetime import datetime +from celery import shared_task +from celery.exceptions import MaxRetriesExceededError + +# Get logger for this module +logger = logging.getLogger(__name__) + + +@shared_task(bind=True, name="tasks.print_hello") +def print_hello(self, name: str = "World") -> str: + """ + Simple task that prints a greeting. + + Args: + name: Name to greet (default: "World") + + Returns: + Greeting message + """ + message = f"Hello {name} from Celery! Task ID: {self.request.id}" + print(message) + logger.info(message) + return message + + +@shared_task( + bind=True, + name="tasks.divide_numbers", + autoretry_for=(ZeroDivisionError,), + retry_backoff=True, + retry_backoff_max=60, + retry_jitter=True, + max_retries=3 +) +def divide_numbers(self, x: float, y: float) -> float: + """ + Task that demonstrates error handling and retry logic. + + Args: + x: Numerator + y: Denominator + + Returns: + Result of division + + Raises: + ZeroDivisionError: If y is zero (will trigger retry) + """ + logger.info(f"Dividing {x} by {y} (attempt {self.request.retries + 1})") + + if y == 0: + logger.warning(f"Division by zero detected, retrying...") + raise ZeroDivisionError("Cannot divide by zero") + + result = x / y + logger.info(f"Division result: {result}") + return result + + +@shared_task( + bind=True, + name="tasks.send_daily_report", + ignore_result=False +) +def send_daily_report(self) -> dict: + """ + Simulates sending a daily report. + This task would typically send emails, generate reports, etc. + + Returns: + Dictionary with report details + """ + logger.info("Starting daily report generation...") + + # Simulate some work + time.sleep(2) + + report_data = { + "date": datetime.now().isoformat(), + "task_id": self.request.id, + "report_type": "daily", + "status": "generated", + "metrics": { + "total_products": 150, + "total_orders": 42, + "total_users": 89, + "revenue": 12500.75 + } + } + + logger.info(f"Daily report generated: {report_data}") + print(f"📊 Daily Report Generated at {report_data['date']}") + + return report_data + + +@shared_task( + bind=True, + name="tasks.update_product_statistics", + queue="stats", + priority=5 +) +def update_product_statistics(self, product_id: int = None) -> dict: + """ + Simulates updating product statistics. + Demonstrates task routing to a specific queue. + + Args: + product_id: Optional specific product ID to update. + If None, updates all products. + + Returns: + Dictionary with update results + """ + logger.info(f"Updating product statistics for product_id={product_id}") + + # Simulate database work + time.sleep(1) + + if product_id is None: + # Update all products + result = { + "task": "update_all_product_stats", + "status": "completed", + "products_updated": 150, + "timestamp": datetime.now().isoformat() + } + else: + # Update specific product + result = { + "task": "update_single_product_stats", + "product_id": product_id, + "status": "completed", + "timestamp": datetime.now().isoformat(), + "new_stats": { + "views": 125, + "purchases": 15, + "rating": 4.5 + } + } + + logger.info(f"Product statistics updated: {result}") + return result + + +@shared_task( + bind=True, + name="tasks.long_running_task", + time_limit=300, # 5 minutes + soft_time_limit=240 # 4 minutes +) +def long_running_task(self, iterations: int = 10) -> dict: + """ + Simulates a long-running task with progress tracking. + + Args: + iterations: Number of iterations to simulate + + Returns: + Dictionary with results + """ + logger.info(f"Starting long-running task with {iterations} iterations") + + results = [] + for i in range(iterations): + # Check if task has been revoked + if self.is_aborted(): + logger.warning("Task was aborted") + return {"status": "aborted", "completed_iterations": i} + + # Simulate work + time.sleep(1) + + # Update progress + progress = (i + 1) / iterations * 100 + self.update_state( + state="PROGRESS", + meta={"current": i + 1, "total": iterations, "progress": progress} + ) + + results.append(f"iteration_{i + 1}") + logger.info(f"Completed iteration {i + 1}/{iterations}") + + final_result = { + "status": "completed", + "iterations": iterations, + "results": results, + "completed_at": datetime.now().isoformat() + } + + logger.info(f"Long-running task completed: {final_result}") + return final_result \ No newline at end of file diff --git a/backend/app/config.py b/backend/app/config.py index f24c1b6..fc3de1b 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -10,6 +10,23 @@ class Config: JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1) JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30) CORS_ORIGINS = os.environ.get("CORS_ORIGINS", "*") + + # Celery Configuration + CELERY = { + "broker_url": os.environ.get("CELERY_BROKER_URL", "redis://redis:6379/0"), + "result_backend": os.environ.get("CELERY_RESULT_BACKEND", "redis://redis:6379/0"), + "task_serializer": "json", + "result_serializer": "json", + "accept_content": ["json"], + "timezone": "UTC", + "enable_utc": True, + "task_track_started": True, + "task_time_limit": 30 * 60, # 30 minutes + "task_soft_time_limit": 25 * 60, # 25 minutes + "worker_prefetch_multiplier": 1, + "worker_max_tasks_per_child": 100, + "broker_connection_retry_on_startup": True, + } class DevelopmentConfig(Config): diff --git a/backend/app/routes/api.py b/backend/app/routes/api.py index d6b06de..bfa1cda 100644 --- a/backend/app/routes/api.py +++ b/backend/app/routes/api.py @@ -2,6 +2,7 @@ from flask import Blueprint, request, jsonify from flask_jwt_extended import jwt_required, get_jwt_identity, create_access_token, create_refresh_token from app import db from app.models import User, Product, OrderItem, Order +from app.celery import celery api_bp = Blueprint("api", __name__) @@ -231,4 +232,137 @@ def get_order(order_id): if not user or not user.is_admin: return jsonify({"error": "Access denied"}), 403 - return jsonify(order.to_dict()), 200 \ No newline at end of file + return jsonify(order.to_dict()), 200 + + +# Celery Task Routes +@api_bp.route("/tasks/hello", methods=["POST"]) +@jwt_required() +def trigger_hello_task(): + """Trigger the hello task""" + data = request.get_json() or {} + name = data.get("name", "World") + + task = celery.send_task("tasks.print_hello", args=[name]) + + return jsonify({ + "message": "Hello task triggered", + "task_id": task.id, + "status": "pending" + }), 202 + + +@api_bp.route("/tasks/divide", methods=["POST"]) +@jwt_required() +def trigger_divide_task(): + """Trigger the divide numbers task""" + data = request.get_json() or {} + x = data.get("x", 10) + y = data.get("y", 2) + + task = celery.send_task("tasks.divide_numbers", args=[x, y]) + + return jsonify({ + "message": "Divide task triggered", + "task_id": task.id, + "operation": f"{x} / {y}", + "status": "pending" + }), 202 + + +@api_bp.route("/tasks/report", methods=["POST"]) +@jwt_required() +def trigger_report_task(): + """Trigger the daily report task""" + task = celery.send_task("tasks.send_daily_report") + + return jsonify({ + "message": "Daily report task triggered", + "task_id": task.id, + "status": "pending" + }), 202 + + +@api_bp.route("/tasks/stats", methods=["POST"]) +@jwt_required() +def trigger_stats_task(): + """Trigger product statistics update task""" + data = request.get_json() or {} + product_id = data.get("product_id") + + if product_id: + task = celery.send_task("tasks.update_product_statistics", args=[product_id]) + message = f"Product statistics update triggered for product {product_id}" + else: + task = celery.send_task("tasks.update_product_statistics", args=[None]) + message = "Product statistics update triggered for all products" + + return jsonify({ + "message": message, + "task_id": task.id, + "status": "pending" + }), 202 + + +@api_bp.route("/tasks/long-running", methods=["POST"]) +@jwt_required() +def trigger_long_running_task(): + """Trigger a long-running task""" + data = request.get_json() or {} + iterations = data.get("iterations", 10) + + task = celery.send_task("tasks.long_running_task", args=[iterations]) + + return jsonify({ + "message": f"Long-running task triggered with {iterations} iterations", + "task_id": task.id, + "status": "pending" + }), 202 + + +@api_bp.route("/tasks/", methods=["GET"]) +@jwt_required() +def get_task_status(task_id): + """Get the status of a Celery task""" + task_result = celery.AsyncResult(task_id) + + response = { + "task_id": task_id, + "status": task_result.status, + "ready": task_result.ready() + } + + if task_result.ready(): + if task_result.successful(): + response["result"] = task_result.result + else: + response["error"] = str(task_result.result) + response["traceback"] = task_result.traceback + + return jsonify(response), 200 + + +@api_bp.route("/tasks/health", methods=["GET"]) +def celery_health(): + """Check Celery health""" + try: + # Try to ping the worker + inspector = celery.control.inspect() + stats = inspector.stats() + + if stats: + return jsonify({ + "status": "healthy", + "workers": len(stats), + "workers_info": stats + }), 200 + else: + return jsonify({ + "status": "unhealthy", + "message": "No workers available" + }), 503 + except Exception as e: + return jsonify({ + "status": "error", + "message": str(e) + }), 500 diff --git a/backend/celery_worker.py b/backend/celery_worker.py new file mode 100644 index 0000000..4cb9ca7 --- /dev/null +++ b/backend/celery_worker.py @@ -0,0 +1,23 @@ +""" +Celery worker entry point. +This script is used to start Celery workers and provides the proper Flask context. +""" +import os + +# Create Flask app (don't export it at module level to avoid conflicts with Celery) +from app import create_app + +_flask_app = create_app(os.getenv("FLASK_ENV", "development")) + +# Import and initialize Celery with Flask app context +from app.celery import celery + +# Celery is now configured and ready with Flask app context +# Workers will use this instance and have access to Flask extensions (db, etc.) + +if __name__ == "__main__": + # This allows running the worker directly if needed + print("Celery worker entry point initialized") + print(f"Flask app created: {_flask_app.name}") + print(f"Celery broker: {celery.conf.broker_url}") + print(f"Celery backend: {celery.conf.result_backend}") diff --git a/backend/config.py b/backend/config.py index b77ad3d..44d372f 100644 --- a/backend/config.py +++ b/backend/config.py @@ -9,6 +9,30 @@ class Config: JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key-change-in-production' JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1) JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30) + + # Celery Configuration + CELERY = { + "broker_url": os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"), + "result_backend": os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"), + "task_serializer": "json", + "result_serializer": "json", + "accept_content": ["json"], + "timezone": "UTC", + "enable_utc": True, + "task_track_started": True, + "task_time_limit": 30 * 60, # 30 minutes + "task_soft_time_limit": 25 * 60, # 25 minutes + "task_acks_late": True, # Acknowledge after task completion + "task_reject_on_worker_lost": True, # Re-queue if worker dies + "worker_prefetch_multiplier": 1, # Process one task at a time + "worker_max_tasks_per_child": 100, # Restart worker after 100 tasks + "broker_connection_retry_on_startup": True, + "broker_connection_max_retries": 5, + "result_expires": 3600, # Results expire in 1 hour + "task_default_queue": "default", + "task_default_exchange": "default", + "task_default_routing_key": "default", + } class DevelopmentConfig(Config): diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt index 795eb62..fa59b36 100644 --- a/backend/requirements/base.txt +++ b/backend/requirements/base.txt @@ -6,4 +6,5 @@ Flask-JWT-Extended==4.5.3 psycopg2-binary==2.9.9 python-dotenv==1.0.0 Werkzeug==3.0.1 -SQLAlchemy==2.0.23 \ No newline at end of file +SQLAlchemy==2.0.23 +celery[redis]==5.3.6 diff --git a/backend/requirements/dev.txt b/backend/requirements/dev.txt index c8f6d4f..af83bc9 100644 --- a/backend/requirements/dev.txt +++ b/backend/requirements/dev.txt @@ -7,4 +7,7 @@ black==23.12.1 flake8==7.0.0 isort==5.13.2 mypy==1.7.1 -faker==20.1.0 \ No newline at end of file +faker==20.1.0 + +# Celery monitoring +flower==2.0.1 diff --git a/docker-compose.yml b/docker-compose.yml index 0115892..018d0cf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -55,6 +55,78 @@ services: - crafting-shop-network restart: unless-stopped + celery_worker: + build: + context: ./backend + dockerfile: Dockerfile + container_name: crafting-shop-celery-worker + command: celery -A celery_worker worker --loglevel=info --concurrency=4 + environment: + - FLASK_ENV=${FLASK_ENV:-prod} + - SECRET_KEY=${SECRET_KEY} + - JWT_SECRET_KEY=${JWT_SECRET_KEY} + - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB} + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - redis + - postgres + - backend + networks: + - crafting-shop-network + restart: unless-stopped + healthcheck: + test: ["CMD", "celery", "-A", "celery_worker", "inspect", "ping", "-d", "celery@$$HOSTNAME"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + celery_beat: + build: + context: ./backend + dockerfile: Dockerfile + container_name: crafting-shop-celery-beat + command: celery -A celery_worker beat --loglevel=info + environment: + - FLASK_ENV=${FLASK_ENV:-prod} + - SECRET_KEY=${SECRET_KEY} + - JWT_SECRET_KEY=${JWT_SECRET_KEY} + - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB} + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - redis + - postgres + - backend + networks: + - crafting-shop-network + restart: unless-stopped + volumes: + - celery-beat-data:/app/celerybeat + + flower: + build: + context: ./backend + dockerfile: Dockerfile + container_name: crafting-shop-flower + command: celery -A celery_worker flower --port=5555 + ports: + - "5555:5555" + environment: + - FLASK_ENV=${FLASK_ENV:-prod} + - SECRET_KEY=${SECRET_KEY} + - JWT_SECRET_KEY=${JWT_SECRET_KEY} + - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB} + - CELERY_BROKER_URL=redis://redis:6379/0 + - CELERY_RESULT_BACKEND=redis://redis:6379/0 + depends_on: + - redis + - celery_worker + networks: + - crafting-shop-network + restart: unless-stopped + prometheus: image: prom/prometheus:latest container_name: crafting-shop-prometheus @@ -90,6 +162,7 @@ volumes: prometheus-data: grafana-data: backend-data: + celery-beat-data: networks: crafting-shop-network: