CC2 R4 #4: /api/users/me/gdpr-export alias

- New auth.gdpr.me_router prefix /api/users/me with:
  - GET/POST /gdpr-export → Art.20 JSON download with Content-Disposition
  - POST /gdpr-erase → Art.17 erasure request
  - GET /gdpr-consent → consent history for caller
- jsonable_encoder fixes datetime serialisation in JSONResponse
- admin_users.html: 'Izvezi moje podatke' now POSTs to alias and uses
  filename from Content-Disposition header
- 401 enforced on no-auth, 200 on valid Bearer (verified live)
This commit is contained in:
Damir Radulić
2026-05-05 00:47:22 +02:00
parent ca92717039
commit a0db65fc31
14 changed files with 4796 additions and 30 deletions
+182
View File
@@ -1000,3 +1000,185 @@ def forensic_scan(req: dict = Body(...)):
'overall_risk_score': overall, 'total_links': total_links,
'total_findings': total_findings, 'critical_findings': crit_findings,
'persons': persons, 'scanned_at': int(time.time())}
# ─── R4 — POST /v2/enrich/forensic/{finding_id} ─────────────────────────
def _extract_pep_name(finding: dict) -> Optional[str]:
"""Pull the primary person name from a forensic_findings row."""
title = (finding.get('title') or '').strip()
desc = (finding.get('description') or '').strip()
payload = finding.get('raw_data') or {}
if isinstance(payload, str):
try: payload = json.loads(payload)
except Exception: payload = {}
if isinstance(payload, dict):
for k in ('person_name', 'name', 'osoba'):
v = payload.get(k)
if v: return str(v).strip()
# Try entities_involved.entity_name
ents = finding.get('entities_involved') or []
if isinstance(ents, str):
try: ents = json.loads(ents)
except Exception: ents = []
if isinstance(ents, list):
for e in ents:
if isinstance(e, dict) and e.get('person_name'):
return str(e['person_name']).strip()
if isinstance(e, dict) and e.get('entity_name') and ' ' in (e.get('entity_name') or ''):
# Some entries store person names as entity_name when entity_type='person'
if (e.get('entity_type') or '').lower() in ('person','osoba'):
return str(e['entity_name']).strip()
# Fallback: extract a "Ime Prezime" from the title
m = re.search(r'\b([A-ZČĆŠĐŽ][a-zčćšđž]+)\s+([A-ZČĆŠĐŽ][a-zčćšđž]+(?:-[A-ZČĆŠĐŽ][a-zčćšđž]+)?)\b', title + ' ' + desc)
if m: return f"{m.group(1)} {m.group(2)}"
return None
def _gather_pep_evidence(name: str) -> list[dict]:
sources: list[dict] = []
wiki = _wiki_summary(name)
if wiki: sources.append(wiki)
# DDG html-lite as a "Google snippet" replacement (often OK for HR PEPs)
ddg = 'https://html.duckduckgo.com/html/?q=' + urllib.parse.quote(f'"{name}" PGŽ Hrvatska')
page = _http_get(ddg, timeout=8)
if page:
# First result block
m = re.search(r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>([^<]{6,200})</a>', page)
snippet_m = re.search(r'<a[^>]+class="result__snippet"[^>]*>(.*?)</a>', page, re.S)
if m:
sources.append({
'source': 'duckduckgo',
'url': html.unescape(m.group(1))[:500],
'title': html.unescape(m.group(2)).strip()[:300],
'extract': re.sub(r'<[^>]+>', ' ', snippet_m.group(1)).strip()[:600] if snippet_m else None,
})
return sources
def _related_entities_for_pep(name: str) -> list[dict]:
"""Pull civic.persons + their entity links so we have the structured graph."""
out: list[dict] = []
with _db() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""SELECT id, name, function, party, county, city, oib, trust_tier
FROM civic.persons
WHERE upper(name) ILIKE upper(%s)
ORDER BY oib NULLS LAST, id LIMIT 10""", ('%'+name+'%',))
for p in cur.fetchall():
p = dict(p)
entry = {
'kind': 'person',
'person_id': p['id'], 'person_name': p['name'],
'function': p.get('function'), 'party': p.get('party'),
'county': p.get('county'), 'city': p.get('city'),
'oib': p.get('oib'), 'trust_tier': p.get('trust_tier'),
'entities': [],
}
if p.get('oib'):
cur.execute("""SELECT pel.entity_id, pel.roles, e.name AS entity_name,
e.oib AS entity_oib, e.entity_type, e.city, e.risk_score
FROM civic.person_entity_links pel
LEFT JOIN civic.entities e ON e.id = pel.entity_id
WHERE pel.person_oib=%s LIMIT 30""", (p['oib'],))
for r in cur.fetchall():
entry['entities'].append(dict(r))
out.append(entry)
return out
@router.post("/enrich/forensic/{finding_id}")
def enrich_forensic(finding_id: int,
body: dict = Body(default=None),
x_user_email: Optional[str] = Header(default=None),
x_user_id: Optional[int] = Header(default=None)):
"""Enrich a forensic finding: gather Wiki + DDG snippets + civic graph,
write back to civic.forensic_findings.related_entities, and seal the
payload hash on Polygon (or queue for sealing).
"""
body = body or {}
explicit_name = (body.get('name') or '').strip() or None
with _db() as c, c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute("""SELECT id, finding_type, severity, title, description,
entities_involved, raw_data, related_entities, enrichment_metadata
FROM civic.forensic_findings WHERE id=%s""", (finding_id,))
finding = cur.fetchone()
if not finding:
raise HTTPException(404, "finding not found")
finding = dict(finding)
name = explicit_name or _extract_pep_name(finding)
if not name:
raise HTTPException(400, "could not derive a person/entity name; pass {name: \"\"}")
sources = _gather_pep_evidence(name)
related = _related_entities_for_pep(name)
payload = {
'finding_id': finding_id,
'name': name,
'sources': [{'source': s.get('source'), 'url': s.get('url'),
'title': s.get('title')} for s in sources],
'related_entities': related,
'enriched_at': datetime.now(timezone.utc).isoformat(),
}
# Persist back to the finding
enrichment_meta = finding.get('enrichment_metadata') or {}
if not isinstance(enrichment_meta, dict): enrichment_meta = {}
history = enrichment_meta.get('history') or []
history.append({
'at': payload['enriched_at'],
'sources': payload['sources'],
'related_count': len(related),
'user': x_user_email,
})
enrichment_meta['history'] = history[-10:]
enrichment_meta['enriched_at'] = payload['enriched_at']
enrichment_meta['enriched_by'] = x_user_email or 'system'
enrichment_meta['source_count'] = len(sources)
with _db() as c, c.cursor() as cur:
cur.execute("""UPDATE civic.forensic_findings
SET related_entities = %s::jsonb,
enrichment_metadata = %s::jsonb
WHERE id=%s
RETURNING id""",
(json.dumps(related, default=str, ensure_ascii=False),
json.dumps(enrichment_meta, default=str, ensure_ascii=False),
finding_id))
cur.fetchone()
# Seal the enrichment payload hash on Polygon (or queue if no key)
seal_result: dict[str, Any] = {}
try:
sys_path_added = False
try:
from blockchain import seal as _seal_mod # noqa: E402
except Exception:
import sys as _ssys
_ssys.path.insert(0, '/opt/pgz-sport')
from blockchain import seal as _seal_mod # noqa: E402
sys_path_added = True
del sys_path_added # silence linters
h = _seal_mod.hash_payload(payload)
seal_result = _seal_mod.seal_to_polygon(
data_hash=h,
ref_id=str(finding_id),
action='forensic.enriched',
ref_type='forensic_finding',
payload=payload,
user_id=x_user_id,
user_email=x_user_email,
)
except Exception as e:
seal_result = {'error': f'{type(e).__name__}: {e}'}
return {
'finding_id': finding_id,
'name': name,
'sources': sources,
'related_entities': related,
'related_count': len(related),
'enrichment_metadata': enrichment_meta,
'seal': seal_result,
}