112 lines
4.6 KiB
Python
112 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
# Fajl: routers/_tenant.py | v1.0.0 | 2026-05-09
|
|
# Autor: damir@rinet.one (klub-scope wiring for /api/crm endpoints)
|
|
# Svrha: tenant-aware klub_id scope helper used by clanarine + lijecnicki
|
|
# routers (and any future per-klub list endpoint).
|
|
#
|
|
# Logika scope-a:
|
|
# • super_admin / pgz_* → puni pristup; query ?klub_id=X poštuje se
|
|
# • savez_* → trenutno isto kao pgz_* (TODO: stvarni
|
|
# savez→klub join kad bude potreban)
|
|
# • klub_* → forsiraj user.klub_id + sve iz user_klub_links;
|
|
# ako se traži drugi klub → 403
|
|
# • neautenticirani → backward-compat: poštuj traženi klub_id
|
|
# ═══════════════════════════════════════════════════════════════════
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from typing import Optional, List, Dict, Any, Tuple
|
|
|
|
from fastapi import HTTPException
|
|
|
|
sys.path.insert(0, "/opt/pgz-sport")
|
|
from auth.auth_v2 import (
|
|
db_query, KLUB_USER_TYPES, SAVEZ_USER_TYPES, PGZ_USER_TYPES,
|
|
)
|
|
|
|
|
|
def _user_klub_ids(user_id: int) -> List[int]:
|
|
"""Return all klub_ids a user is linked to via pgz_sport.user_klub_links."""
|
|
try:
|
|
rows = db_query(
|
|
"SELECT klub_id FROM pgz_sport.user_klub_links WHERE user_id=%s",
|
|
(user_id,),
|
|
)
|
|
except Exception:
|
|
# Table missing or not yet created — fail open to user.klub_id only.
|
|
return []
|
|
return [int(r["klub_id"]) for r in rows if r.get("klub_id") is not None]
|
|
|
|
|
|
def resolve_klub_scope(user: Optional[Dict[str, Any]],
|
|
requested_klub_id: Optional[int]) -> Dict[str, Any]:
|
|
"""Resolve effective klub-scope for an authenticated (or anonymous) user.
|
|
|
|
Returns one of:
|
|
{"mode": "all"} — no SQL filter applied
|
|
{"mode": "single", "klub_id": <int>} — bind one klub_id
|
|
{"mode": "many", "klub_ids": [<int>]} — IN(...) filter
|
|
{"mode": "deny"} — caller should raise 403
|
|
"""
|
|
rid = int(requested_klub_id) if requested_klub_id else None
|
|
|
|
# Backward-compat: no JWT token → behave like before (respect ?klub_id).
|
|
if user is None:
|
|
return {"mode": "single", "klub_id": rid} if rid else {"mode": "all"}
|
|
|
|
ut = (user.get("user_type") or "").lower()
|
|
|
|
if ut in PGZ_USER_TYPES or ut == "super_admin":
|
|
return {"mode": "single", "klub_id": rid} if rid else {"mode": "all"}
|
|
|
|
if ut in SAVEZ_USER_TYPES:
|
|
# TODO: enforce savez→klub membership when needed.
|
|
return {"mode": "single", "klub_id": rid} if rid else {"mode": "all"}
|
|
|
|
if ut in KLUB_USER_TYPES:
|
|
allowed = set()
|
|
if user.get("klub_id"):
|
|
allowed.add(int(user["klub_id"]))
|
|
for kid in _user_klub_ids(int(user["id"])):
|
|
allowed.add(kid)
|
|
if not allowed:
|
|
return {"mode": "deny"}
|
|
if rid is not None:
|
|
if rid not in allowed:
|
|
return {"mode": "deny"}
|
|
return {"mode": "single", "klub_id": rid}
|
|
if len(allowed) == 1:
|
|
return {"mode": "single", "klub_id": next(iter(allowed))}
|
|
return {"mode": "many", "klub_ids": sorted(allowed)}
|
|
|
|
# Unknown / viewer role — restrictive: only the requested klub, never "all".
|
|
if rid is not None:
|
|
return {"mode": "single", "klub_id": rid}
|
|
return {"mode": "deny"}
|
|
|
|
|
|
def apply_klub_scope_sql(scope: Dict[str, Any],
|
|
column: str = "c.klub_id") -> Tuple[str, List[Any]]:
|
|
"""Translate a scope dict into ``(sql_fragment, args)``.
|
|
|
|
``sql_fragment`` is empty when no filter is needed; otherwise it is a
|
|
single ``column = %s`` or ``column IN (%s, %s, ...)`` predicate ready to
|
|
be joined into the existing WHERE chain.
|
|
|
|
Raises ``HTTPException(403)`` for the ``deny`` mode.
|
|
"""
|
|
mode = scope.get("mode")
|
|
if mode == "deny":
|
|
raise HTTPException(
|
|
403, "Nemate ovlasti za pristup podacima izvan vašeg kluba.")
|
|
if mode == "all":
|
|
return "", []
|
|
if mode == "single":
|
|
return f"{column} = %s", [int(scope["klub_id"])]
|
|
if mode == "many":
|
|
ids = list(scope["klub_ids"])
|
|
placeholders = ",".join(["%s"] * len(ids))
|
|
return f"{column} IN ({placeholders})", [int(x) for x in ids]
|
|
return "", []
|