#!/usr/bin/env python3 """ enrichment_worker.py — 24/7 background enrichment for PGŽ Sport Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com Date: 2026-05-04 Version: 1.0.0 Polls pgz_sport.clanovi / klubovi / savezi for under-enriched rows, then calls the live HTTP endpoints (POST /sport/api/v2/enrich/{kind}/{id} and .../apply) so every row goes through the same pipeline (and audit log) that the UI uses. This avoids forking the enrichment logic. Selection rules (per cycle): - sportas: clanovi rows missing profile_url AND (source IN ('hns_semafor','manual') OR vanjski_id ? 'hns_comet') ORDER BY random() LIMIT 25 - klub: klubovi rows whose metadata.enriched_at is NULL ORDER BY random() LIMIT 10 - savez: savezi rows whose metadata.enriched_at is NULL ORDER BY random() LIMIT 5 Sleep 300 s between cycles (configurable via ENRICHER_SLEEP env). Heartbeat to redis (cc:pgz-enricher:heartbeat) and log every cycle to /opt/pgz-sport/_logs/enrichment_worker.log. """ from __future__ import annotations import json import os import random import sys import time import urllib.parse import urllib.request from datetime import datetime, timezone import psycopg2 import psycopg2.extras API_BASE = os.environ.get('PGZ_API_BASE', 'http://localhost:8095') SLEEP_S = int(os.environ.get('ENRICHER_SLEEP', '300')) DRY = os.environ.get('ENRICHER_DRY', '0') == '1' USER_HDR = os.environ.get('ENRICHER_USER', 'enricher@pgz.local') LOG_PATH = '/opt/pgz-sport/_logs/enrichment_worker.log' _pgh = os.environ.get('PG_HOST', '10.10.0.2') _pgp = int(os.environ.get('PG_PORT', '6432')) if _pgh in ('localhost', '127.0.0.1'): _pgh = os.environ.get('DB_HOST', '10.10.0.2') _pgp = int(os.environ.get('DB_PORT', '6432')) DB = dict(host=_pgh, port=_pgp, dbname=os.environ.get('PG_DB', 'rinet_v3'), user=os.environ.get('PG_USER', 'rinet'), password=os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7')) def _log(msg: str) -> None: line = f"{datetime.now(timezone.utc).isoformat()}Z {msg}" print(line, flush=True) try: os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True) with open(LOG_PATH, 'a') as f: f.write(line + "\n") except Exception: pass def _heartbeat() -> None: try: import redis r = redis.Redis(host=os.environ.get('REDIS_HOST', 'localhost'), port=int(os.environ.get('REDIS_PORT', '6379')), password=os.environ.get('REDIS_PASS', None)) r.set('cc:pgz-enricher:heartbeat', str(int(time.time()))) except Exception: pass def _db(): c = psycopg2.connect(**DB); c.autocommit = True; return c def _pick_sportas(limit: int = 25) -> list[int]: """Athletes that look enrichable but haven't been enriched recently.""" sql = """ SELECT id FROM pgz_sport.clanovi WHERE aktivan = TRUE AND (profile_url IS NULL OR profile_url = '' OR slika_url IS NULL OR slika_url = '' OR biografija IS NULL OR biografija = '' OR datum_rodenja IS NULL) AND ( source IN ('hns_semafor','hns_family','manual','godisnjak') OR jsonb_exists(vanjski_id, 'hns_comet') OR (source_url ILIKE '%%semafor.hns.family%%') ) AND ((metadata->>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '7 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (limit,)) return [r[0] for r in cur.fetchall()] def _pick_klub(limit: int = 10) -> list[int]: sql = """ SELECT id FROM pgz_sport.klubovi WHERE aktivan = TRUE AND (web IS NULL OR email IS NULL OR telefon IS NULL OR opis_djelatnosti IS NULL) AND ((metadata->>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (limit,)) return [r[0] for r in cur.fetchall()] def _pick_savez(limit: int = 5) -> list[int]: sql = """ SELECT id FROM pgz_sport.savezi WHERE (web IS NULL OR email IS NULL OR telefon IS NULL) AND ((metadata->>'enriched_at') IS NULL OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days') ORDER BY random() LIMIT %s """ with _db() as c, c.cursor() as cur: cur.execute(sql, (limit,)) return [r[0] for r in cur.fetchall()] def _http_post(path: str, body: dict | None = None) -> dict | None: url = API_BASE.rstrip('/') + path data = json.dumps(body or {}).encode('utf-8') req = urllib.request.Request( url, data=data, method='POST', headers={'Content-Type': 'application/json', 'X-User-Email': USER_HDR}) try: with urllib.request.urlopen(req, timeout=60) as r: return json.loads(r.read().decode('utf-8')) except Exception as e: _log(f"POST {path} failed: {type(e).__name__}: {e}") return None def _process(kind: str, eid: int) -> tuple[int, list[str]]: """Run preview + apply for one entity. Returns (#applied, fields).""" # /apply with empty body re-runs the preview server-side and writes the # full proposal — the cheapest way to flush a row through. res = _http_post(f'/api/v2/enrich/{kind}/{eid}/apply', {}) if not res or 'applied' not in res: return (0, []) applied = res['applied'] return (len(applied), list(applied.keys())) def _cycle() -> dict: started = time.time() out = {'sportas': 0, 'klub': 0, 'savez': 0, 'fields_total': 0} fields_total = 0 for kind, picker, limit in ( ('sportas', _pick_sportas, 25), ('klub', _pick_klub, 10), ('savez', _pick_savez, 5), ): ids = picker(limit) random.shuffle(ids) _log(f"cycle: {kind} candidates={len(ids)}") for eid in ids: if DRY: continue n, fields = _process(kind, eid) out[kind] += 1 fields_total += n if n: _log(f" {kind}#{eid} → +{n} fields {','.join(fields)}") time.sleep(1.5) # gentle pacing _heartbeat() out['fields_total'] = fields_total out['elapsed_s'] = round(time.time() - started, 1) return out def main() -> int: _log(f"enrichment_worker starting | API_BASE={API_BASE} | sleep={SLEEP_S}s | dry={DRY}") while True: try: stats = _cycle() _log(f"cycle done: {json.dumps(stats)}") except Exception as e: _log(f"cycle FAILED: {type(e).__name__}: {e}") _heartbeat() time.sleep(SLEEP_S) if __name__ == '__main__': sys.exit(main() or 0)