323 lines
8.7 KiB
Python
323 lines
8.7 KiB
Python
from functools import wraps
|
|
|
|
from flask import abort, g
|
|
from flask_jwt_extended import get_jwt_identity
|
|
|
|
from app.models import (Board, Card, CheckItem, Checklist, Comment,
|
|
FileAttachment, List)
|
|
|
|
|
|
def get_current_user_id():
|
|
"""Helper to consistently get user ID"""
|
|
# Cache in g if you want to avoid decoding JWT multiple times per request
|
|
if not hasattr(g, "jwt_identity"):
|
|
g.jwt_identity = int(get_jwt_identity())
|
|
return g.jwt_identity
|
|
|
|
|
|
def load_board_owned(f):
|
|
"""
|
|
Loads a Board and ensures it belongs to the current user.
|
|
Injects 'board' into the route kwargs.
|
|
Aborts with 404 if not found or not owned.
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
board_id = kwargs.get("board_id")
|
|
|
|
# SECURE QUERY: Filter by ID *and* User ID in the DB
|
|
board = Board.query.filter_by(id=board_id, user_id=user_id).first()
|
|
|
|
if not board:
|
|
abort(404)
|
|
|
|
kwargs["board"] = board
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_card_owned(f):
|
|
"""
|
|
Loads a Card and ensures its Parent Board belongs to the current user.
|
|
Injects 'card' into the route kwargs.
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
card_id = kwargs.get("card_id")
|
|
|
|
# Join Board to check ownership securely in one query
|
|
card = (
|
|
Card.query.join(Board)
|
|
.filter(Card.id == card_id, Board.user_id == user_id)
|
|
.first()
|
|
)
|
|
|
|
if not card:
|
|
abort(404)
|
|
|
|
kwargs["card"] = card
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_list_owned(f):
|
|
"""Loads a List ensuring Parent Board ownership."""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
list_id = kwargs.get("list_id")
|
|
|
|
lst = (
|
|
List.query.join(Board)
|
|
.filter(List.id == list_id, Board.user_id == user_id)
|
|
.first()
|
|
)
|
|
|
|
if not lst:
|
|
abort(404)
|
|
|
|
kwargs["lst"] = lst
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_checklist_owned(f):
|
|
"""Loads a Checklist ensuring Parent Board ownership."""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
checklist_id = kwargs.get("checklist_id")
|
|
|
|
checklist = (
|
|
Checklist.query.join(Card)
|
|
.join(Board)
|
|
.filter(Checklist.id == checklist_id, Board.user_id == user_id)
|
|
.first()
|
|
)
|
|
|
|
if not checklist:
|
|
abort(404)
|
|
|
|
kwargs["checklist"] = checklist
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_check_item_owned(f):
|
|
"""Loads a CheckItem ensuring Parent Board ownership."""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
item_id = kwargs.get("item_id")
|
|
|
|
check_item = (
|
|
CheckItem.query.join(Checklist)
|
|
.join(Card)
|
|
.join(Board)
|
|
.filter(CheckItem.id == item_id, Board.user_id == user_id)
|
|
.first()
|
|
)
|
|
|
|
if not check_item:
|
|
abort(404)
|
|
|
|
kwargs["check_item"] = check_item
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_comment_owned(f):
|
|
"""
|
|
Loads a Comment ensuring the Comment itself belongs to the user.
|
|
(Based on schema where Comment has user_id)
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
comment_id = kwargs.get("comment_id")
|
|
|
|
comment = Comment.query.filter_by(id=comment_id, user_id=user_id).first()
|
|
|
|
if not comment:
|
|
abort(404)
|
|
|
|
kwargs["comment"] = comment
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_file_owned(f):
|
|
"""
|
|
Loads a FileAttachment ensuring it belongs to the user.
|
|
Aborts with 404 if not found or not owned.
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
file_id = kwargs.get("file_id")
|
|
|
|
# Filter by ID and user ID
|
|
attachment = FileAttachment.query.filter_by(
|
|
id=file_id, uploaded_by=user_id
|
|
).first()
|
|
|
|
if not attachment:
|
|
abort(404)
|
|
|
|
kwargs["file"] = attachment
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_file_accessible(f):
|
|
"""
|
|
Loads a FileAttachment ensuring the user can access it.
|
|
User can access if:
|
|
1. They uploaded the file, OR
|
|
2. The file is attached to an entity they own (via Board ownership)
|
|
|
|
Aborts with 404 if not found or not accessible.
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
file_id = kwargs.get("file_id")
|
|
|
|
# Try to find file uploaded by user
|
|
attachment = FileAttachment.query.filter_by(
|
|
id=file_id, uploaded_by=user_id
|
|
).first()
|
|
|
|
# If not found, check if attached to a Card that belongs to user's board
|
|
if not attachment:
|
|
# For Card attachments
|
|
card_attachment = (
|
|
FileAttachment.query.join(
|
|
Card,
|
|
(FileAttachment.attachable_type == "Card")
|
|
& (FileAttachment.attachable_id == Card.id),
|
|
)
|
|
.join(Board)
|
|
.filter(
|
|
FileAttachment.id == file_id,
|
|
Board.user_id == user_id,
|
|
)
|
|
.first()
|
|
)
|
|
if card_attachment:
|
|
attachment = card_attachment
|
|
|
|
# If still not found, check if attached
|
|
# to a Comment that belongs to user's board
|
|
if not attachment:
|
|
# For Comment attachments
|
|
comment_attachment = (
|
|
FileAttachment.query.join(
|
|
Comment,
|
|
(FileAttachment.attachable_type == "Comment")
|
|
& (FileAttachment.attachable_id == Comment.id),
|
|
)
|
|
.join(Card)
|
|
.join(Board)
|
|
.filter(
|
|
FileAttachment.id == file_id,
|
|
Board.user_id == user_id,
|
|
)
|
|
.first()
|
|
)
|
|
if comment_attachment:
|
|
attachment = comment_attachment
|
|
|
|
if not attachment:
|
|
abort(404)
|
|
|
|
kwargs["file"] = attachment
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def load_file_accessible_by_uuid(f):
|
|
"""
|
|
Loads a FileAttachment by UUID ensuring the user can access it.
|
|
User can access if:
|
|
1. They uploaded the file, OR
|
|
2. The file is attached to an entity they own (via Board ownership)
|
|
|
|
Aborts with 404 if not found or not accessible.
|
|
"""
|
|
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user_id = get_current_user_id()
|
|
file_uuid = kwargs.get("file_uuid")
|
|
|
|
# Try to find file uploaded by user
|
|
attachment = FileAttachment.query.filter_by(
|
|
uuid=file_uuid, uploaded_by=user_id
|
|
).first()
|
|
|
|
# If not found, check if attached to a Card that belongs to user's board
|
|
if not attachment:
|
|
# For Card attachments
|
|
card_attachment = (
|
|
FileAttachment.query.join(
|
|
Card,
|
|
(FileAttachment.attachable_type == "Card")
|
|
& (FileAttachment.attachable_id == Card.id),
|
|
)
|
|
.join(Board)
|
|
.filter(
|
|
FileAttachment.uuid == file_uuid,
|
|
Board.user_id == user_id,
|
|
)
|
|
.first()
|
|
)
|
|
if card_attachment:
|
|
attachment = card_attachment
|
|
|
|
# If still not found, check if attached to a
|
|
# Comment that belongs to user's board
|
|
if not attachment:
|
|
# For Comment attachments
|
|
comment_attachment = (
|
|
FileAttachment.query.join(
|
|
Comment,
|
|
(FileAttachment.attachable_type == "Comment")
|
|
& (FileAttachment.attachable_id == Comment.id),
|
|
)
|
|
.join(Card)
|
|
.join(Board)
|
|
.filter(
|
|
FileAttachment.uuid == file_uuid,
|
|
Board.user_id == user_id,
|
|
)
|
|
.first()
|
|
)
|
|
if comment_attachment:
|
|
attachment = comment_attachment
|
|
|
|
if not attachment:
|
|
abort(404)
|
|
|
|
kwargs["file"] = attachment
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|