kanban-app/backend/app/routes/api.py

376 lines
11 KiB
Python
Raw Normal View History

2026-02-24 11:03:23 +00:00
import time
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity, create_access_token, create_refresh_token
from app import db
from app.models import User, Product, OrderItem, Order
2026-02-21 18:38:19 +00:00
from app.celery import celery
api_bp = Blueprint("api", __name__)
# User Routes
@api_bp.route("/auth/register", methods=["POST"])
def register():
"""Register a new user"""
data = request.get_json()
if not data or not data.get("email") or not data.get("password"):
return jsonify({"error": "Email and password are required"}), 400
if User.query.filter_by(email=data["email"]).first():
return jsonify({"error": "Email already exists"}), 400
user = User(
email=data["email"],
username=data.get("username", data["email"].split("@")[0]),
first_name=data.get("first_name"),
last_name=data.get("last_name")
)
user.set_password(data["password"])
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict()), 201
@api_bp.route("/auth/login", methods=["POST"])
def login():
"""Login user"""
data = request.get_json()
if not data or not data.get("email") or not data.get("password"):
return jsonify({"error": "Email and password are required"}), 400
user = User.query.filter_by(email=data["email"]).first()
if not user or not user.check_password(data["password"]):
return jsonify({"error": "Invalid credentials"}), 401
if not user.is_active:
return jsonify({"error": "Account is inactive"}), 401
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
return jsonify({
"user": user.to_dict(),
"access_token": access_token,
"refresh_token": refresh_token
}), 200
@api_bp.route("/users/me", methods=["GET"])
@jwt_required()
def get_current_user():
"""Get current user"""
user_id = get_jwt_identity()
user = User.query.get(user_id)
if not user:
return jsonify({"error": "User not found"}), 404
return jsonify(user.to_dict()), 200
# Product Routes
@api_bp.route("/products", methods=["GET"])
def get_products():
"""Get all products"""
2026-02-24 11:03:23 +00:00
# time.sleep(5) # This adds a 5 second delay
products = Product.query.filter_by(is_active=True).all()
2026-02-24 11:03:23 +00:00
return jsonify([product.to_dict() for product in products]), 200
@api_bp.route("/products/<int:product_id>", methods=["GET"])
def get_product(product_id):
"""Get a single product"""
product = Product.query.get_or_404(product_id)
return jsonify(product.to_dict()), 200
@api_bp.route("/products", methods=["POST"])
@jwt_required()
def create_product():
"""Create a new product (admin only)"""
user_id = get_jwt_identity()
user = User.query.get(user_id)
if not user or not user.is_admin:
return jsonify({"error": "Admin access required"}), 403
data = request.get_json()
product = Product(
name=data["name"],
description=data.get("description"),
price=data["price"],
stock=data.get("stock", 0),
image_url=data.get("image_url"),
category=data.get("category")
)
db.session.add(product)
db.session.commit()
return jsonify(product.to_dict()), 201
@api_bp.route("/products/<int:product_id>", methods=["PUT"])
@jwt_required()
def update_product(product_id):
"""Update a product (admin only)"""
user_id = get_jwt_identity()
user = User.query.get(user_id)
if not user or not user.is_admin:
return jsonify({"error": "Admin access required"}), 403
product = Product.query.get_or_404(product_id)
data = request.get_json()
product.name = data.get("name", product.name)
product.description = data.get("description", product.description)
product.price = data.get("price", product.price)
product.stock = data.get("stock", product.stock)
product.image_url = data.get("image_url", product.image_url)
product.category = data.get("category", product.category)
product.is_active = data.get("is_active", product.is_active)
db.session.commit()
return jsonify(product.to_dict()), 200
@api_bp.route("/products/<int:product_id>", methods=["DELETE"])
@jwt_required()
def delete_product(product_id):
"""Delete a product (admin only)"""
user_id = get_jwt_identity()
user = User.query.get(user_id)
if not user or not user.is_admin:
return jsonify({"error": "Admin access required"}), 403
product = Product.query.get_or_404(product_id)
db.session.delete(product)
db.session.commit()
return jsonify({"message": "Product deleted"}), 200
# Order Routes
@api_bp.route("/orders", methods=["GET"])
@jwt_required()
def get_orders():
"""Get all orders for current user"""
user_id = get_jwt_identity()
orders = Order.query.filter_by(user_id=user_id).all()
return jsonify([order.to_dict() for order in orders]), 200
@api_bp.route("/orders", methods=["POST"])
@jwt_required()
def create_order():
"""Create a new order"""
user_id = get_jwt_identity()
data = request.get_json()
if not data or not data.get("items"):
return jsonify({"error": "Order items are required"}), 400
total_amount = 0
order_items = []
for item_data in data["items"]:
product = Product.query.get(item_data["product_id"])
if not product:
return jsonify({"error": f'Product {item_data["product_id"]} not found'}), 404
if product.stock < item_data["quantity"]:
return jsonify({"error": f'Insufficient stock for {product.name}'}), 400
item_total = product.price * item_data["quantity"]
total_amount += item_total
order_items.append({
"product": product,
"quantity": item_data["quantity"],
"price": product.price
})
order = Order(
user_id=user_id,
total_amount=total_amount,
shipping_address=data.get("shipping_address"),
notes=data.get("notes")
)
db.session.add(order)
db.session.flush()
for item_data in order_items:
order_item = OrderItem(
order_id=order.id,
product_id=item_data["product"].id,
quantity=item_data["quantity"],
price=item_data["price"]
)
item_data["product"].stock -= item_data["quantity"]
db.session.add(order_item)
db.session.commit()
return jsonify(order.to_dict()), 201
@api_bp.route("/orders/<int:order_id>", methods=["GET"])
@jwt_required()
def get_order(order_id):
"""Get a single order"""
user_id = get_jwt_identity()
order = Order.query.get_or_404(order_id)
if order.user_id != user_id:
user = User.query.get(user_id)
if not user or not user.is_admin:
return jsonify({"error": "Access denied"}), 403
2026-02-21 18:38:19 +00:00
return jsonify(order.to_dict()), 200
# Celery Task Routes
@api_bp.route("/tasks/hello", methods=["POST"])
@jwt_required()
def trigger_hello_task():
"""Trigger the hello task"""
data = request.get_json() or {}
name = data.get("name", "World")
task = celery.send_task("tasks.print_hello", args=[name])
return jsonify({
"message": "Hello task triggered",
"task_id": task.id,
"status": "pending"
}), 202
@api_bp.route("/tasks/divide", methods=["POST"])
@jwt_required()
def trigger_divide_task():
"""Trigger the divide numbers task"""
data = request.get_json() or {}
x = data.get("x", 10)
y = data.get("y", 2)
task = celery.send_task("tasks.divide_numbers", args=[x, y])
return jsonify({
"message": "Divide task triggered",
"task_id": task.id,
"operation": f"{x} / {y}",
"status": "pending"
}), 202
@api_bp.route("/tasks/report", methods=["POST"])
@jwt_required()
def trigger_report_task():
"""Trigger the daily report task"""
task = celery.send_task("tasks.send_daily_report")
return jsonify({
"message": "Daily report task triggered",
"task_id": task.id,
"status": "pending"
}), 202
@api_bp.route("/tasks/stats", methods=["POST"])
@jwt_required()
def trigger_stats_task():
"""Trigger product statistics update task"""
data = request.get_json() or {}
product_id = data.get("product_id")
if product_id:
task = celery.send_task("tasks.update_product_statistics", args=[product_id])
message = f"Product statistics update triggered for product {product_id}"
else:
task = celery.send_task("tasks.update_product_statistics", args=[None])
message = "Product statistics update triggered for all products"
return jsonify({
"message": message,
"task_id": task.id,
"status": "pending"
}), 202
@api_bp.route("/tasks/long-running", methods=["POST"])
@jwt_required()
def trigger_long_running_task():
"""Trigger a long-running task"""
data = request.get_json() or {}
iterations = data.get("iterations", 10)
task = celery.send_task("tasks.long_running_task", args=[iterations])
return jsonify({
"message": f"Long-running task triggered with {iterations} iterations",
"task_id": task.id,
"status": "pending"
}), 202
@api_bp.route("/tasks/<task_id>", methods=["GET"])
@jwt_required()
def get_task_status(task_id):
"""Get the status of a Celery task"""
task_result = celery.AsyncResult(task_id)
response = {
"task_id": task_id,
"status": task_result.status,
"ready": task_result.ready()
}
if task_result.ready():
if task_result.successful():
response["result"] = task_result.result
else:
response["error"] = str(task_result.result)
response["traceback"] = task_result.traceback
return jsonify(response), 200
@api_bp.route("/tasks/health", methods=["GET"])
def celery_health():
"""Check Celery health"""
try:
# Try to ping the worker
inspector = celery.control.inspect()
stats = inspector.stats()
if stats:
return jsonify({
"status": "healthy",
"workers": len(stats),
"workers_info": stats
}), 200
else:
return jsonify({
"status": "unhealthy",
"message": "No workers available"
}), 503
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
}), 500