cf993b0221
- GET /sport/api/audit/log?limit=&action=&resource=&q=&user=&since=
Filters pgz_sport.sys_audit; returns normalised items list + total count.
Aliases target_type → resource_type for the audit.html UI.
Lifts tx_hash from payload.tx_hash / polygon_tx / seal_tx_hash.
- GET /sport/api/audit/stats — {total, today, sealed, users}
sealed counts rows whose payload jsonb has tx_hash key (or polygon_tx).
- audit_log() shared helper for cc2/cc4/cc5/cc6 to call after DB writes.
Fail-soft: never raises, writes traceback to stderr if insert fails.
trg_audit_chain on table fills row_hash + chain_idx automatically.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
261 lines
10 KiB
Python
261 lines
10 KiB
Python
"""
|
|
audit_seal_router.py — HTTP surface for sys_audit log + Polygon PoS sealing
|
|
Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com
|
|
Date: 2026-05-04 (R3) / 2026-05-05 (R4 — log+stats+helper)
|
|
|
|
Endpoints (all under /sport/api):
|
|
|
|
  GET /audit/log?limit=200&action=&resource=&q=&user=&since=
|
|
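      Tail of pgz_sport.sys_audit (highest id first) with optional filters. Returns:
        { count, total,
          items: [{id, user_id, user_email, action, resource_type, resource_id,
                   target_text, details, row_hash, chain_idx, created_at, tx_hash}] }
      `count` is the number of items returned; `total` is the unfiltered
      row count of the table.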
|
|
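      Aliases: target_type → resource_type and target_id → resource_id in each
      item (the names the audit.html UI uses); the `resource` query parameter
      filters on target_type.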
|
|
|
|
GET /audit/stats
|
|
{ total, today, sealed, users }
|
|
|
|
POST /audit/seal
|
|
body: {
|
|
action: "sufinanciranje.approved",
|
|
ref_type: "sufinanciranje", (optional)
|
|
ref_id: "2026-001", (string or number)
|
|
payload: { ... }, (sha256 computed server-side)
|
|
data_hash: "abc..." (optional — if you already have the hash)
|
|
}
|
|
returns the seal record (seal_id, tx_hash, polygonscan_url, status, ...).
|
|
|
|
GET /audit/seal/list?action=&ref_type=&ref_id=&limit=
|
|
Recent seals for the audit-log UI.
|
|
|
|
GET /audit/seal/{seal_id}
|
|
Single seal with on-chain receipt cross-check (if web3 wired up).
|
|
|
|
The legacy hash-chain audit endpoints (/api/admin/audit-chain*) live in
|
|
pgz_sport_api.py and remain unchanged.
|
|
|
|
----------------------------------------------------------------------
|
|
audit_log() — shared helper for other routers (cc2/4/5/6)
|
|
from audit_seal_router import audit_log
|
|
audit_log(action='users.update', target_type='users', target_id=7,
|
|
payload={'changed':['email']}, user_id=u['id'], user_email=u['email'])
|
|
Fail-soft: never raises, only writes to stderr on error.
|
|
"""
|
|
from __future__ import annotations
|
|
import sys, os, json, traceback
|
|
from datetime import date, datetime
|
|
from typing import Any, Optional
|
|
|
|
import psycopg2, psycopg2.extras
|
|
from fastapi import APIRouter, Body, HTTPException, Header, Query
|
|
|
|
# blockchain.seal lives at /opt/pgz-sport/blockchain/seal.py
|
|
sys.path.insert(0, '/opt/pgz-sport')
|
|
from blockchain import seal as seal_mod # noqa: E402
|
|
|
|
# Router object that every endpoint in this module registers itself on.
router = APIRouter()
|
|
|
|
# ── DB helper (own connection, mirrors enrich_router DSN logic) ──────────
|
|
# Connection settings are read from PG_* environment variables.  When PG_HOST
# names the loopback interface, the DB_HOST / DB_PORT pair is used instead.
# NOTE(review): the PG_PASS fallback below embeds what looks like a real
# credential in source control — consider requiring the variable and rotating
# the secret.
_pgh = os.environ.get('PG_HOST', '10.10.0.2')
_pgp = int(os.environ.get('PG_PORT', '6432'))
if _pgh == 'localhost' or _pgh == '127.0.0.1':
    _pgh = os.environ.get('DB_HOST', '10.10.0.2')
    _pgp = int(os.environ.get('DB_PORT', '6432'))

# Keyword arguments handed verbatim to psycopg2.connect() by _conn().
_DB = {
    'host': _pgh,
    'port': _pgp,
    'dbname': os.environ.get('PG_DB', 'rinet_v3'),
    'user': os.environ.get('PG_USER', 'rinet'),
    'password': os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7'),
}
|
|
|
|
def _conn():
    """Open a fresh psycopg2 connection from ``_DB`` with autocommit enabled.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = psycopg2.connect(**_DB)
    connection.autocommit = True
    return connection
|
|
|
|
|
|
def audit_log(action: str,
              target_type: Optional[str] = None,
              target_id: Optional[int] = None,
              target_text: Optional[str] = None,
              payload: Optional[dict] = None,
              user_id: Optional[int] = None,
              user_email: Optional[str] = None,
              ip_address: Optional[str] = None,
              user_agent: Optional[str] = None) -> Optional[int]:
    """Insert one row into pgz_sport.sys_audit.  Fail-soft.

    Other routers should call this after any successful DB mutation.
    The DB trigger trg_audit_chain populates row_hash + chain_idx automatically.

    Args:
        action: Event name; truncated to 100 characters.  A falsy value makes
            the call a no-op.
        target_type: Kind of object affected; stringified and truncated to
            50 characters.  Falsy values are stored as NULL.
        target_id: Identifier of the affected object.
        target_text: Free-text description of the target.
        payload: Extra details, serialised to JSON (non-JSON values are
            stringified via ``default=str``) and stored as jsonb.
        user_id: Acting user's id.
        user_email: Acting user's e-mail.
        ip_address: Client IP address.
        user_agent: Client user-agent string.

    Returns:
        The new row id, or None if ``action`` is empty or the insert failed.
        Never raises: failures are written to stderr with a traceback.
    """
    from contextlib import closing

    if not action:
        return None
    try:
        target_kind = str(target_type)[:50] if target_type else None
        payload_json = (
            json.dumps(payload, default=str, ensure_ascii=False)
            if payload is not None else None
        )
        # psycopg2's ``with connection`` only ends the transaction; it does
        # not close the connection, so closing() is needed to release it.
        with closing(_conn()) as conn:
            with conn, conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO pgz_sport.sys_audit
                        (user_id, user_email, action, target_type, target_id, target_text,
                         payload, ip_address, user_agent)
                    VALUES (%s, %s, %s, %s, %s, %s, %s::jsonb, %s, %s)
                    RETURNING id
                """, (
                    user_id,
                    user_email,
                    action[:100],
                    target_kind,
                    target_id,
                    target_text,
                    payload_json,
                    ip_address,
                    user_agent,
                ))
                row = cur.fetchone()
        return row[0] if row else None
    except Exception:
        # Never block business logic on audit failure — log and move on.
        sys.stderr.write('[audit_log] insert failed for action='+str(action)+'\n')
        sys.stderr.write(traceback.format_exc())
        return None
|
|
|
|
|
|
def _row_to_item(r: dict) -> dict:
|
|
"""Normalise a sys_audit row for the audit.html UI."""
|
|
payload = r.get('payload') or {}
|
|
if isinstance(payload, str):
|
|
try: payload = json.loads(payload)
|
|
except: pass
|
|
out = {
|
|
'id': r.get('id'),
|
|
'user_id': r.get('user_id'),
|
|
'user_email': r.get('user_email'),
|
|
'action': r.get('action'),
|
|
'resource_type': r.get('target_type'),
|
|
'resource_id': r.get('target_id'),
|
|
'target_text': r.get('target_text'),
|
|
'details': payload,
|
|
'row_hash': r.get('row_hash'),
|
|
'chain_idx': r.get('chain_idx'),
|
|
'created_at': (r.get('created_at').isoformat() if isinstance(r.get('created_at'), datetime) else r.get('created_at')),
|
|
}
|
|
# Surface a polygon tx hash if it was stored in payload (seal_to_polygon does this)
|
|
if isinstance(payload, dict):
|
|
out['tx_hash'] = payload.get('tx_hash') or payload.get('polygon_tx') or payload.get('seal_tx_hash')
|
|
return out
|
|
|
|
|
|
# ── R4: audit log + stats endpoints ─────────────────────────────────────
|
|
@router.get("/audit/log")
def audit_log_list(limit: int = Query(200, ge=1, le=1000),
                   action: Optional[str] = None,
                   resource: Optional[str] = None,
                   user: Optional[str] = None,
                   q: Optional[str] = None,
                   since: Optional[str] = None):
    """Recent audit rows with simple filters.  ``resource`` matches target_type.

    Args:
        limit: Maximum number of rows returned (1-1000).
        action: Case-insensitive substring match on ``action``.
        resource: Case-insensitive substring match on ``target_type``.
        user: Substring match on ``user_email`` or exact match on ``user_id``.
        q: Substring searched in action, target_type, target_text,
            user_email and the payload rendered as text.
        since: Lower bound for ``created_at``; cast to timestamptz by the
            database.

    Returns:
        ``{'count', 'total', 'items'}`` where ``items`` are rows normalised by
        ``_row_to_item`` (highest id first), ``count`` is ``len(items)`` and
        ``total`` is the size of the whole table (filters are not applied).

    Raises:
        HTTPException: 400 when the database rejects a filter value
            (for example an unparseable ``since``).
    """
    from contextlib import closing

    where: list = []
    params: list = []
    if action:
        where.append("action ILIKE %s")
        params.append('%' + action + '%')
    if resource:
        where.append("target_type ILIKE %s")
        params.append('%' + resource + '%')
    if user:
        where.append("(user_email ILIKE %s OR CAST(user_id AS text)=%s)")
        params.extend(['%' + user + '%', user])
    if q:
        where.append("(action ILIKE %s OR target_type ILIKE %s OR target_text ILIKE %s"
                     " OR user_email ILIKE %s OR payload::text ILIKE %s)")
        params.extend(['%' + q + '%'] * 5)
    if since:
        where.append("created_at >= %s::timestamptz")
        params.append(since)

    # Only fixed SQL fragments are concatenated; every user value is bound.
    where_sql = (" WHERE " + ' AND '.join(where)) if where else ''
    sql = (
        "SELECT id, user_id, user_email, action, target_type, target_id, target_text,"
        " payload, ip_address, user_agent, row_hash, chain_idx, created_at"
        " FROM pgz_sport.sys_audit"
        + where_sql +
        " ORDER BY id DESC LIMIT %s"
    )
    params.append(limit)

    try:
        # psycopg2's ``with connection`` does not close the connection;
        # closing() releases it once the request has been served.
        with closing(_conn()) as conn:
            with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute(sql, params)
                rows = [_row_to_item(dict(r)) for r in cur.fetchall()]
                cur.execute("SELECT count(*) AS n FROM pgz_sport.sys_audit")
                total = cur.fetchone()['n']
    except psycopg2.DataError as exc:
        # Bad client input (e.g. malformed ``since``) is a 400, not a 500.
        raise HTTPException(400, "invalid filter value") from exc
    return {'count': len(rows), 'total': total, 'items': rows}
|
|
|
|
|
|
@router.get("/audit/stats")
def audit_stats():
    """Return headline counters for pgz_sport.sys_audit.

    Returns:
        A dict of ints:
        ``total``  — all rows;
        ``today``  — rows whose ``created_at`` date equals the DB's current_date;
        ``sealed`` — rows whose payload has a tx_hash, polygon_tx or
                     seal_tx_hash key;
        ``users``  — distinct non-null user_email values in the last 30 days.
    """
    from contextlib import closing

    # psycopg2's ``with connection`` does not close the connection;
    # closing() releases it once the counters have been read.
    with closing(_conn()) as conn:
        with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                SELECT
                  (SELECT count(*) FROM pgz_sport.sys_audit) AS total,
                  (SELECT count(*) FROM pgz_sport.sys_audit WHERE created_at::date = current_date) AS today,
                  (SELECT count(DISTINCT user_email) FROM pgz_sport.sys_audit
                     WHERE user_email IS NOT NULL AND created_at >= now()-interval '30 days') AS users,
                  (SELECT count(*) FROM pgz_sport.sys_audit
                     WHERE payload ? 'tx_hash' OR payload ? 'polygon_tx' OR payload ? 'seal_tx_hash')
                     AS sealed
            """)
            r = cur.fetchone() or {}
    return {
        'total': int(r.get('total') or 0),
        'today': int(r.get('today') or 0),
        'sealed': int(r.get('sealed') or 0),
        'users': int(r.get('users') or 0),
    }
|
|
|
|
|
|
@router.post("/audit/seal")
def audit_seal(body: dict = Body(...),
               x_user_email: Optional[str] = Header(default=None),
               x_user_id: Optional[int] = Header(default=None)):
    """Seal an audit event via ``seal_mod.seal_to_polygon``.

    Body fields:
        action:    required non-empty string.
        ref_id:    required; converted to ``str`` before sealing.
        ref_type:  optional.
        payload:   optional; hashed with ``seal_mod.hash_payload`` when no
                   ``data_hash`` is supplied.
        data_hash: optional hex string; surrounding whitespace, letter case
                   and one leading ``0x`` prefix are normalised away.

    The acting user is taken from the X-User-Email / X-User-Id headers.

    Returns:
        Whatever ``seal_mod.seal_to_polygon`` returns (the seal record).

    Raises:
        HTTPException: 400 for a missing/invalid field or when the seal
            module raises ValueError.
    """
    if not isinstance(body, dict):
        raise HTTPException(400, "JSON body required")

    action = body.get('action')
    if not isinstance(action, str) or not action.strip():
        raise HTTPException(400, "action is required")
    action = action.strip()

    payload = body.get('payload')
    raw_hash = body.get('data_hash') or ''
    if not isinstance(raw_hash, str):
        raise HTTPException(400, "data_hash must be a hex string")
    data_hash = raw_hash.strip().lower()
    # Drop exactly one "0x" prefix.  str.lstrip('0x') would strip every
    # leading '0'/'x' character and corrupt hashes that begin with zeros.
    if data_hash.startswith('0x'):
        data_hash = data_hash[2:]
    if not data_hash:
        if payload is None:
            raise HTTPException(400, "either data_hash or payload required")
        data_hash = seal_mod.hash_payload(payload)

    ref_id = body.get('ref_id')
    if ref_id is None:
        raise HTTPException(400, "ref_id is required")
    ref_type = body.get('ref_type')

    try:
        result = seal_mod.seal_to_polygon(
            data_hash=data_hash,
            ref_id=str(ref_id),
            action=action,
            ref_type=ref_type,
            payload=payload,
            user_id=x_user_id,
            user_email=x_user_email,
        )
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    return result
|
|
|
|
|
|
@router.get("/audit/seal/list")
def audit_seal_list(action: Optional[str] = None,
                    ref_type: Optional[str] = None,
                    ref_id: Optional[str] = None,
                    limit: int = 50):
    """List seals via ``seal_mod.list_seals`` plus chain/wallet metadata.

    All four arguments are forwarded unchanged to ``seal_mod.list_seals``.
    NOTE(review): ``limit`` is not range-checked at this layer — confirm that
    list_seals clamps it.

    Returns:
        ``{'count', 'rows', 'wallet', 'chain_id', 'live'}`` where ``live`` is
        truthy only when seal_mod reports web3 support and a private key is
        configured.
    """
    seals = seal_mod.list_seals(
        action=action,
        ref_type=ref_type,
        ref_id=ref_id,
        limit=limit,
    )
    response = {'count': len(seals), 'rows': seals}
    response['wallet'] = seal_mod.POLYGON_WALLET
    response['chain_id'] = seal_mod.POLYGON_CHAIN_ID
    response['live'] = seal_mod.HAS_WEB3 and bool(seal_mod.POLYGON_PRIVKEY)
    return response
|
|
|
|
|
|
@router.get("/audit/seal/{seal_id}")
def audit_seal_get(seal_id: str):
    """Return the result of ``seal_mod.verify_seal`` for one seal.

    Raises:
        HTTPException: 404 when verify_seal returns a falsy value.
    """
    record = seal_mod.verify_seal(seal_id)
    if record:
        return record
    raise HTTPException(404, "seal not found")
|