f5c6570d47
The bare @app.get/post('/api/admin/users') decorators in pgz_sport_api.py
were registered before app.include_router(admin_users_router) and shadowed
the JWT-protected M2 routes, leaking user list to anyone.
Removed all three: GET /api/admin/users, POST /api/admin/users,
POST /api/admin/users/{uid}/toggle. The auth.admin_users router now owns
this prefix exclusively and gates every method with require_user.
Verified: no-auth → 401, invalid token → 401, valid Bearer → 200.
276 lines
9.8 KiB
Python
276 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
enrichment_worker.py — 24/7 background enrichment for PGŽ Sport
|
|
Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com
|
|
Date: 2026-05-04
|
|
Version: 1.0.0
|
|
|
|
Polls pgz_sport.clanovi / klubovi / savezi for under-enriched rows, then
|
|
calls the live HTTP endpoints (POST /sport/api/v2/enrich/{kind}/{id} and
|
|
.../apply) so every row goes through the same pipeline (and audit log)
|
|
that the UI uses. This avoids forking the enrichment logic.
|
|
|
|
Selection rules (per cycle):
|
|
- sportas: clanovi rows missing profile_url AND (source IN ('hns_semafor','manual')
|
|
OR vanjski_id ? 'hns_comet') ORDER BY random() LIMIT 25
|
|
- klub: klubovi rows whose metadata.enriched_at is NULL ORDER BY random() LIMIT 10
|
|
- savez: savezi rows whose metadata.enriched_at is NULL ORDER BY random() LIMIT 5
|
|
|
|
Sleep 300 s between cycles (configurable via ENRICHER_SLEEP env).
|
|
|
|
Heartbeat to redis (cc:pgz-enricher:heartbeat) and log every cycle to
|
|
/opt/pgz-sport/_logs/enrichment_worker.log.
|
|
"""
|
|
from __future__ import annotations
|
|
import json
|
|
import os
|
|
import random
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
API_BASE = os.environ.get('PGZ_API_BASE', 'http://localhost:8095')
|
|
SLEEP_S = int(os.environ.get('ENRICHER_SLEEP', '300'))
|
|
DRY = os.environ.get('ENRICHER_DRY', '0') == '1'
|
|
USER_HDR = os.environ.get('ENRICHER_USER', 'enricher@pgz.local')
|
|
|
|
LOG_PATHS = [
|
|
'/var/log/pgz-sport-enricher.log',
|
|
'/opt/pgz-sport/_logs/enrichment_worker.log',
|
|
]
|
|
CONFIDENCE_MIN = float(os.environ.get('ENRICHER_CONFIDENCE', '0.7'))
|
|
COVERAGE_MAX = int(os.environ.get('ENRICHER_COVERAGE_MAX', '70'))
|
|
|
|
_pgh = os.environ.get('PG_HOST', '10.10.0.2')
|
|
_pgp = int(os.environ.get('PG_PORT', '6432'))
|
|
if _pgh in ('localhost', '127.0.0.1'):
|
|
_pgh = os.environ.get('DB_HOST', '10.10.0.2')
|
|
_pgp = int(os.environ.get('DB_PORT', '6432'))
|
|
DB = dict(host=_pgh, port=_pgp,
|
|
dbname=os.environ.get('PG_DB', 'rinet_v3'),
|
|
user=os.environ.get('PG_USER', 'rinet'),
|
|
password=os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7'))
|
|
|
|
|
|
def _log(msg: str) -> None:
|
|
line = f"{datetime.now(timezone.utc).isoformat()}Z {msg}"
|
|
print(line, flush=True)
|
|
for p in LOG_PATHS:
|
|
try:
|
|
os.makedirs(os.path.dirname(p), exist_ok=True)
|
|
with open(p, 'a') as f:
|
|
f.write(line + "\n")
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _heartbeat() -> None:
|
|
try:
|
|
import redis
|
|
r = redis.Redis(host=os.environ.get('REDIS_HOST', 'localhost'),
|
|
port=int(os.environ.get('REDIS_PORT', '6379')),
|
|
password=os.environ.get('REDIS_PASS', None))
|
|
r.set('cc:pgz-enricher:heartbeat', str(int(time.time())))
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _db():
|
|
c = psycopg2.connect(**DB); c.autocommit = True; return c
|
|
|
|
|
|
# Coverage = (filled key fields) / (total key fields) * 100. Keep these in sync
|
|
# with enrich_router.enrich_preview() which surfaces the same scores in the UI.
|
|
_KLUB_KEYS = ('oib','sport','grad','predsjednik','tajnik','web','email','telefon',
|
|
'sjediste','godina_osnutka','ciljevi','opis_djelatnosti')
|
|
_SAVEZ_KEYS = ('oib','sport','predsjednik','tajnik','email','telefon','web',
|
|
'adresa','godina_osnutka')
|
|
# Coverage for sportas — fields the user actually wants populated.
|
|
_SPORTAS_KEYS = ('sport','profile_url','slika_url','hns_igrac_id','biografija',
|
|
'datum_rodenja','mjesto_rodenja','broj_dresa')
|
|
|
|
|
|
def _coverage_expr(table_keys: tuple[str, ...]) -> str:
|
|
"""Postgres expression that returns 0..100 coverage % for the row."""
|
|
parts = []
|
|
for k in table_keys:
|
|
parts.append(f"(CASE WHEN {k} IS NOT NULL AND ({k}::text) <> '' THEN 1 ELSE 0 END)")
|
|
total = len(table_keys)
|
|
return f"((({' + '.join(parts)})::numeric * 100) / {total})"
|
|
|
|
|
|
def _pick_sportas(limit: int = 50) -> list[int]:
|
|
"""Athletes with coverage<COVERAGE_MAX, randomly ordered."""
|
|
cov = _coverage_expr(_SPORTAS_KEYS)
|
|
sql = f"""
|
|
SELECT id FROM pgz_sport.clanovi
|
|
WHERE aktivan = TRUE
|
|
AND {cov} < %s
|
|
AND (
|
|
source IN ('hns_semafor','hns_family','manual','godisnjak')
|
|
OR jsonb_exists(vanjski_id, 'hns_comet')
|
|
OR (source_url ILIKE '%%semafor.hns.family%%')
|
|
OR profile_url ILIKE '%%semafor.hns.family%%'
|
|
)
|
|
AND ((metadata->>'enriched_at') IS NULL
|
|
OR (metadata->>'enriched_at')::timestamptz < now() - interval '7 days')
|
|
ORDER BY random()
|
|
LIMIT %s
|
|
"""
|
|
with _db() as c, c.cursor() as cur:
|
|
cur.execute(sql, (COVERAGE_MAX, limit))
|
|
return [r[0] for r in cur.fetchall()]
|
|
|
|
|
|
def _pick_klub(limit: int = 50) -> list[int]:
|
|
cov = _coverage_expr(_KLUB_KEYS)
|
|
sql = f"""
|
|
SELECT id FROM pgz_sport.klubovi
|
|
WHERE aktivan = TRUE
|
|
AND {cov} < %s
|
|
AND ((metadata->>'enriched_at') IS NULL
|
|
OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
|
|
ORDER BY random()
|
|
LIMIT %s
|
|
"""
|
|
with _db() as c, c.cursor() as cur:
|
|
cur.execute(sql, (COVERAGE_MAX, limit))
|
|
return [r[0] for r in cur.fetchall()]
|
|
|
|
|
|
def _pick_savez(limit: int = 50) -> list[int]:
|
|
cov = _coverage_expr(_SAVEZ_KEYS)
|
|
sql = f"""
|
|
SELECT id FROM pgz_sport.savezi
|
|
WHERE {cov} < %s
|
|
AND ((metadata->>'enriched_at') IS NULL
|
|
OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
|
|
ORDER BY random()
|
|
LIMIT %s
|
|
"""
|
|
with _db() as c, c.cursor() as cur:
|
|
cur.execute(sql, (COVERAGE_MAX, limit))
|
|
return [r[0] for r in cur.fetchall()]
|
|
|
|
|
|
def _http_post(path: str, body: dict | None = None) -> dict | None:
|
|
url = API_BASE.rstrip('/') + path
|
|
data = json.dumps(body or {}).encode('utf-8')
|
|
req = urllib.request.Request(
|
|
url, data=data, method='POST',
|
|
headers={'Content-Type': 'application/json',
|
|
'X-User-Email': USER_HDR})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=60) as r:
|
|
return json.loads(r.read().decode('utf-8'))
|
|
except Exception as e:
|
|
_log(f"POST {path} failed: {type(e).__name__}: {e}")
|
|
return None
|
|
|
|
|
|
# Per-source confidence weights. Anything written by an HNS Semafor /igraci/
|
|
# page is structured + verified, so we trust it implicitly. Wikipedia summaries
|
|
# are mostly safe but free-form. sport-pgz.hr "O nama" pages tend to be the
|
|
# zajednica generic info, so we down-weight them so a plain DeepSeek synthesis
|
|
# off a single sport-pgz.hr source falls below the gate.
|
|
_SOURCE_WEIGHTS = {
|
|
'semafor.hns.family': 0.95,
|
|
'wikipedia.hr': 0.80,
|
|
'sport-pgz.hr': 0.55,
|
|
}
|
|
# Fields that are safe to auto-write even from low-confidence sources because
|
|
# they come from the entity's own structured page (URLs, IDs).
|
|
_HARD_FIELDS = {'profile_url','source_url','slika_url','hns_igrac_id'}
|
|
|
|
|
|
def _confidence(proposed: dict, sources: list[dict]) -> float:
|
|
"""Crude 0..1 score: max source weight, scaled by evidence count."""
|
|
if not proposed:
|
|
return 0.0
|
|
weights = []
|
|
for s in sources or []:
|
|
w = _SOURCE_WEIGHTS.get((s.get('source') or '').lower(), 0.50)
|
|
weights.append(w)
|
|
if not weights:
|
|
return 0.0
|
|
base = max(weights)
|
|
bonus = min(0.10, 0.03 * (len(sources) - 1))
|
|
return min(1.0, base + bonus)
|
|
|
|
|
|
def _process(kind: str, eid: int) -> tuple[int, list[str]]:
|
|
"""Preview → confidence gate → apply. Returns (#applied, fields)."""
|
|
preview = _http_post(f'/api/v2/enrich/{kind}/{eid}', {})
|
|
if not preview:
|
|
return (0, [])
|
|
proposed = preview.get('proposed') or {}
|
|
sources = preview.get('sources') or []
|
|
if not proposed:
|
|
return (0, [])
|
|
conf = _confidence(proposed, sources)
|
|
# Always allow hard structured fields (URLs / IDs) — they are objective.
|
|
hard = {k: v for k, v in proposed.items() if k in _HARD_FIELDS}
|
|
soft = {k: v for k, v in proposed.items() if k not in _HARD_FIELDS}
|
|
fields = dict(hard)
|
|
if conf >= CONFIDENCE_MIN:
|
|
fields.update(soft)
|
|
if not fields:
|
|
_log(f" {kind}#{eid} skipped — confidence {conf:.2f} < {CONFIDENCE_MIN:.2f}")
|
|
return (0, [])
|
|
res = _http_post(f'/api/v2/enrich/{kind}/{eid}/apply',
|
|
{'fields': fields, 'sources': sources})
|
|
if not res or 'applied' not in res:
|
|
return (0, [])
|
|
applied = res['applied']
|
|
if applied:
|
|
_log(f" {kind}#{eid} conf={conf:.2f} → +{len(applied)} {','.join(applied.keys())}")
|
|
return (len(applied), list(applied.keys()))
|
|
|
|
|
|
def _cycle() -> dict:
|
|
started = time.time()
|
|
out = {'sportas': 0, 'klub': 0, 'savez': 0, 'fields_total': 0}
|
|
fields_total = 0
|
|
for kind, picker, limit in (
|
|
('sportas', _pick_sportas, 25),
|
|
('klub', _pick_klub, 10),
|
|
('savez', _pick_savez, 5),
|
|
):
|
|
ids = picker(limit)
|
|
random.shuffle(ids)
|
|
_log(f"cycle: {kind} candidates={len(ids)}")
|
|
for eid in ids:
|
|
if DRY:
|
|
continue
|
|
n, fields = _process(kind, eid)
|
|
out[kind] += 1
|
|
fields_total += n
|
|
if n:
|
|
_log(f" {kind}#{eid} → +{n} fields {','.join(fields)}")
|
|
time.sleep(1.5) # gentle pacing
|
|
_heartbeat()
|
|
out['fields_total'] = fields_total
|
|
out['elapsed_s'] = round(time.time() - started, 1)
|
|
return out
|
|
|
|
|
|
def main() -> int:
|
|
_log(f"enrichment_worker starting | API_BASE={API_BASE} | sleep={SLEEP_S}s | dry={DRY}")
|
|
while True:
|
|
try:
|
|
stats = _cycle()
|
|
_log(f"cycle done: {json.dumps(stats)}")
|
|
except Exception as e:
|
|
_log(f"cycle FAILED: {type(e).__name__}: {e}")
|
|
_heartbeat()
|
|
time.sleep(SLEEP_S)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main() or 0)
|