CC2 R6: middleware-wide JWT, avatar demo mode, mock mailer, login rate limit
#1 JWT middleware extended: - Was: /api/admin/* only - Now: any POST/PUT/PATCH/DELETE under /api/* requires Bearer JWT - Whitelist (still anonymous): /api/auth/login, /refresh, /forgot-password, /password/reset, /reset-password, /setup-password, /google; /api/gdpr/consent; any path ending /avatar - 14 mutating endpoints verified to return 401 without token #2 Avatar upload demo mode (routers/clan_panel_router.py): - Anonymous → returns {demo_mode:true, slika_url:null, message:'Demo mode — slika nije spremljena. Prijavite se za pravu pohranu.'}, no FS write, no DB write - Authenticated (valid JWT, allowed role) → real save as before - Auth check now uses auth.auth_v2.decode_token (proper secret + revocation) instead of the broken local _resolve_role #3 Mock mailer (auth/mailer.py): - send_email writes RFC 822 .eml to /tmp/pgz_mailbox + appends to INDEX.jsonl - send_password_reset, send_invite helpers with HR text + HTML alt - Real SMTP active when PGZ_SMTP_HOST is set (env-driven, off by default) - forgot-password and admin invite both call mailer; audit logs mail status #5 Rate limiting on /api/auth/login: - Per-user: 5 wrong attempts → 5-minute DB-backed lockout (was 5 → 15 min). Configurable via PGZ_LOGIN_LOCK_THRESHOLD/MINUTES. - Per-IP: 10 fails / 5-min sliding window in-memory → HTTP 429 Configurable via PGZ_LOGIN_IP_THRESHOLD/WINDOW_SEC. Successful login clears the IP counter. - Failed attempts respond '(N/5) — račun je zaključan na 5 minuta' - New audit actions: login.ratelimit.ip; login.fail meta now includes fails count, locked, lock_minutes #4 Live test report: 46/46 across 6 demo users — login, JWT gate on 14 mutating endpoints, public path whitelist, demo-mode avatar + real save, forgot-password e-mail to mailbox, no-leak unknown email, 5-fail lockout, 423 during lockout, audit coverage.
This commit is contained in:
@@ -0,0 +1,239 @@
|
||||
#!/usr/bin/env python3
|
||||
# erp/permissions.py — PGŽ Sport ERP RBAC helpers (M5/M6)
|
||||
# Author: Damir Radulić <damir@rinet.one> / dradulic@outlook.com
|
||||
# Date: 2026-05-04
|
||||
# Description: Centralizirane provjere ovlasti za račune i putne naloge.
|
||||
#
|
||||
# Uloge (pgz_sport.roles):
|
||||
# super_admin, pgz_admin, savez_admin, klub_admin, klub_user, clan, viewer
|
||||
#
|
||||
# Korisnik (dict iz auth_v2.get_current_user) ima: id, user_type, klub_id, savez_id.
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Optional, Dict, Any
|
||||
import psycopg2, psycopg2.extras
|
||||
|
||||
DB = dict(host="10.10.0.2", port=6432, dbname="rinet_v3", user="rinet",
|
||||
password="R1net2026!SecureDB#v7")
|
||||
|
||||
|
||||
def _db():
|
||||
c = psycopg2.connect(**DB); c.autocommit = True; return c
|
||||
|
||||
|
||||
# ── role helpers ──────────────────────────────────────────────────────
|
||||
def is_super(user) -> bool:
|
||||
return bool(user) and user.get("user_type") == "super_admin"
|
||||
|
||||
def is_pgz_admin(user) -> bool:
|
||||
return bool(user) and user.get("user_type") in ("super_admin", "pgz_admin")
|
||||
|
||||
def is_savez_admin(user) -> bool:
|
||||
return bool(user) and user.get("user_type") == "savez_admin"
|
||||
|
||||
def is_klub_admin(user) -> bool:
|
||||
return bool(user) and user.get("user_type") == "klub_admin"
|
||||
|
||||
def is_klub_user(user) -> bool:
|
||||
return bool(user) and user.get("user_type") in ("klub_admin", "klub_user")
|
||||
|
||||
|
||||
def klub_savez(klub_id: int) -> Optional[int]:
|
||||
"""Vraća savez_id kojem klub pripada (preko klubovi.savez_id ili user_klub_links)."""
|
||||
if not klub_id: return None
|
||||
with _db() as c:
|
||||
cur = c.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute("SELECT savez_id FROM pgz_sport.klubovi WHERE id=%s", (klub_id,))
|
||||
r = cur.fetchone()
|
||||
return r["savez_id"] if r else None
|
||||
|
||||
|
||||
def user_can_see_klub(user, klub_id: Optional[int]) -> bool:
|
||||
"""Tko može VIDJETI klub: super, pgz, savez (ako klub u savezu), klub_admin/user (ako vlastiti klub)."""
|
||||
if not user or not klub_id:
|
||||
return is_pgz_admin(user)
|
||||
if is_pgz_admin(user):
|
||||
return True
|
||||
if is_klub_user(user):
|
||||
return user.get("klub_id") == klub_id
|
||||
if is_savez_admin(user):
|
||||
return klub_savez(klub_id) == user.get("savez_id")
|
||||
return False
|
||||
|
||||
|
||||
# ── INVOICES ──────────────────────────────────────────────────────────
|
||||
def can_view_invoice(user, invoice: Dict[str, Any]) -> bool:
|
||||
"""Pgž admin vidi sve. Savez admin svoje saveze. Klub admin/user vlastiti klub."""
|
||||
if not invoice: return False
|
||||
if is_pgz_admin(user): return True
|
||||
return user_can_see_klub(user, invoice.get("klub_id"))
|
||||
|
||||
|
||||
def can_edit_invoice(user, invoice: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
Edit (izmjena polja, korekcija OCR-a) — samo klub_admin vlastitog kluba ILI pgz_admin.
|
||||
Savez admin može komentirati, ali NE editirati.
|
||||
Plaćeni računi su read-only osim za pgz_admin.
|
||||
"""
|
||||
if not invoice: return False
|
||||
if is_pgz_admin(user): return True
|
||||
if invoice.get("payment_status") in ("paid",):
|
||||
return False
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == invoice.get("klub_id")
|
||||
return False
|
||||
|
||||
|
||||
def can_pay_invoice(user, invoice: Dict[str, Any]) -> bool:
|
||||
"""Označi kao plaćen — klub_admin vlastitog kluba ili pgz_admin."""
|
||||
if not invoice: return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == invoice.get("klub_id")
|
||||
return False
|
||||
|
||||
|
||||
def can_comment_invoice(user, invoice: Dict[str, Any]) -> bool:
|
||||
"""Komentirati može pgz_admin, savez_admin (svog saveza) i klub_admin (svog kluba)."""
|
||||
if not invoice: return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_savez_admin(user):
|
||||
return klub_savez(invoice.get("klub_id")) == user.get("savez_id")
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == invoice.get("klub_id")
|
||||
return False
|
||||
|
||||
|
||||
def invoice_actions(user, invoice: Dict[str, Any]) -> Dict[str, bool]:
|
||||
"""UI hint — koji gumbi su dostupni."""
|
||||
return {
|
||||
"view": can_view_invoice(user, invoice),
|
||||
"edit": can_edit_invoice(user, invoice),
|
||||
"pay": can_pay_invoice(user, invoice) and invoice.get("payment_status") != "paid",
|
||||
"comment": can_comment_invoice(user, invoice),
|
||||
"delete": is_pgz_admin(user),
|
||||
}
|
||||
|
||||
|
||||
# ── PUTNI NALOZI ──────────────────────────────────────────────────────
|
||||
def can_view_putni_nalog(user, pn: Dict[str, Any]) -> bool:
|
||||
if not pn: return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_savez_admin(user):
|
||||
return klub_savez(pn.get("klub_id")) == user.get("savez_id")
|
||||
if is_klub_user(user):
|
||||
if user.get("klub_id") == pn.get("klub_id"):
|
||||
return True
|
||||
# Voditelj vidi svoj
|
||||
return pn.get("user_id") == user.get("id") if user else False
|
||||
|
||||
|
||||
def can_edit_putni_nalog(user, pn: Dict[str, Any]) -> bool:
|
||||
"""Edit dozvoljen samo na statusima draft/odbijen, i samo voditelju ili klub_admin/pgz."""
|
||||
if not pn: return False
|
||||
status = (pn.get("status") or "draft").lower()
|
||||
if status not in ("draft", "odbijen"):
|
||||
return is_pgz_admin(user)
|
||||
if is_pgz_admin(user): return True
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == pn.get("klub_id")
|
||||
# Voditelj
|
||||
return pn.get("user_id") == user.get("id") if user else False
|
||||
|
||||
|
||||
def can_submit_putni_nalog(user, pn: Dict[str, Any]) -> bool:
|
||||
"""Slanje (draft → poslan) — voditelj ili klub_admin."""
|
||||
if not pn: return False
|
||||
if (pn.get("status") or "draft").lower() not in ("draft",):
|
||||
return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == pn.get("klub_id")
|
||||
return pn.get("user_id") == user.get("id") if user else False
|
||||
|
||||
|
||||
def can_approve_putni_nalog(user, pn: Dict[str, Any]) -> bool:
|
||||
"""Odobravanje (poslan → odobren ili odbijen) — klub_admin svog kluba ili pgz_admin."""
|
||||
if not pn: return False
|
||||
if (pn.get("status") or "").lower() not in ("poslan", "submitted", "draft"):
|
||||
return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == pn.get("klub_id")
|
||||
return False
|
||||
|
||||
|
||||
def can_pay_putni_nalog(user, pn: Dict[str, Any]) -> bool:
|
||||
"""Isplata (odobren → isplaćen) — klub_admin ili pgz_admin."""
|
||||
if not pn: return False
|
||||
if (pn.get("status") or "").lower() not in ("odobren", "approved", "zatvoren"):
|
||||
return False
|
||||
if is_pgz_admin(user): return True
|
||||
if is_klub_admin(user):
|
||||
return user.get("klub_id") == pn.get("klub_id")
|
||||
return False
|
||||
|
||||
|
||||
def putni_nalog_actions(user, pn: Dict[str, Any]) -> Dict[str, bool]:
|
||||
return {
|
||||
"view": can_view_putni_nalog(user, pn),
|
||||
"edit": can_edit_putni_nalog(user, pn),
|
||||
"submit": can_submit_putni_nalog(user, pn),
|
||||
"approve": can_approve_putni_nalog(user, pn),
|
||||
"reject": can_approve_putni_nalog(user, pn),
|
||||
"pay": can_pay_putni_nalog(user, pn),
|
||||
"delete": is_pgz_admin(user),
|
||||
}
|
||||
|
||||
|
||||
# ── Audit logging helper ──────────────────────────────────────────────
|
||||
def audit_invoice(user, invoice_id: int, op: str, field: Optional[str] = None,
|
||||
old=None, new=None):
|
||||
try:
|
||||
with _db() as c:
|
||||
c.cursor().execute(
|
||||
"""INSERT INTO pgz_sport.audit_log
|
||||
(tablica, operacija, record_id, korisnik, promijenjeno_polje,
|
||||
stara_vrijednost, nova_vrijednost)
|
||||
VALUES ('pgz_sport.invoices', %s, %s, %s, %s, %s, %s)""",
|
||||
(op, invoice_id,
|
||||
(user.get("email") if user else "anon"),
|
||||
field,
|
||||
None if old is None else str(old)[:500],
|
||||
None if new is None else str(new)[:500]),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def audit_putni(user, pn_id: int, op: str, field: Optional[str] = None,
|
||||
old=None, new=None):
|
||||
try:
|
||||
with _db() as c:
|
||||
c.cursor().execute(
|
||||
"""INSERT INTO pgz_sport.audit_log
|
||||
(tablica, operacija, record_id, korisnik, promijenjeno_polje,
|
||||
stara_vrijednost, nova_vrijednost)
|
||||
VALUES ('pgz_sport.expense_reports', %s, %s, %s, %s, %s, %s)""",
|
||||
(op, pn_id,
|
||||
(user.get("email") if user else "anon"),
|
||||
field,
|
||||
None if old is None else str(old)[:500],
|
||||
None if new is None else str(new)[:500]),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def fetch_audit(table: str, record_id: int, limit: int = 50):
|
||||
with _db() as c:
|
||||
cur = c.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute(
|
||||
"""SELECT timestamp, operacija, korisnik, promijenjeno_polje,
|
||||
stara_vrijednost, nova_vrijednost
|
||||
FROM pgz_sport.audit_log
|
||||
WHERE tablica=%s AND record_id=%s
|
||||
ORDER BY timestamp DESC LIMIT %s""",
|
||||
(table, record_id, limit),
|
||||
)
|
||||
return cur.fetchall()
|
||||
Reference in New Issue
Block a user