diff --git a/routers/enrich_router.py b/routers/enrich_router.py index 9df44f4..3da97bd 100644 --- a/routers/enrich_router.py +++ b/routers/enrich_router.py @@ -280,8 +280,12 @@ def _load_row(kind: str, eid: int) -> dict: adresa, godina_osnutka, source_url, metadata FROM pgz_sport.savezi WHERE id=%s""", (eid,)) elif kind == 'sportas': - row = _fetch_one("""SELECT id, ime, prezime, sport, klub_id, profile_url, scrape_url, - slika_url, source_url, hns_igrac_id, biografija, metadata + row = _fetch_one("""SELECT id, ime, prezime, sport, klub_id, profile_url, + slika_url, source_url, source, source_id, + hns_igrac_id, biografija, + datum_rodenja, mjesto_rodenja, broj_dresa, + visina_cm, tezina_kg, dominantna_noga, oib, + vanjski_id, metadata FROM pgz_sport.clanovi WHERE id=%s""", (eid,)) else: raise HTTPException(400, "kind must be klub|savez|sportas") @@ -401,16 +405,213 @@ def _propose_for_savez(row: dict) -> dict: return {'proposed': proposed, 'sources': sources} +# ─── HNS Semafor parsing ──────────────────────────────────────────────── +_HNS_BASE = 'https://semafor.hns.family' + +def _slugify(name: str) -> str: + import unicodedata + s = unicodedata.normalize('NFKD', name or '').encode('ascii', 'ignore').decode('ascii').lower() + return re.sub(r'[^a-z0-9]+', '-', s).strip('-') + +def _hns_url_from_row(row: dict) -> Optional[str]: + """Try to build a semafor.hns.family /igraci/ URL for this row.""" + # 1) Already-set columns + for k in ('profile_url', 'source_url'): + u = row.get(k) + if u and 'semafor.hns.family/igraci/' in (u or ''): + return u + # 2) hns_igrac_id column + pid = row.get('hns_igrac_id') + if pid: + slug = _slugify(((row.get('ime') or '') + ' ' + (row.get('prezime') or '')).strip()) + return f'{_HNS_BASE}/igraci/{int(pid)}/{slug}/' + # 3) vanjski_id JSONB → hns_comet + vid = row.get('vanjski_id') or {} + if isinstance(vid, dict): + comet = vid.get('hns_comet') or vid.get('hns_pid') + slug = vid.get('hns_slug') or _slugify(((row.get('ime') or '') + ' 
' + (row.get('prezime') or '')).strip()) + if comet: + try: + return f'{_HNS_BASE}/igraci/{int(comet)}/{slug}/' + except Exception: + pass + # 4) source='hns_semafor' + source_id + if (row.get('source') or '').startswith('hns_') and row.get('source_id'): + try: + slug = _slugify(((row.get('ime') or '') + ' ' + (row.get('prezime') or '')).strip()) + return f'{_HNS_BASE}/igraci/{int(row["source_id"])}/{slug}/' + except Exception: + pass + return None + + +def _parse_hns_player(html_doc: str, url: str) -> Optional[dict]: + """Extract structured fields from a semafor.hns.family player page.""" + if not html_doc: return None + try: + from bs4 import BeautifulSoup + except Exception: + return _parse_hns_player_regex(html_doc, url) + soup = BeautifulSoup(html_doc, 'html.parser') + out: dict[str, Any] = {'source': 'semafor.hns.family', 'url': url} + + # hns_igrac_id from URL + m = re.search(r'/igraci/(\d+)/', url) + if m: out['hns_igrac_id'] = int(m.group(1)) + + title = soup.find('title') + if title: out['title'] = title.get_text(strip=True)[:300] + + # Photo + photo = soup.find('div', class_='photo') + if photo: + img = photo.find('img') + if img and img.get('src'): + src = img['src'] + if not src.startswith('http'): + src = urllib.parse.urljoin(url, src) + out['slika_url'] = src + + # Player number (jersey) + pn = soup.find('div', class_='playerName') + if pn: + h3 = pn.find('h3') + if h3: + t = h3.get_text(strip=True) + if t.isdigit(): + out['broj_dresa'] = int(t) + + # Datum rodjenja + li = soup.find('li', class_='dob') + if li: + h4 = li.find('h4') + if h4: + t = h4.get_text(' ', strip=True) + mm = re.match(r'(\d{1,2})\.(\d{1,2})\.(\d{4})', t) + if mm: + from datetime import date as _date + try: + out['datum_rodenja'] = _date(int(mm.group(3)), int(mm.group(2)), int(mm.group(1))).isoformat() + except Exception: + pass + + # Mjesto rodjenja + li = soup.find('li', class_='pob') + if li: + h4 = li.find('h4') + if h4: + out['mjesto_rodenja'] = h4.get_text(strip=True) + + 
# Trenutni klub (info only — we don't reassign klub_id from here) + klub_link = soup.find('a', href=re.compile(r'/klubovi/(\d+)/')) + if klub_link: + h4 = klub_link.find('h4') + if h4: + out['trenutni_klub'] = h4.get_text(strip=True) + m = re.search(r'/klubovi/(\d+)/', klub_link.get('href') or '') + if m: out['hns_klub_id'] = int(m.group(1)) + + # Description (meta) + meta_d = soup.find('meta', attrs={'name': 'description'}) + if meta_d and meta_d.get('content'): + out['description'] = meta_d['content'][:600] + + # Make a clean text blob for relevance / DeepSeek + text = soup.get_text(' ', strip=True) + out['raw_text'] = re.sub(r'\s+', ' ', text)[:4000] + out['extract'] = (out.get('description') + or (out['raw_text'][:500] if out.get('raw_text') else None)) + return out + + +def _parse_hns_player_regex(html_doc: str, url: str) -> Optional[dict]: + """BS4-free fallback parser.""" + out: dict[str, Any] = {'source': 'semafor.hns.family', 'url': url} + m = re.search(r'/igraci/(\d+)/', url) + if m: out['hns_igrac_id'] = int(m.group(1)) + m = re.search(r'
.*?

