#!/usr/bin/env python3
"""
enrichment_worker.py — 24/7 background enrichment for PGŽ Sport

Author:  Damir Radulić (damir@rinet.one) / dradulic@outlook.com
Date:    2026-05-04
Version: 1.0.0

Polls pgz_sport.clanovi / klubovi / savezi for under-enriched rows, then calls
the live HTTP endpoints (POST {PGZ_API_BASE}/api/v2/enrich/{kind}/{id} and
.../apply) so every row goes through the same pipeline (and audit log) that
the UI uses. This avoids forking the enrichment logic.

Selection rules (per cycle): rows whose key-field coverage is below
ENRICHER_COVERAGE_MAX (default 70 %) and whose metadata.enriched_at is NULL
or stale, picked ORDER BY random():
  - sportas: clanovi (see _pick_sportas for extra filters),
             stale after 7 days,  LIMIT 25
  - klub:    active klubovi, stale after 14 days, LIMIT 10
  - savez:   savezi,         stale after 14 days, LIMIT 5

Proposed values pass a per-source confidence gate (ENRICHER_CONFIDENCE,
default 0.7) before being applied; structured fields (URLs / IDs) bypass it.
ENRICHER_DRY=1 only logs candidate counts and applies nothing.

Sleep 300 s between cycles (configurable via ENRICHER_SLEEP env).
Heartbeat to redis (cc:pgz-enricher:heartbeat) and log every cycle to
/var/log/pgz-sport-enricher.log and /opt/pgz-sport/_logs/enrichment_worker.log.
"""
from __future__ import annotations

import json
import os
import random
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras

API_BASE = os.environ.get('PGZ_API_BASE', 'http://localhost:8095')
SLEEP_S = int(os.environ.get('ENRICHER_SLEEP', '300'))
DRY = os.environ.get('ENRICHER_DRY', '0') == '1'
USER_HDR = os.environ.get('ENRICHER_USER', 'enricher@pgz.local')
LOG_PATHS = [
    '/var/log/pgz-sport-enricher.log',
    '/opt/pgz-sport/_logs/enrichment_worker.log',
]
# Minimum _confidence() score for non-structured ("soft") fields to be applied.
CONFIDENCE_MIN = float(os.environ.get('ENRICHER_CONFIDENCE', '0.7'))
# Rows at or above this coverage percentage (0..100) are not picked.
COVERAGE_MAX = int(os.environ.get('ENRICHER_COVERAGE_MAX', '70'))

_pgh = os.environ.get('PG_HOST', '10.10.0.2')
_pgp = int(os.environ.get('PG_PORT', '6432'))
if _pgh in ('localhost', '127.0.0.1'):
    # A loopback PG_HOST is replaced by DB_HOST/DB_PORT.
    # NOTE(review): presumably the loopback value leaks in from a shared env
    # file that does not apply to this worker — confirm.
    _pgh = os.environ.get('DB_HOST', '10.10.0.2')
    _pgp = int(os.environ.get('DB_PORT', '6432'))

DB = dict(
    host=_pgh,
    port=_pgp,
    dbname=os.environ.get('PG_DB', 'rinet_v3'),
    user=os.environ.get('PG_USER', 'rinet'),
    # SECURITY(review): a real-looking password is hard-coded as the fallback.
    # It should come only from the environment (or ~/.pgpass) and the value
    # committed here should be rotated. Kept so deployments that rely on the
    # default keep connecting.
    password=os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7'),
)


