Files
pgz-sport/routers/audit_seal_router.py_prije_env_deepseek

261 lines
10 KiB
Plaintext

"""
audit_seal_router.py — HTTP surface for sys_audit log + Polygon PoS sealing
Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com
Date: 2026-05-04 (R3) / 2026-05-05 (R4 — log+stats+helper)
Endpoints (all under /sport/api):
GET /audit/log?limit=200&action=&resource=&q=&user=
Tail of pgz_sport.sys_audit with optional filters. Returns:
{ count, items: [{id, user_id, user_email, action, resource_type, resource_id,
details, tx_hash, created_at}], total }
Aliases: target_type/resource_type → resource for legacy frontend.
GET /audit/stats
{ total, today, sealed, users }
POST /audit/seal
body: {
action: "sufinanciranje.approved",
ref_type: "sufinanciranje", (optional)
ref_id: "2026-001", (string or number)
payload: { ... }, (sha256 computed server-side)
data_hash: "abc..." (optional — if you already have the hash)
}
returns the seal record (seal_id, tx_hash, polygonscan_url, status, ...).
GET /audit/seal/list?action=&ref_type=&ref_id=&limit=
Recent seals for the audit-log UI.
GET /audit/seal/{seal_id}
Single seal with on-chain receipt cross-check (if web3 wired up).
The legacy hash-chain audit endpoints (/api/admin/audit-chain*) live in
pgz_sport_api.py and remain unchanged.
----------------------------------------------------------------------
audit_log() — shared helper for other routers (cc2/4/5/6)
from audit_seal_router import audit_log
audit_log(action='users.update', target_type='users', target_id=7,
payload={'changed':['email']}, user_id=u['id'], user_email=u['email'])
Fail-soft: never raises, only writes to stderr on error.
"""
from __future__ import annotations
import sys, os, json, traceback
from datetime import date, datetime
from typing import Any, Optional
import psycopg2, psycopg2.extras
from fastapi import APIRouter, Body, HTTPException, Header, Query
# blockchain.seal lives at /opt/pgz-sport/blockchain/seal.py
sys.path.insert(0, '/opt/pgz-sport')
from blockchain import seal as seal_mod # noqa: E402
router = APIRouter()
# ── DB helper (own connection, mirrors enrich_router DSN logic) ──────────
_pgh = os.environ.get('PG_HOST', '10.10.0.2')
_pgp = int(os.environ.get('PG_PORT', '6432'))
if _pgh in ('localhost', '127.0.0.1'):
_pgh = os.environ.get('DB_HOST', '10.10.0.2')
_pgp = int(os.environ.get('DB_PORT', '6432'))
_DB = dict(host=_pgh, port=_pgp,
dbname=os.environ.get('PG_DB', 'rinet_v3'),
user=os.environ.get('PG_USER', 'rinet'),
password=os.environ["DB_PASSWORD"])
def _conn():
c = psycopg2.connect(**_DB); c.autocommit = True; return c
def audit_log(action: str,
target_type: Optional[str] = None,
target_id: Optional[int] = None,
target_text: Optional[str] = None,
payload: Optional[dict] = None,
user_id: Optional[int] = None,
user_email: Optional[str] = None,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None) -> Optional[int]:
"""Insert one row into pgz_sport.sys_audit. Fail-soft.
Other routers should call this after any successful DB mutation.
The DB trigger trg_audit_chain populates row_hash + chain_idx automatically.
Returns the new row id, or None on failure.
"""
if not action:
return None
try:
with _conn() as c, c.cursor() as cur:
cur.execute("""
INSERT INTO pgz_sport.sys_audit
(user_id, user_email, action, target_type, target_id, target_text,
payload, ip_address, user_agent)
VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s)
RETURNING id
""", (
user_id,
user_email,
action[:100],
(target_type or None) and str(target_type)[:50],
target_id,
target_text,
json.dumps(payload, default=str, ensure_ascii=False) if payload is not None else None,
ip_address,
user_agent,
))
row = cur.fetchone()
return row[0] if row else None
except Exception:
# Never block business logic on audit failure — log and move on.
sys.stderr.write('[audit_log] insert failed for action='+str(action)+'\n')
sys.stderr.write(traceback.format_exc())
return None
def _row_to_item(r: dict) -> dict:
"""Normalise a sys_audit row for the audit.html UI."""
payload = r.get('payload') or {}
if isinstance(payload, str):
try: payload = json.loads(payload)
except: pass
out = {
'id': r.get('id'),
'user_id': r.get('user_id'),
'user_email': r.get('user_email'),
'action': r.get('action'),
'resource_type': r.get('target_type'),
'resource_id': r.get('target_id'),
'target_text': r.get('target_text'),
'details': payload,
'row_hash': r.get('row_hash'),
'chain_idx': r.get('chain_idx'),
'created_at': (r.get('created_at').isoformat() if isinstance(r.get('created_at'), datetime) else r.get('created_at')),
}
# Surface a polygon tx hash if it was stored in payload (seal_to_polygon does this)
if isinstance(payload, dict):
out['tx_hash'] = payload.get('tx_hash') or payload.get('polygon_tx') or payload.get('seal_tx_hash')
return out
# ── R4: audit log + stats endpoints ─────────────────────────────────────
@router.get("/audit/log")
def audit_log_list(limit: int = Query(200, ge=1, le=1000),
action: Optional[str] = None,
resource: Optional[str] = None,
user: Optional[str] = None,
q: Optional[str] = None,
since: Optional[str] = None):
"""Recent audit rows with simple filters. resource matches target_type."""
where = []
params: list = []
if action:
where.append("action ILIKE %s"); params.append('%'+action+'%')
if resource:
where.append("target_type ILIKE %s"); params.append('%'+resource+'%')
if user:
where.append("(user_email ILIKE %s OR CAST(user_id AS text)=%s)")
params.append('%'+user+'%'); params.append(user)
if q:
where.append("(action ILIKE %s OR target_type ILIKE %s OR target_text ILIKE %s OR user_email ILIKE %s OR payload::text ILIKE %s)")
params.extend(['%'+q+'%']*5)
if since:
where.append("created_at >= %s::timestamptz"); params.append(since)
sql = ("""
SELECT id, user_id, user_email, action, target_type, target_id, target_text,
payload, ip_address, user_agent, row_hash, chain_idx, created_at
FROM pgz_sport.sys_audit
""" + (" WHERE "+ ' AND '.join(where) if where else '') + """
ORDER BY id DESC LIMIT %s
""")
params.append(limit)
with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(sql, params)
rows = [_row_to_item(dict(r)) for r in cur.fetchall()]
cur.execute("SELECT count(*) AS n FROM pgz_sport.sys_audit")
total = cur.fetchone()['n']
return {'count': len(rows), 'total': total, 'items': rows}
@router.get("/audit/stats")
def audit_stats():
with _conn() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""
SELECT
(SELECT count(*) FROM pgz_sport.sys_audit) AS total,
(SELECT count(*) FROM pgz_sport.sys_audit WHERE created_at::date = current_date) AS today,
(SELECT count(DISTINCT user_email) FROM pgz_sport.sys_audit
WHERE user_email IS NOT NULL AND created_at >= now()-interval '30 days') AS users,
(SELECT count(*) FROM pgz_sport.sys_audit
WHERE payload ? 'tx_hash' OR payload ? 'polygon_tx' OR payload ? 'seal_tx_hash')
AS sealed
""")
r = cur.fetchone() or {}
return {
'total': int(r.get('total') or 0),
'today': int(r.get('today') or 0),
'sealed': int(r.get('sealed') or 0),
'users': int(r.get('users') or 0),
}
@router.post("/audit/seal")
def audit_seal(body: dict = Body(...),
x_user_email: Optional[str] = Header(default=None),
x_user_id: Optional[int] = Header(default=None)):
"""Seal an audit event to Polygon PoS (or queue for later if no key)."""
if not isinstance(body, dict):
raise HTTPException(400, "JSON body required")
action = (body.get('action') or '').strip()
if not action:
raise HTTPException(400, "action is required")
payload = body.get('payload')
data_hash = (body.get('data_hash') or '').strip().lower().lstrip('0x')
if not data_hash:
if payload is None:
raise HTTPException(400, "either data_hash or payload required")
data_hash = seal_mod.hash_payload(payload)
ref_id = body.get('ref_id')
if ref_id is None:
raise HTTPException(400, "ref_id is required")
ref_type = body.get('ref_type')
try:
result = seal_mod.seal_to_polygon(
data_hash=data_hash,
ref_id=str(ref_id),
action=action,
ref_type=ref_type,
payload=payload,
user_id=x_user_id,
user_email=x_user_email,
)
except ValueError as e:
raise HTTPException(400, str(e))
return result
@router.get("/audit/seal/list")
def audit_seal_list(action: Optional[str] = None,
ref_type: Optional[str] = None,
ref_id: Optional[str] = None,
limit: int = 50):
rows = seal_mod.list_seals(action=action, ref_type=ref_type,
ref_id=ref_id, limit=limit)
return {'count': len(rows), 'rows': rows,
'wallet': seal_mod.POLYGON_WALLET,
'chain_id': seal_mod.POLYGON_CHAIN_ID,
'live': seal_mod.HAS_WEB3 and bool(seal_mod.POLYGON_PRIVKEY)}
@router.get("/audit/seal/{seal_id}")
def audit_seal_get(seal_id: str):
row = seal_mod.verify_seal(seal_id)
if not row:
raise HTTPException(404, "seal not found")
return row