from __future__ import annotations from dotenv import load_dotenv load_dotenv('/opt/rinet-gpu/.env.master') # auto-added by patch_scrapers_with_dotenv.sh """ audit_seal_router.py — HTTP surface for sys_audit log + Polygon PoS sealing Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com Date: 2026-05-04 (R3) / 2026-05-05 (R4 — log+stats+helper) Endpoints (all under /sport/api): GET /audit/log?limit=200&action=&resource=&q=&user= Tail of pgz_sport.sys_audit with optional filters. Returns: { count, items: [{id, user_id, user_email, action, resource_type, resource_id, details, tx_hash, created_at}], total } Aliases: target_type/resource_type → resource for legacy frontend. GET /audit/stats { total, today, sealed, users } POST /audit/seal body: { action: "sufinanciranje.approved", ref_type: "sufinanciranje", (optional) ref_id: "2026-001", (string or number) payload: { ... }, (sha256 computed server-side) data_hash: "abc..." (optional — if you already have the hash) } returns the seal record (seal_id, tx_hash, polygonscan_url, status, ...). GET /audit/seal/list?action=&ref_type=&ref_id=&limit= Recent seals for the audit-log UI. GET /audit/seal/{seal_id} Single seal with on-chain receipt cross-check (if web3 wired up). The legacy hash-chain audit endpoints (/api/admin/audit-chain*) live in pgz_sport_api.py and remain unchanged. ---------------------------------------------------------------------- audit_log() — shared helper for other routers (cc2/4/5/6) from audit_seal_router import audit_log audit_log(action='users.update', target_type='users', target_id=7, payload={'changed':['email']}, user_id=u['id'], user_email=u['email']) Fail-soft: never raises, only writes to stderr on error. """ import sys, os, json, traceback from datetime import date, datetime from typing import Any, Optional import psycopg2, psycopg2.extras from fastapi import APIRouter, Body, HTTPException, Header, Query # blockchain.seal lives at /opt/pgz-sport/blockchain/seal.py sys.path.insert(0, '/opt/pgz-sport') from blockchain import seal as seal_mod # noqa: E402 router = APIRouter() # ── DB helper (own connection, mirrors enrich_router DSN logic) ────────── _pgh = os.environ.get('PG_HOST', '10.10.0.2') _pgp = int(os.environ.get('PG_PORT', '6432')) if _pgh in ('localhost', '127.0.0.1'): _pgh = os.environ.get('DB_HOST', '10.10.0.2') _pgp = int(os.environ.get('DB_PORT', '6432')) _DB = dict(host=_pgh, port=_pgp, dbname=os.environ.get('PG_DB', 'rinet_v3'), user=os.environ.get('PG_USER', 'rinet'), password=os.environ["DB_PASSWORD"]) def _conn(): c = psycopg2.connect(**_DB); c.autocommit = True; return c def audit_log(action: str, target_type: Optional[str] = None, target_id: Optional[int] = None, target_text: Optional[str] = None, payload: Optional[dict] = None, user_id: Optional[int] = None, user_email: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Optional[int]: """Insert one row into pgz_sport.sys_audit. Fail-soft. Other routers should call this after any successful DB mutation. The DB trigger trg_audit_chain populates row_hash + chain_idx automatically. Returns the new row id, or None on failure. """ if not action: return None try: with _conn() as c, c.cursor() as cur: cur.execute(""" INSERT INTO pgz_sport.sys_audit (user_id, user_email, action, target_type, target_id, target_text, payload, ip_address, user_agent) VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s) RETURNING id """, ( user_id, user_email, action[:100], (target_type or None) and str(target_type)[:50], target_id, target_text, json.dumps(payload, default=str, ensure_ascii=False) if payload is not None else None, ip_address, user_agent, )) row = cur.fetchone() return row[0] if row else None except Exception: # Never block business logic on audit failure — log and move on. sys.stderr.write('[audit_log] insert failed for action='+str(action)+'\n') sys.stderr.write(traceback.format_exc()) return None def _row_to_item(r: dict) -> dict: """Normalise a sys_audit row for the audit.html UI.""" payload = r.get('payload') or {} if isinstance(payload, str): try: payload = json.loads(payload) except: pass out = { 'id': r.get('id'), 'user_id': r.get('user_id'), 'user_email': r.get('user_email'), 'action': r.get('action'), 'resource_type': r.get('target_type'), 'resource_id': r.get('target_id'), 'target_text': r.get('target_text'), 'details': payload, 'row_hash': r.get('row_hash'), 'chain_idx': r.get('chain_idx'), 'created_at': (r.get('created_at').isoformat() if isinstance(r.get('created_at'), datetime) else r.get('created_at')), } # Surface a polygon tx hash if it was stored in payload (seal_to_polygon does this) if isinstance(payload, dict): out['tx_hash'] = payload.get('tx_hash') or payload.get('polygon_tx') or payload.get('seal_tx_hash') return out # ── R4: audit log + stats endpoints ───────────────────────────────────── @router.get("/audit/log") def audit_log_list(limit: int = Query(200, ge=1, le=1000), action: Optional[str] = None, resource: Optional[str] = None, user: Optional[str] = None, q: Optional[str] = None, since: Optional[str] = None): """Recent audit rows with simple filters. resource matches target_type.""" where = [] params: list = [] if action: where.append("action ILIKE %s"); params.append('%'+action+'%') if resource: where.append("target_type ILIKE %s"); params.append('%'+resource+'%') if user: where.append("(user_email ILIKE %s OR CAST(user_id AS text)=%s)") params.append('%'+user+'%'); params.append(user) if q: where.append("(action ILIKE %s OR target_type ILIKE %s OR target_text ILIKE %s OR user_email ILIKE %s OR payload::text ILIKE %s)") params.extend(['%'+q+'%']*5) if since: where.append("created_at >= %s::timestamptz"); params.append(since) sql = (""" SELECT id, user_id, user_email, action, target_type, target_id, target_text, payload, ip_address, user_agent, row_hash, chain_idx, created_at FROM pgz_sport.sys_audit """ + (" WHERE "+ ' AND '.join(where) if where else '') + """ ORDER BY id DESC LIMIT %s """) params.append(limit) with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(sql, params) rows = [_row_to_item(dict(r)) for r in cur.fetchall()] cur.execute("SELECT count(*) AS n FROM pgz_sport.sys_audit") total = cur.fetchone()['n'] return {'count': len(rows), 'total': total, 'items': rows} @router.get("/audit/stats") def audit_stats(): with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute(""" SELECT (SELECT count(*) FROM pgz_sport.sys_audit) AS total, (SELECT count(*) FROM pgz_sport.sys_audit WHERE created_at::date = current_date) AS today, (SELECT count(DISTINCT user_email) FROM pgz_sport.sys_audit WHERE user_email IS NOT NULL AND created_at >= now()-interval '30 days') AS users, (SELECT count(*) FROM pgz_sport.sys_audit WHERE payload ? 'tx_hash' OR payload ? 'polygon_tx' OR payload ? 'seal_tx_hash') AS sealed """) r = cur.fetchone() or {} return { 'total': int(r.get('total') or 0), 'today': int(r.get('today') or 0), 'sealed': int(r.get('sealed') or 0), 'users': int(r.get('users') or 0), } @router.post("/audit/seal") def audit_seal(body: dict = Body(...), x_user_email: Optional[str] = Header(default=None), x_user_id: Optional[int] = Header(default=None)): """Seal an audit event to Polygon PoS (or queue for later if no key).""" if not isinstance(body, dict): raise HTTPException(400, "JSON body required") action = (body.get('action') or '').strip() if not action: raise HTTPException(400, "action is required") payload = body.get('payload') data_hash = (body.get('data_hash') or '').strip().lower().lstrip('0x') if not data_hash: if payload is None: raise HTTPException(400, "either data_hash or payload required") data_hash = seal_mod.hash_payload(payload) ref_id = body.get('ref_id') if ref_id is None: raise HTTPException(400, "ref_id is required") ref_type = body.get('ref_type') try: result = seal_mod.seal_to_polygon( data_hash=data_hash, ref_id=str(ref_id), action=action, ref_type=ref_type, payload=payload, user_id=x_user_id, user_email=x_user_email, ) except ValueError as e: raise HTTPException(400, str(e)) return result @router.get("/audit/seal/list") def audit_seal_list(action: Optional[str] = None, ref_type: Optional[str] = None, ref_id: Optional[str] = None, limit: int = 50): rows = seal_mod.list_seals(action=action, ref_type=ref_type, ref_id=ref_id, limit=limit) return {'count': len(rows), 'rows': rows, 'wallet': seal_mod.POLYGON_WALLET, 'chain_id': seal_mod.POLYGON_CHAIN_ID, 'live': seal_mod.HAS_WEB3 and bool(seal_mod.POLYGON_PRIVKEY)} @router.get("/audit/seal/{seal_id}") def audit_seal_get(seal_id: str): row = seal_mod.verify_seal(seal_id) if not row: raise HTTPException(404, "seal not found") return row