def _log(msg: str) -> None:
    """Print *msg* with a UTC ISO-8601 timestamp and append it to LOG_PATHS.

    Writing to the log files is best-effort: an unwritable path never stops
    the worker (stdout still gets the line).
    """
    # isoformat() of an aware UTC datetime already ends in "+00:00"; swap it
    # for "Z" instead of appending one (which produced "...+00:00Z").
    stamp = datetime.now(timezone.utc).isoformat(timespec='microseconds')
    line = f"{stamp.replace('+00:00', 'Z')} {msg}"
    print(line, flush=True)
    for p in LOG_PATHS:
        try:
            os.makedirs(os.path.dirname(p), exist_ok=True)
            # Explicit UTF-8: messages contain non-ASCII ("→", "—", "Ž") and a
            # C/POSIX locale would otherwise make the write fail silently.
            with open(p, 'a', encoding='utf-8') as f:
                f.write(line + "\n")
        except Exception:
            pass


def _heartbeat() -> None:
    """Best-effort: store the current epoch seconds in redis key
    cc:pgz-enricher:heartbeat. Failures are logged, never raised."""
    try:
        import redis  # optional dependency; absence only disables the heartbeat
        r = redis.Redis(host=os.environ.get('REDIS_HOST', 'localhost'),
                        port=int(os.environ.get('REDIS_PORT', '6379')),
                        password=os.environ.get('REDIS_PASS', None),
                        # Bounded waits so an unreachable redis cannot stall
                        # the worker loop.
                        socket_connect_timeout=5,
                        socket_timeout=5)
        r.set('cc:pgz-enricher:heartbeat', str(int(time.time())))
    except Exception as e:
        _log(f"heartbeat failed: {type(e).__name__}: {e}")


def _db():
    """Open a new autocommit psycopg2 connection using DB.

    The caller owns the connection and must close() it: psycopg2's
    ``with conn:`` only ends the transaction, it does not close the connection.
    """
    # connect_timeout (seconds) keeps a dead DB host from hanging the loop.
    c = psycopg2.connect(connect_timeout=10, **DB)
    c.autocommit = True
    return c


# Coverage = (filled key fields) / (total key fields) * 100. Keep these in sync
# with enrich_router.enrich_preview() which surfaces the same scores in the UI.
_KLUB_KEYS = ('oib', 'sport', 'grad', 'predsjednik', 'tajnik', 'web', 'email',
              'telefon', 'sjediste', 'godina_osnutka', 'ciljevi',
              'opis_djelatnosti')
_SAVEZ_KEYS = ('oib', 'sport', 'predsjednik', 'tajnik', 'email', 'telefon',
               'web', 'adresa', 'godina_osnutka')