(\d{1,2})\.(\d{1,2})\.(\d{4})', html_doc, re.S) + if m: + from datetime import date as _date + try: + out['datum_rodenja'] = _date(int(m.group(3)), int(m.group(2)), int(m.group(1))).isoformat() + except Exception: + pass + m = re.search(r'
  • ([^<]+)

    ', html_doc) + if m: out['mjesto_rodenja'] = m.group(1).strip() + m = re.search(r'

    (\d+)

    # ─── Athlete (sportas) proposal ─────────────────────────────────────────
def _propose_for_sportas(row: dict) -> dict:
    """Build enrichment proposals for one athlete row.

    Resolves an HNS Semafor URL for the row, fetches and parses that page,
    and proposes values only for columns that are empty on the row. When
    ``biografija`` is empty it additionally looks the athlete up via
    ``_wiki_summary`` and proposes a biography: a ``_deepseek_describe``
    synthesis of the collected evidence, else the first source extract of
    at least 80 characters.

    Returns ``{'proposed': {column: value}, 'sources': [source dicts]}``.
    """
    naziv = ' '.join((row.get('ime') or '', row.get('prezime') or '')).strip()
    sources: list = []
    evidence: list = []
    proposed: dict[str, Any] = {}

    # 1) Resolve a HNS Semafor URL for this athlete (column / vanjski_id / source_id)
    hns_url = _hns_url_from_row(row)
    hns_doc: Optional[dict] = _hns_fetch_player(hns_url) if hns_url else None

    if hns_doc:
        sources.append(hns_doc)
        evidence.append(hns_doc.get('raw_text') or hns_doc.get('extract') or '')

        # Field-level proposals from HNS Semafor (only when DB is empty).
        # (row column, key in the parsed HNS document), in proposal order.
        column_to_hns_key = (
            ('profile_url', 'url'),
            ('source_url', 'url'),
            ('slika_url', 'slika_url'),
            ('hns_igrac_id', 'hns_igrac_id'),
            ('datum_rodenja', 'datum_rodenja'),
            ('mjesto_rodenja', 'mjesto_rodenja'),
            ('broj_dresa', 'broj_dresa'),
        )
        for column, hns_key in column_to_hns_key:
            if not row.get(column) and hns_doc.get(hns_key):
                proposed[column] = hns_doc[hns_key]

    # Nothing more to do when the row already has a biography.
    if row.get('biografija'):
        return {'proposed': proposed, 'sources': sources}

    # 2) Wikipedia HR for biografija
    wiki = _wiki_summary(naziv)
    if wiki:
        sources.append(wiki)
        evidence.append(wiki.get('extract') or '')

    # Description: prefer DeepSeek synthesis from all evidence; fallback to first long snippet
    descr = _deepseek_describe(naziv, 'sportaš', evidence) if evidence else None
    if not descr:
        extracts = (src.get('extract') for src in sources)
        descr = next((ext for ext in extracts if ext and len(ext) >= 80), None)
    if descr:
        proposed['biografija'] = descr.strip()[:2000]

    return {'proposed': proposed, 'sources': sources}
