""" audit_seal_router.py — HTTP surface for sys_audit log + Polygon PoS sealing Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com Date: 2026-05-04 (R3) / 2026-05-05 (R4 — log+stats+helper) Endpoints (all under /sport/api): GET /audit/log?limit=200&action=&resource=&q=&user= Tail of pgz_sport.sys_audit with optional filters. Returns: { count, items: [{id, user_id, user_email, action, resource_type, resource_id, details, tx_hash, created_at}], total } Aliases: target_type/resource_type → resource for legacy frontend. GET /audit/stats { total, today, sealed, users } POST /audit/seal body: { action: "sufinanciranje.approved", ref_type: "sufinanciranje", (optional) ref_id: "2026-001", (string or number) payload: { ... }, (sha256 computed server-side) data_hash: "abc..." (optional — if you already have the hash) } returns the seal record (seal_id, tx_hash, polygonscan_url, status, ...). GET /audit/seal/list?action=&ref_type=&ref_id=&limit= Recent seals for the audit-log UI. GET /audit/seal/{seal_id} Single seal with on-chain receipt cross-check (if web3 wired up). The legacy hash-chain audit endpoints (/api/admin/audit-chain*) live in pgz_sport_api.py and remain unchanged. ---------------------------------------------------------------------- audit_log() — shared helper for other routers (cc2/4/5/6) from audit_seal_router import audit_log audit_log(action='users.update', target_type='users', target_id=7, payload={'changed':['email']}, user_id=u['id'], user_email=u['email']) Fail-soft: never raises, only writes to stderr on error. """ from __future__ import annotations import sys, os, json, traceback from datetime import date, datetime from typing import Any, Optional import psycopg2, psycopg2.extras from fastapi import APIRouter, Body, HTTPException, Header, Query # blockchain.seal lives at /opt/pgz-sport/blockchain/seal.py sys.path.insert(0, '/opt/pgz-sport') from blockchain import seal as seal_mod # noqa: E402 router = APIRouter() # ── DB helper (own connection, mirrors enrich_router DSN logic) ────────── _pgh = os.environ.get('PG_HOST', '10.10.0.2') _pgp = int(os.environ.get('PG_PORT', '6432')) if _pgh in ('localhost', '127.0.0.1'): _pgh = os.environ.get('DB_HOST', '10.10.0.2') _pgp = int(os.environ.get('DB_PORT', '6432')) _DB = dict(host=_pgh, port=_pgp, dbname=os.environ.get('PG_DB', 'rinet_v3'), user=os.environ.get('PG_USER', 'rinet'), password=os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7')) def _conn(): c = psycopg2.connect(**_DB); c.autocommit = True; return c def audit_log(action: str, target_type: Optional[str] = None, target_id: Optional[int] = None, target_text: Optional[str] = None, payload: Optional[dict] = None, user_id: Optional[int] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[int]: """Insert one row into pgz_sport.sys_audit. Fail-soft. Other routers should call this after any successful DB mutation. The DB trigger trg_audit_chain populates row_hash + chain_idx automatically. Returns the new row id, or None on failure. """ if not action: return None try: with _conn() as c, c.cursor() as cur: cur.execute(""" INSERT INTO pgz_sport.sys_audit (user_id, user_email, action, target_type, target_id, target_text, payload, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s) RETURNING id """, ( user_id, user_email, action[:100], (target_type or None) and str(target_type)[:50], target_id, target_text, json.dumps(payload, default=str, ensure_ascii=False) if payload is not None else None, ip_address, user_agent, )) row = cur.fetchone() return row[0] if row else None except Exception: # Never block business logic on audit failure — log and move on. sys.stderr.write('[audit_log] insert failed for action='+str(action)+'\n') sys.stderr.write(traceback.format_exc()) return None def _row_to_item(r: dict) -> dict: """Normalise a sys_audit row for the audit.html UI.""" payload = r.get('payload') or {} if isinstance(payload, str): try: payload = json.loads(payload) except: pass out = { 'id': r.get('id'), 'user_id': r.get('user_id'), 'user_email': r.get('user_email'), 'action': r.get('action'), 'resource_type': r.get('target_type'), 'resource_id': r.get('target_id'), 'target_text': r.get('target_text'), 'details': payload, 'row_hash': r.get('row_hash'), 'chain_idx': r.get('chain_idx'), 'created_at': (r.get('created_at').isoformat() if isinstance(r.get('created_at'), datetime) else r.get('created_at')), } # Surface a polygon tx hash if it was stored in payload (seal_to_polygon does this) if isinstance(payload, dict): out['tx_hash'] = payload.get('tx_hash') or payload.get('polygon_tx') or payload.get('seal_tx_hash') return out # ── R4: audit log + stats endpoints ───────────────────────────────────── @router.get("/audit/log") def audit_log_list(limit: int = Query(200, ge=1, le=1000), action: Optional[str] = None, resource: Optional[str] = None, user: Optional[str] = None, q: Optional[str] = None, since: Optional[str] = None): """Recent audit rows with simple filters. resource matches target_type.""" where = [] params: list = [] if action: where.append("action ILIKE %s"); params.append('%'+action+'%') if resource: where.append("target_type ILIKE %s"); params.append('%'+resource+'%') if user: where.append("(user_email ILIKE %s OR CAST(user_id AS text)=%s)") params.append('%'+user+'%'); params.append(user) if q: where.append("(action ILIKE %s OR target_type ILIKE %s OR target_text ILIKE %s OR user_email ILIKE %s OR payload::text ILIKE %s)") params.extend(['%'+q+'%']*5) if since: where.append("created_at >= %s::timestamptz"); params.append(since) sql = (""" SELECT id, user_id, user_email, action, target_type, target_id, target_text, payload, ip_address, user_agent, row_hash, chain_idx, created_at FROM pgz_sport.sys_audit """ + (" WHERE "+ ' AND '.join(where) if where else '') + """ ORDER BY id DESC LIMIT %s """) params.append(limit) with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, params) rows = [_row_to_item(dict(r)) for r in cur.fetchall()] cur.execute("SELECT count(*) AS n FROM pgz_sport.sys_audit") total = cur.fetchone()['n'] return {'count': len(rows), 'total': total, 'items': rows} @router.get("/audit/stats") def audit_stats(): with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT (SELECT count(*) FROM pgz_sport.sys_audit) AS total, (SELECT count(*) FROM pgz_sport.sys_audit WHERE created_at::date = current_date) AS today, (SELECT count(DISTINCT user_email) FROM pgz_sport.sys_audit WHERE user_email IS NOT NULL AND created_at >= now()-interval '30 days') AS users, (SELECT count(*) FROM pgz_sport.sys_audit WHERE payload ? 'tx_hash' OR payload ? 'polygon_tx' OR payload ? 'seal_tx_hash') AS sealed """) r = cur.fetchone() or {} return { 'total': int(r.get('total') or 0), 'today': int(r.get('today') or 0), 'sealed': int(r.get('sealed') or 0), 'users': int(r.get('users') or 0), } @router.post("/audit/seal") def audit_seal(body: dict = Body(...), x_user_email: Optional[str] = Header(default=None), x_user_id: Optional[int] = Header(default=None)): """Seal an audit event to Polygon PoS (or queue for later if no key).""" if not isinstance(body, dict): raise HTTPException(400, "JSON body required") action = (body.get('action') or '').strip() if not action: raise HTTPException(400, "action is required") payload = body.get('payload') data_hash = (body.get('data_hash') or '').strip().lower().lstrip('0x') if not data_hash: if payload is None: raise HTTPException(400, "either data_hash or payload required") data_hash = seal_mod.hash_payload(payload) ref_id = body.get('ref_id') if ref_id is None: raise HTTPException(400, "ref_id is required") ref_type = body.get('ref_type') try: result = seal_mod.seal_to_polygon( data_hash=data_hash, ref_id=str(ref_id), action=action, ref_type=ref_type, payload=payload, user_id=x_user_id, user_email=x_user_email, ) except ValueError as e: raise HTTPException(400, str(e)) return result @router.get("/audit/seal/list") def audit_seal_list(action: Optional[str] = None, ref_type: Optional[str] = None, ref_id: Optional[str] = None, limit: int = 50): rows = seal_mod.list_seals(action=action, ref_type=ref_type, ref_id=ref_id, limit=limit) return {'count': len(rows), 'rows': rows, 'wallet': seal_mod.POLYGON_WALLET, 'chain_id': seal_mod.POLYGON_CHAIN_ID, 'live': seal_mod.HAS_WEB3 and bool(seal_mod.POLYGON_PRIVKEY)} @router.get("/audit/seal/{seal_id}") def audit_seal_get(seal_id: str): row = seal_mod.verify_seal(seal_id) if not row: raise HTTPException(404, "seal not found") return row