Files
pgz-sport/workers/enrichment_worker.py_prije_env_deepseek

397 lines
14 KiB
Python

#!/usr/bin/env python3
"""
enrichment_worker.py — 24/7 background enrichment for PGŽ Sport
Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com
Date: 2026-05-04
Version: 1.0.0
Polls pgz_sport.clanovi / klubovi / savezi for under-enriched rows, then
calls the live HTTP endpoints (POST /sport/api/v2/enrich/{kind}/{id} and
.../apply) so every row goes through the same pipeline (and audit log)
that the UI uses. This avoids forking the enrichment logic.
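Selection rules (per cycle). Every kind only picks rows whose key-field
coverage is below ENRICHER_COVERAGE_MAX (default 70 %) and that have no
metadata.enrichment_block newer than 30 days:
- sportas: active clanovi rows that have a sport (own or via their klub) or a
known source / external linkage (e.g. vanjski_id ? 'hns_comet'), not enriched
in the last 7 days; ORDER BY random() LIMIT 50
- klub: active klubovi rows not enriched in the last 14 days;
ORDER BY random() LIMIT 20
- savez: savezi rows not enriched in the last 14 days; ORDER BY random() LIMIT 5
Free-form fields are applied only when the source-weighted confidence reaches
ENRICHER_CONFIDENCE (default 0.7, overridable live via redis); URL/ID fields
are always applied.
Sleep 300 s between cycles (configurable via ENRICHER_SLEEP env).
Heartbeat to redis (cc:pgz-enricher:heartbeat) and log every cycle to
/var/log/pgz-sport-enricher.log and /opt/pgz-sport/_logs/enrichment_worker.log.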
"""
from __future__ import annotations
import json
import os
import random
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone
import psycopg2
import psycopg2.extras
# Base URL of the HTTP API whose /api/v2/enrich endpoints this worker drives.
API_BASE = os.getenv('PGZ_API_BASE', 'http://localhost:8095')
# Seconds to wait between cycles.
SLEEP_S = int(os.getenv('ENRICHER_SLEEP', '300'))
# ENRICHER_DRY=1: candidates are still selected and counted, but never processed.
DRY = os.getenv('ENRICHER_DRY', '0') == '1'
# Value sent in the X-User-Email header of every API request.
USER_HDR = os.getenv('ENRICHER_USER', 'enricher@pgz.local')
# Each log line is appended to every one of these files (best effort).
LOG_PATHS = [
    '/var/log/pgz-sport-enricher.log',
    '/opt/pgz-sport/_logs/enrichment_worker.log',
]
# Minimum source confidence for writing free-form fields; redis may override it live.
CONFIDENCE_MIN = float(os.getenv('ENRICHER_CONFIDENCE', '0.7'))
# Rows whose key-field coverage % is at or above this are not selected.
COVERAGE_MAX = int(os.getenv('ENRICHER_COVERAGE_MAX', '70'))
# Postgres connection settings. PG_HOST/PG_PORT are read first; if PG_HOST
# points at the local machine, DB_HOST/DB_PORT replace both.
# NOTE(review): presumably because a local PG_HOST is not reachable from where
# the worker runs — confirm against the deployment.
_pgh = os.getenv('PG_HOST', '10.10.0.2')
_pgp = int(os.getenv('PG_PORT', '6432'))
if _pgh in ('localhost', '127.0.0.1'):
    _pgh = os.getenv('DB_HOST', '10.10.0.2')
    _pgp = int(os.getenv('DB_PORT', '6432'))
DB = {
    'host': _pgh,
    'port': _pgp,
    'dbname': os.getenv('PG_DB', 'rinet_v3'),
    'user': os.getenv('PG_USER', 'rinet'),
    # Mandatory: a missing DB_PASSWORD raises KeyError at import time.
    'password': os.environ['DB_PASSWORD'],
}
def _log(msg: str) -> None:
    """Print a UTC-timestamped line to stdout and append it to every LOG_PATHS file.

    File logging is best effort. A path that cannot be created or written
    (e.g. /var/log without permission) is skipped, so logging never takes
    the worker down.
    """
    # An aware UTC datetime's isoformat() already ends in '+00:00'. Appending
    # 'Z' on top produced '...+00:00Z', which is not valid ISO 8601, so emit
    # the canonical 'Z' suffix instead.
    stamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    line = f"{stamp} {msg}"
    print(line, flush=True)
    for p in LOG_PATHS:
        try:
            os.makedirs(os.path.dirname(p), exist_ok=True)
            # Messages contain non-ASCII ('—', '→'). Don't depend on the
            # locale's default encoding, or writes may fail and be lost.
            with open(p, 'a', encoding='utf-8') as f:
                f.write(line + "\n")
        except (OSError, ValueError):
            # OSError: permissions / missing mounts.
            # ValueError: covers UnicodeError, e.g. lone surrogates in msg.
            pass
def _redis():
    """Return a connected redis client, or None when redis is unavailable.

    Host and port come from REDIS_HOST / REDIS_PORT (default localhost:6379).
    REDIS_PASS, with surrounding whitespace and quotes stripped, is tried
    first, then an unauthenticated connection. Any failure (redis package
    missing, connection refused, auth error) yields None, so callers can
    treat redis as optional.
    """
    try:
        import redis
    except Exception:
        return None
    host = os.environ.get('REDIS_HOST', 'localhost')
    port = int(os.environ.get('REDIS_PORT', '6379'))
    pwd = (os.environ.get('REDIS_PASS') or '').strip().strip("'").strip('"') or None
    # Without a password the old (pwd, None) pair tried the same connection
    # twice. That doubled the 2 s connect timeout whenever redis was down,
    # and this helper runs several times per processed entity.
    candidates = (pwd, None) if pwd is not None else (None,)
    for password in candidates:
        try:
            client = redis.Redis(host=host, port=port, password=password,
                                 decode_responses=True, socket_connect_timeout=2)
            client.ping()
            return client
        except Exception:
            continue
    return None
def _heartbeat(meta: dict | None = None) -> None:
    """Stamp the liveness key in redis and optionally publish cycle stats.

    Writes the current epoch seconds to cc:pgz-enricher:heartbeat. When
    *meta* is given, also writes its JSON (non-JSON values via str()) to
    cc:pgz-enricher:last_cycle. Does nothing without redis and ignores
    redis errors.
    """
    client = _redis()
    if client:
        try:
            client.set('cc:pgz-enricher:heartbeat', str(int(time.time())))
            if meta is None:
                return
            client.set('cc:pgz-enricher:last_cycle', json.dumps(meta, default=str))
        except Exception:
            pass
def _is_paused() -> bool:
    """Report whether redis key cc:pgz-enricher:pause is set to '1'.

    Returns False when redis is unreachable or the read fails.
    """
    client = _redis()
    if not client:
        return False
    try:
        flag = client.get('cc:pgz-enricher:pause')
    except Exception:
        return False
    return flag == '1'
def _consume_run_now() -> bool:
    """Consume a one-shot run-now request from redis.

    When cc:pgz-enricher:run_now equals '1', resets it to '0' and returns
    True. Otherwise, or on any redis problem, returns False.
    """
    client = _redis()
    if not client:
        return False
    key = 'cc:pgz-enricher:run_now'
    try:
        if client.get(key) != '1':
            return False
        client.set(key, '0')
        return True
    except Exception:
        return False
def _refresh_confidence() -> None:
    """Apply a live confidence override from redis (set by /worker/confidence).

    Reads cc:pgz-enricher:confidence. A parseable, finite value replaces the
    module-level CONFIDENCE_MIN. A missing or empty key, or unreachable
    redis, leaves the current threshold untouched.

    Malformed values (e.g. 'abc') and non-finite ones ('nan', 'inf') are
    logged and ignored. Previously 'nan' or 'inf' was accepted, which made
    every `conf >= CONFIDENCE_MIN` check False and silently disabled all
    soft-field writes.
    """
    import math

    global CONFIDENCE_MIN
    r = _redis()
    if not r:
        return
    try:
        v = r.get('cc:pgz-enricher:confidence')
    except Exception:
        return
    if not v:
        return
    try:
        value = float(v)
    except (TypeError, ValueError):
        _log(f"ignoring malformed confidence override {v!r}")
        return
    if not math.isfinite(value):
        _log(f"ignoring non-finite confidence override {v!r}")
        return
    CONFIDENCE_MIN = value
def _bump_fields_24h(n: int) -> None:
    """Add *n* to the cc:pgz-enricher:fields_24h counter (no-op for n <= 0).

    The 24 h expiry is attached only while the key has none. The counter
    therefore covers a fixed window that starts with the first bump after
    the key expired. The old code re-armed EXPIRE on every bump, so a worker
    writing at least one field per day never let the counter reset.
    Redis errors are ignored (best effort).
    """
    if n <= 0:
        return
    r = _redis()
    if not r:
        return
    key = 'cc:pgz-enricher:fields_24h'
    try:
        r.incrby(key, n)
        ttl = r.ttl(key)
        # TTL < 0 means no expiry (-1) or missing key (-2). Older redis-py
        # versions return None for "no expiry".
        if ttl is None or ttl < 0:
            r.expire(key, 86400)
    except Exception:
        pass
def _db():
    """Open a new autocommit psycopg2 connection using the DB settings.

    The caller owns the returned connection. psycopg2's `with conn:` only
    ends the transaction; it does not close the connection.
    """
    conn = psycopg2.connect(**DB)
    conn.autocommit = True
    return conn
# Coverage = (filled key fields) / (total key fields) * 100. Keep these in sync
# with enrich_router.enrich_preview() which surfaces the same scores in the UI.
_KLUB_KEYS = (
    'oib', 'sport', 'grad', 'predsjednik',
    'tajnik', 'web', 'email', 'telefon',
    'sjediste', 'godina_osnutka', 'ciljevi', 'opis_djelatnosti',
)
_SAVEZ_KEYS = (
    'oib', 'sport', 'predsjednik',
    'tajnik', 'email', 'telefon',
    'web', 'adresa', 'godina_osnutka',
)
# Coverage for sportas — fields the user actually wants populated.
_SPORTAS_KEYS = (
    'sport', 'profile_url', 'slika_url', 'hns_igrac_id',
    'biografija', 'datum_rodenja', 'mjesto_rodenja', 'broj_dresa',
)
def _coverage_expr(table_keys: tuple[str, ...], prefix: str = '') -> str:
    """Build a Postgres expression that yields the row's 0..100 coverage %.

    Each key contributes 1 when its column is non-NULL and its text form is
    non-empty, otherwise 0. The sum is scaled to a percentage of
    len(table_keys). `prefix` is e.g. 'c.' when the SQL uses a table alias.

    Column names are interpolated verbatim, so *table_keys* and *prefix*
    must be trusted identifiers (module constants), never user input.

    Raises:
        ValueError: if *table_keys* is empty. The old code emitted
            `((()::numeric * 100) / 0)`, which only fails later, at query time.
    """
    if not table_keys:
        raise ValueError("_coverage_expr needs at least one key column")
    terms = ' + '.join(
        f"(CASE WHEN {prefix}{k} IS NOT NULL AND ({prefix}{k}::text) <> '' THEN 1 ELSE 0 END)"
        for k in table_keys
    )
    return f"((({terms})::numeric * 100) / {len(table_keys)})"
def _pick_sportas(limit: int = 50) -> list[int]:
    """Return up to *limit* random ids of active athletes worth enriching.

    A row qualifies when all of the following hold:
    - its key-field coverage (_SPORTAS_KEYS) is below COVERAGE_MAX;
    - it has a sport (own or via its klub) or a known source / external
      linkage the router can use;
    - it was not enriched in the last 7 days;
    - it has no enrichment_block younger than 30 days.

    Selection is sport-agnostic: the router decides which federation to
    query based on c.sport (or klubovi.sport via the JOIN). Requiring a
    sport or linkage avoids burning cycles on rows the router can't enrich.
    """
    from contextlib import closing

    cov = _coverage_expr(_SPORTAS_KEYS, prefix='c.')
    sql = f"""
        SELECT c.id
        FROM pgz_sport.clanovi c
        LEFT JOIN pgz_sport.klubovi k ON k.id = c.klub_id
        WHERE c.aktivan = TRUE
          AND {cov} < %s
          AND (
                c.sport IS NOT NULL
             OR k.sport IS NOT NULL
             OR c.source IN ('hns_semafor','hns_family','manual','godisnjak','hbs_savez','hks_savez')
             OR jsonb_exists(c.vanjski_id, 'hns_comet')
             OR (c.source_url ILIKE '%%semafor.hns.family%%')
             OR (c.profile_url ILIKE '%%semafor.hns.family%%')
             OR (c.source_url ILIKE '%%hrvatski-bocarski-savez.hr%%')
             OR (c.profile_url ILIKE '%%hrvatski-bocarski-savez.hr%%')
          )
          AND ((c.metadata->>'enriched_at') IS NULL
               OR (c.metadata->>'enriched_at')::timestamptz < now() - interval '7 days')
          AND ((c.metadata->'enrichment_block') IS NULL
               OR (c.metadata->'enrichment_block'->>'at')::timestamptz < now() - interval '30 days')
        ORDER BY random()
        LIMIT %s
    """
    # psycopg2's `with conn:` only commits or rolls back; it does NOT close
    # the connection. closing() releases it deterministically instead of
    # leaving it to garbage collection (e.g. when a traceback keeps the
    # frame alive).
    with closing(_db()) as conn, conn.cursor() as cur:
        cur.execute(sql, (COVERAGE_MAX, limit))
        return [row[0] for row in cur.fetchall()]
def _pick_klub(limit: int = 50) -> list[int]:
    """Return up to *limit* random ids of active clubs worth enriching.

    A row qualifies when all of the following hold:
    - its key-field coverage (_KLUB_KEYS) is below COVERAGE_MAX;
    - it was not enriched in the last 14 days;
    - it has no enrichment_block younger than 30 days.
    """
    from contextlib import closing

    cov = _coverage_expr(_KLUB_KEYS)
    sql = f"""
        SELECT id FROM pgz_sport.klubovi
        WHERE aktivan = TRUE
          AND {cov} < %s
          AND ((metadata->>'enriched_at') IS NULL
               OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
          AND ((metadata->'enrichment_block') IS NULL
               OR (metadata->'enrichment_block'->>'at')::timestamptz < now() - interval '30 days')
        ORDER BY random()
        LIMIT %s
    """
    # `with conn:` in psycopg2 does not close the connection; closing() does.
    with closing(_db()) as conn, conn.cursor() as cur:
        cur.execute(sql, (COVERAGE_MAX, limit))
        return [row[0] for row in cur.fetchall()]
def _pick_savez(limit: int = 50) -> list[int]:
    """Return up to *limit* random ids of federations (savezi) worth enriching.

    A row qualifies when all of the following hold:
    - its key-field coverage (_SAVEZ_KEYS) is below COVERAGE_MAX;
    - it was not enriched in the last 14 days;
    - it has no enrichment_block younger than 30 days.

    Unlike the klub/sportas pickers, there is no `aktivan` filter here.
    """
    from contextlib import closing

    cov = _coverage_expr(_SAVEZ_KEYS)
    sql = f"""
        SELECT id FROM pgz_sport.savezi
        WHERE {cov} < %s
          AND ((metadata->>'enriched_at') IS NULL
               OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
          AND ((metadata->'enrichment_block') IS NULL
               OR (metadata->'enrichment_block'->>'at')::timestamptz < now() - interval '30 days')
        ORDER BY random()
        LIMIT %s
    """
    # `with conn:` in psycopg2 does not close the connection; closing() does.
    with closing(_db()) as conn, conn.cursor() as cur:
        cur.execute(sql, (COVERAGE_MAX, limit))
        return [row[0] for row in cur.fetchall()]
def _http_post(path: str, body: dict | None = None) -> dict | None:
    """POST *body* as JSON to API_BASE + *path* and return the decoded JSON object.

    Requests carry `X-User-Email: USER_HDR` and use a 60 s timeout. Returns
    None, after logging, on any transport, HTTP or JSON error. Also returns
    None when the response is valid JSON but not an object: _process calls
    `.get()` / `[...]` on the result, so a list or scalar would otherwise
    raise and abort the whole cycle.
    """
    url = API_BASE.rstrip('/') + path
    data = json.dumps(body or {}).encode('utf-8')
    req = urllib.request.Request(
        url, data=data, method='POST',
        headers={'Content-Type': 'application/json',
                 'X-User-Email': USER_HDR})
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            payload = json.loads(r.read().decode('utf-8'))
    except Exception as e:
        _log(f"POST {path} failed: {type(e).__name__}: {e}")
        return None
    if not isinstance(payload, dict):
        _log(f"POST {path} failed: expected JSON object, got {type(payload).__name__}")
        return None
    return payload
# Per-source confidence weights, keyed by lower-cased source host.
# - HNS Semafor /igraci/ pages are structured and verified, so they are
#   trusted almost implicitly.
# - Federation sites follow, in tiers.
# - Wikipedia summaries are mostly safe but free-form.
# - sport-pgz.hr "O nama" pages tend to carry the zajednica's generic info,
#   so they are down-weighted: a plain DeepSeek synthesis off a single
#   sport-pgz.hr source falls below the gate.
_SOURCE_WEIGHTS = {
    'semafor.hns.family': 0.95,
    'hrvatski-bocarski-savez.hr': 0.92,
    **dict.fromkeys((
        'hns-cff.hr', 'hks-cbf.hr', 'hrs.hr', 'hos-cvf.hr',
        'hvs.hr', 'hps.hr', 'atletika.hr', 'htsavez.hr',
    ), 0.90),
    **dict.fromkeys((
        'judo-savez.hr', 'karate.hr', 'veslacki-savez.hr',
        'gimnastika.hr', 'stolni-tenis.hr', 'kuglanje.hr',
    ), 0.88),
    'wikipedia.hr': 0.80,
    'sport-pgz.hr': 0.55,
}
# Objective fields (URLs / IDs) that are written regardless of confidence.
_HARD_FIELDS = {
    'profile_url',
    'source_url',
    'slika_url',
    'hns_igrac_id',
}
def _confidence(proposed: dict, sources: list[dict]) -> float:
    """Score a proposal 0..1 from its sources.

    The score is the best source weight (0.50 for unknown hosts) plus 0.03
    per extra source, with the bonus capped at 0.10 and the total capped
    at 1.0. Returns 0.0 when nothing is proposed or there are no sources.
    """
    if not proposed:
        return 0.0
    weights = [
        _SOURCE_WEIGHTS.get((src.get('source') or '').lower(), 0.50)
        for src in (sources or [])
    ]
    if not weights:
        return 0.0
    extra_evidence = min(0.10, 0.03 * (len(sources) - 1))
    return min(1.0, max(weights) + extra_evidence)
def _process(kind: str, eid: int) -> tuple[int, list[str]]:
    """Enrich one entity: preview, confidence-gate, then apply.

    Hard fields (_HARD_FIELDS: URLs / IDs) are always sent to /apply. The
    remaining proposed fields are sent only when the source confidence
    reaches CONFIDENCE_MIN.

    Returns (number of applied fields, their names). Returns (0, []) when a
    call fails, nothing is proposed, or everything was gated out.
    """
    endpoint = f'/api/v2/enrich/{kind}/{eid}'
    preview = _http_post(endpoint, {})
    if not preview:
        return (0, [])
    proposed = preview.get('proposed') or {}
    sources = preview.get('sources') or []
    if not proposed:
        return (0, [])

    conf = _confidence(proposed, sources)
    # Objective URL/ID fields go straight through; the free-form rest waits
    # for the confidence gate.
    fields = {}
    gated = {}
    for name, value in proposed.items():
        if name in _HARD_FIELDS:
            fields[name] = value
        else:
            gated[name] = value
    if conf >= CONFIDENCE_MIN:
        fields.update(gated)

    if not fields:
        _log(f" {kind}#{eid} skipped — confidence {conf:.2f} < {CONFIDENCE_MIN:.2f}")
        return (0, [])

    res = _http_post(f'{endpoint}/apply', {'fields': fields, 'sources': sources})
    if not res or 'applied' not in res:
        return (0, [])
    applied = res['applied']
    if applied:
        _log(f" {kind}#{eid} conf={conf:.2f} → +{len(applied)} {','.join(applied.keys())}")
    return (len(applied), list(applied.keys()))
def _cycle() -> dict:
    """Run one enrichment pass over sportas, klub and savez candidates.

    For each kind, picks candidates (limits 50 / 20 / 5) and runs _process
    on each one, with 1.5 s pacing and a heartbeat after every entity.
    - DRY mode only picks candidates and logs their counts.
    - A pause signal stops the whole cycle, remaining kinds included.
      Previously only the inner loop broke, so the remaining kinds were
      still queried after a pause.
    - An exception while processing one entity is logged and the cycle moves
      on. Picker failures (e.g. DB down) still propagate to the caller.

    Returns stats:
    - per-kind counts of entities attempted;
    - fields_total (fields applied);
    - started_at / ended_at (ISO UTC) and elapsed_s.
    """
    _refresh_confidence()
    started = time.time()
    out = {'sportas': 0, 'klub': 0, 'savez': 0, 'fields_total': 0,
           'started_at': datetime.now(timezone.utc).isoformat()}
    fields_total = 0
    paused = False
    for kind, picker, limit in (
        ('sportas', _pick_sportas, 50),
        ('klub', _pick_klub, 20),
        ('savez', _pick_savez, 5),
    ):
        if paused:
            break
        ids = picker(limit)
        random.shuffle(ids)
        _log(f"cycle: {kind} candidates={len(ids)} coverage<{COVERAGE_MAX} conf>={CONFIDENCE_MIN}")
        for eid in ids:
            if DRY:
                continue
            if _is_paused():
                _log("paused → break out of cycle")
                paused = True
                break
            try:
                n, _ = _process(kind, eid)
            except Exception as e:
                # One malformed API response must not abort the rest of the cycle.
                _log(f" {kind}#{eid} FAILED: {type(e).__name__}: {e}")
                n = 0
            out[kind] += 1
            fields_total += n
            if n:
                _bump_fields_24h(n)
            time.sleep(1.5)  # gentle pacing
            _heartbeat()
    out['fields_total'] = fields_total
    out['elapsed_s'] = round(time.time() - started, 1)
    out['ended_at'] = datetime.now(timezone.utc).isoformat()
    return out
def main() -> int:
    """Run enrichment cycles forever; does not return in normal operation.

    - While cc:pgz-enricher:pause is '1', the loop only heartbeats every 30 s.
    - After each cycle (successful or not), stats or the error are
      published to redis.
    - The loop then sleeps SLEEP_S seconds in slices of at most 5 s, so
      run-now and pause signals are noticed within about 5 s.
    """
    _log(f"enrichment_worker starting | API_BASE={API_BASE} | sleep={SLEEP_S}s | dry={DRY}")
    while True:
        if _is_paused():
            _log("paused (cc:pgz-enricher:pause=1) — sleeping 30s")
            _heartbeat({'paused': True})
            time.sleep(30)
            continue
        try:
            stats = _cycle()
            _log(f"cycle done: {json.dumps(stats)}")
            _heartbeat(stats)
        except Exception as e:
            _log(f"cycle FAILED: {type(e).__name__}: {e}")
            _heartbeat({'error': str(e)[:200]})
        # Sleep in slices of at most 5 s so /worker/run-now and /pause
        # respond fast.
        elapsed = 0
        while elapsed < SLEEP_S:
            if _consume_run_now():
                _log("run-now signal received → starting next cycle early")
                break
            if _is_paused():
                break
            # Shorten the last slice so the total never overshoots SLEEP_S
            # (a 7 s setting used to sleep 10 s).
            step = min(5, SLEEP_S - elapsed)
            time.sleep(step)
            elapsed += step
if __name__ == '__main__':
    # Script entry point: main() loops forever. Any value it does return
    # (or 0 for None) becomes the process exit status.
    raise SystemExit(main() or 0)