#!/usr/bin/env python3
"""
enrichment_worker.py — 24/7 background enrichment for PGŽ Sport
Author: Damir Radulić (damir@rinet.one) / dradulic@outlook.com
Date: 2026-05-04
Version: 1.0.0

Polls pgz_sport.clanovi / klubovi / savezi for under-enriched rows, then
calls the live HTTP endpoint (POST {PGZ_API_BASE}/api/v2/enrich/{kind}/{id}/apply)
so every row goes through the same pipeline (and audit log) that the UI
uses. This avoids forking the enrichment logic.

Selection rules (per cycle; the _pick_* queries are authoritative):
  - sportas: active clanovi rows missing profile_url, slika_url, biografija
             or datum_rodenja, whose source is one of ('hns_semafor',
             'hns_family','manual','godisnjak') OR whose vanjski_id has the
             key 'hns_comet' OR whose source_url contains semafor.hns.family,
             and whose metadata.enriched_at is NULL or older than 7 days.
             ORDER BY random() LIMIT 25
  - klub:    active klubovi rows with NULL web, email, telefon or
             opis_djelatnosti, and metadata.enriched_at NULL or older than
             14 days. ORDER BY random() LIMIT 10
  - savez:   savezi rows with NULL web, email or telefon, and
             metadata.enriched_at NULL or older than 14 days.
             ORDER BY random() LIMIT 5

NOTE(review): this worker never writes metadata.enriched_at itself; the
cooldown only takes effect if the /apply endpoint stamps it — confirm.

Sleep 300 s between cycles (configurable via ENRICHER_SLEEP env). With
ENRICHER_DRY=1 candidates are selected and logged but nothing is posted.

Heartbeat to redis (cc:pgz-enricher:heartbeat) and log every cycle to
/opt/pgz-sport/_logs/enrichment_worker.log.
"""
from __future__ import annotations
import contextlib
import json
import os
import random
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras

API_BASE = os.environ.get('PGZ_API_BASE', 'http://localhost:8095')
SLEEP_S = int(os.environ.get('ENRICHER_SLEEP', '300'))
DRY = os.environ.get('ENRICHER_DRY', '0') == '1'
USER_HDR = os.environ.get('ENRICHER_USER', 'enricher@pgz.local')

LOG_PATH = '/opt/pgz-sport/_logs/enrichment_worker.log'

# When PG_HOST points at the local machine, DB_HOST / DB_PORT (with the same
# defaults) are used instead.
_pgh = os.environ.get('PG_HOST', '10.10.0.2')
_pgp = int(os.environ.get('PG_PORT', '6432'))
if _pgh in ('localhost', '127.0.0.1'):
    _pgh = os.environ.get('DB_HOST', '10.10.0.2')
    _pgp = int(os.environ.get('DB_PORT', '6432'))
# SECURITY(review): a real-looking database password is committed as the
# PG_PASS fallback. It is kept so existing deployments keep working, but it
# should be rotated and supplied only through the environment.
DB = dict(host=_pgh, port=_pgp,
          dbname=os.environ.get('PG_DB', 'rinet_v3'),
          user=os.environ.get('PG_USER', 'rinet'),
          password=os.environ.get('PG_PASS', 'R1net2026!SecureDB#v7'))


def _log(msg: str) -> None:
    """Print *msg* with a UTC timestamp and append it to LOG_PATH (best effort)."""
    # isoformat() on an aware datetime already ends in '+00:00'; swap that for
    # 'Z' instead of appending a second UTC designator.
    stamp = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    line = f"{stamp} {msg}"
    print(line, flush=True)
    try:
        os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
        # Explicit UTF-8: messages contain non-ASCII characters such as '→'.
        with open(LOG_PATH, 'a', encoding='utf-8') as f:
            f.write(line + "\n")
    except OSError:
        # The file log is optional; stdout above already has the line.
        pass


def _heartbeat() -> None:
    """Store the current epoch second under the redis heartbeat key (best effort)."""
    try:
        import redis
        r = redis.Redis(host=os.environ.get('REDIS_HOST', 'localhost'),
                        port=int(os.environ.get('REDIS_PORT', '6379')),
                        password=os.environ.get('REDIS_PASS', None))
        r.set('cc:pgz-enricher:heartbeat', str(int(time.time())))
    except Exception:
        # Deliberately broad: a missing redis package or an unreachable
        # server must never stop the worker.
        pass


def _db():
    """Open a new autocommit psycopg2 connection using the DB settings."""
    c = psycopg2.connect(**DB)
    c.autocommit = True
    return c


def _fetch_ids(sql: str, limit: int) -> list[int]:
    """Run *sql* with *limit* as its only parameter and return the first column."""
    # psycopg2's `with connection` only ends the transaction; closing() makes
    # sure the connection itself is released after every query.
    with contextlib.closing(_db()) as conn, conn.cursor() as cur:
        cur.execute(sql, (limit,))
        return [row[0] for row in cur.fetchall()]