# Coverage for sportas — fields the user actually wants populated.
_SPORTAS_KEYS = ('sport','profile_url','slika_url','hns_igrac_id','biografija', 'datum_rodenja','mjesto_rodenja','broj_dresa') def _coverage_expr(table_keys: tuple[str, ...]) -> str: """Postgres expression that returns 0..100 coverage % for the row.""" parts = [] for k in table_keys: parts.append(f"(CASE WHEN {k} IS NOT NULL AND ({k}::text) <> '' THEN 1 ELSE 0 END)") total = len(table_keys) return f"((({' + '.join(parts)})::numeric * 100) / {total})" def _pick_sportas(limit: int = 50) -> list[int]: """Athletes with coverage>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '7 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (COVERAGE_MAX, limit)) return [r[0] for r in cur.fetchall()] def _pick_klub(limit: int = 50) -> list[int]: cov = _coverage_expr(_KLUB_KEYS) sql = f""" SELECT id FROM pgz_sport.klubovi WHERE aktivan = TRUE AND {cov} < %s AND ((metadata->>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (COVERAGE_MAX, limit)) return [r[0] for r in cur.fetchall()] def _pick_savez(limit: int = 50) -> list[int]: cov = _coverage_expr(_SAVEZ_KEYS) sql = f""" SELECT id FROM pgz_sport.savezi WHERE {cov} < %s AND ((metadata->>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (COVERAGE_MAX, limit)) return [r[0] for r in cur.fetchall()] def _http_post(path: str, body: dict | None = None) -> dict | None: url = API_BASE.rstrip('/') + path data = json.dumps(body or {}).encode('utf-8') req = urllib.request.Request( url, data=data, method='POST', headers={'Content-Type': 'application/json', 'X-User-Email': USER_HDR}) try: with urllib.request.urlopen(req, timeout=60) as r: return json.loads(r.read().decode('utf-8')) except Exception as e: 
_log(f"POST {path} failed: {type(e).__name__}: {e}") return None # Per-source confidence weights. Anything written by an HNS Semafor /igraci/ # page is structured + verified, so we trust it implicitly. Wikipedia summaries # are mostly safe but free-form. sport-pgz.hr "O nama" pages tend to be the # zajednica generic info, so we down-weight them so a plain DeepSeek synthesis # off a single sport-pgz.hr source falls below the gate. _SOURCE_WEIGHTS = { 'semafor.hns.family': 0.95, 'wikipedia.hr': 0.80, 'sport-pgz.hr': 0.55, } # Fields that are safe to auto-write even from low-confidence sources because # they come from the entity's own structured page (URLs, IDs). _HARD_FIELDS = {'profile_url','source_url','slika_url','hns_igrac_id'} def _confidence(proposed: dict, sources: list[dict]) -> float: """Crude 0..1 score: max source weight, scaled by evidence count.""" if not proposed: return 0.0 weights = [] for s in sources or []: w = _SOURCE_WEIGHTS.get((s.get('source') or '').lower(), 0.50) weights.append(w) if not weights: return 0.0 base = max(weights) bonus = min(0.10, 0.03 * (len(sources) - 1)) return min(1.0, base + bonus) def _process(kind: str, eid: int) -> tuple[int, list[str]]: """Preview → confidence gate → apply. Returns (#applied, fields).""" preview = _http_post(f'/api/v2/enrich/{kind}/{eid}', {}) if not preview: return (0, []) proposed = preview.get('proposed') or {} sources = preview.get('sources') or [] if not proposed: return (0, []) conf = _confidence(proposed, sources) # Always allow hard structured fields (URLs / IDs) — they are objective. 
hard = {k: v for k, v in proposed.items() if k in _HARD_FIELDS} soft = {k: v for k, v in proposed.items() if k not in _HARD_FIELDS} fields = dict(hard) if conf >= CONFIDENCE_MIN: fields.update(soft) if not fields: _log(f" {kind}#{eid} skipped — confidence {conf:.2f} < {CONFIDENCE_MIN:.2f}") return (0, []) res = _http_post(f'/api/v2/enrich/{kind}/{eid}/apply', {'fields': fields, 'sources': sources}) if not res or 'applied' not in res: return (0, []) applied = res['applied'] if applied: _log(f" {kind}#{eid} conf={conf:.2f} → +{len(applied)} {','.join(applied.keys())}") return (len(applied), list(applied.keys())) def _cycle() -> dict: started = time.time() out = {'sportas': 0, 'klub': 0, 'savez': 0, 'fields_total': 0} fields_total = 0 for kind, picker, limit in ( ('sportas', _pick_sportas, 25), ('klub', _pick_klub, 10), ('savez', _pick_savez, 5), ): ids = picker(limit) random.shuffle(ids) _log(f"cycle: {kind} candidates={len(ids)}") for eid in ids: if DRY: continue n, fields = _process(kind, eid) out[kind] += 1 fields_total += n if n: _log(f" {kind}#{eid} → +{n} fields {','.join(fields)}") time.sleep(1.5) # gentle pacing _heartbeat() out['fields_total'] = fields_total out['elapsed_s'] = round(time.time() - started, 1) return out def main() -> int: _log(f"enrichment_worker starting | API_BASE={API_BASE} | sleep={SLEEP_S}s | dry={DRY}") while True: try: stats = _cycle() _log(f"cycle done: {json.dumps(stats)}") except Exception as e: _log(f"cycle FAILED: {type(e).__name__}: {e}") _heartbeat() time.sleep(SLEEP_S) if __name__ == '__main__': sys.exit(main() or 0)