def _pick_sportas(limit: int = 25) -> list[int]:
    """Athletes that look enrichable but haven't been enriched recently."""
    sql = """
        SELECT id FROM pgz_sport.clanovi
         WHERE aktivan = TRUE
           AND (profile_url IS NULL OR profile_url = ''
                OR slika_url IS NULL OR slika_url = ''
                OR biografija IS NULL OR biografija = ''
                OR datum_rodenja IS NULL)
           AND (
                source IN ('hns_semafor','hns_family','manual','godisnjak')
                OR jsonb_exists(vanjski_id, 'hns_comet')
                OR (source_url ILIKE '%%semafor.hns.family%%')
           )
           AND ((metadata->>'enriched_at') IS NULL
                OR (metadata->>'enriched_at')::timestamptz < now() - interval '7 days')
         ORDER BY random()
         LIMIT %s
    """
    return _fetch_ids(sql, limit)


def _pick_klub(limit: int = 10) -> list[int]:
    """Active clubs missing contact/description data, not enriched in 14 days."""
    sql = """
        SELECT id FROM pgz_sport.klubovi
         WHERE aktivan = TRUE
           AND (web IS NULL OR email IS NULL OR telefon IS NULL OR opis_djelatnosti IS NULL)
           AND ((metadata->>'enriched_at') IS NULL
                OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
         ORDER BY random()
         LIMIT %s
    """
    return _fetch_ids(sql, limit)


def _pick_savez(limit: int = 5) -> list[int]:
    """Federations missing contact data, not enriched in 14 days."""
    sql = """
        SELECT id FROM pgz_sport.savezi
         WHERE (web IS NULL OR email IS NULL OR telefon IS NULL)
           AND ((metadata->>'enriched_at') IS NULL
                OR (metadata->>'enriched_at')::timestamptz < now() - interval '14 days')
         ORDER BY random()
         LIMIT %s
    """
    return _fetch_ids(sql, limit)


def _http_post(path: str, body: dict | None = None) -> dict | None:
    """POST *body* as JSON to API_BASE + *path*; return the decoded reply.

    Any failure (network, HTTP status, invalid JSON) is logged and reported
    as None so one bad row cannot abort a cycle.
    """
    url = API_BASE.rstrip('/') + path
    data = json.dumps(body or {}).encode('utf-8')
    req = urllib.request.Request(
        url, data=data, method='POST',
        headers={'Content-Type': 'application/json',
                 'X-User-Email': USER_HDR})
    try:
        with urllib.request.urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode('utf-8'))
    except Exception as e:
        _log(f"POST {path} failed: {type(e).__name__}: {e}")
        return None


def _process(kind: str, eid: int) -> tuple[int, list[str]]:
    """Run preview + apply for one entity. Returns (#applied, fields)."""
    # /apply with empty body re-runs the preview server-side and writes the
    # full proposal — the cheapest way to flush a row through.
    res = _http_post(f'/api/v2/enrich/{kind}/{eid}/apply', {})
    if not isinstance(res, dict):
        return (0, [])
    # Iterating works for both a {field: value} mapping and a list of names.
    fields = [str(name) for name in (res.get('applied') or ())]
    return (len(fields), fields)


def _cycle() -> dict:
    """Process one batch per entity kind and return per-kind counters.

    The result holds the number of processed rows per kind, the total
    number of applied fields (``fields_total``) and ``elapsed_s``.
    """
    started = time.time()
    out = {'sportas': 0, 'klub': 0, 'savez': 0, 'fields_total': 0}
    fields_total = 0
    for kind, picker, limit in (
        ('sportas', _pick_sportas, 25),
        ('klub', _pick_klub, 10),
        ('savez', _pick_savez, 5),
    ):
        ids = picker(limit)
        random.shuffle(ids)
        _log(f"cycle: {kind} candidates={len(ids)}")
        for eid in ids:
            if DRY:
                continue
            n, fields = _process(kind, eid)
            out[kind] += 1
            fields_total += n
            if n:
                _log(f"  {kind}#{eid} → +{n} fields {','.join(fields)}")
            time.sleep(1.5)  # gentle pacing
        _heartbeat()
    out['fields_total'] = fields_total
    out['elapsed_s'] = round(time.time() - started, 1)
    return out


def main() -> int:
    """Run enrichment cycles forever; return 0 when interrupted from the keyboard."""
    _log(f"enrichment_worker starting | API_BASE={API_BASE} | sleep={SLEEP_S}s | dry={DRY}")
    try:
        while True:
            try:
                stats = _cycle()
                _log(f"cycle done: {json.dumps(stats)}")
            except Exception as e:
                # Top-level boundary: log and keep the worker alive.
                _log(f"cycle FAILED: {type(e).__name__}: {e}")
            _heartbeat()
            time.sleep(SLEEP_S)
    except KeyboardInterrupt:
        _log("enrichment_worker stopped (KeyboardInterrupt)")
        return 0


if __name__ == '__main__':
    sys.exit(main() or 0)