#!/usr/bin/env python3
# sub4_enrich.py v1.0 - dradulic@outlook.com / damir@rinet.one - 2026-05-05
# Description: Enrich pgz_sport.manifestacije with wiki_url candidates.
# Probes Wikipedia HR/EN (GET of the first 50 KB), verifies that the page
# content matches the event name, and scores a confidence for each hit.
# Writes CSV/XLSX candidate lists + an SQL apply script; high-confidence rows
# are additionally applied directly when the target columns already exist.
# NOTE: only wiki_url is populated at present; `web` is read but never written.

import csv
import os
import re
import sys
import time
import unicodedata
import urllib.parse
import urllib.request
import urllib.error
import socket
import ssl
import json
from datetime import datetime, timezone
from typing import Optional

import psycopg2
import psycopg2.errors  # explicit: psycopg2.errors.UndefinedColumn is used below
import psycopg2.extras

# ---------- Config ----------
ENV_PATH = "/opt/pgz-sport/.env"
USER_AGENT = "PGZ-sport-data-bot/1.0 (https://api.rinet.one/sport/; dradulic@outlook.com)"
TIMEOUT = 8
RATE_SLEEP = 1.1  # >1s between Wikipedia requests
APPLY_THRESHOLD = 0.85
MAX_ROWS_PER_RUN = 50  # per spec: at most 50 rows are probed per run
AUDIT_DIR = "/opt/pgz-sport/_audit"
KANDIDATI_XLSX = f"{AUDIT_DIR}/sub4_manifestacije_kandidati.xlsx"
KANDIDATI_CSV = f"{AUDIT_DIR}/sub4_manifestacije_kandidati.csv"
APPLY_SQL = f"{AUDIT_DIR}/sub4_manifestacije_apply.sql"
LOG_FILE = f"{AUDIT_DIR}/sub4_manifestacije.log"
STATS_JSON = f"{AUDIT_DIR}/sub4_manifestacije_stats.json"

OUTPUT_COLUMNS = ["id", "naziv", "predlozeni_url", "lang", "confidence", "razlog", "kategorija"]


# ---------- ENV loader ----------
def load_env(path):
    """Parse a simple KEY=VALUE file into a dict.

    Blank lines, lines starting with '#', and lines without '=' are ignored.
    Values have surrounding whitespace and surrounding quotes removed.
    """
    env = {}
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            env[key.strip()] = value.strip().strip("'").strip('"')
    return env


ENV = load_env(ENV_PATH)


# ---------- Normalization ----------
# Generic leading words that carry no distinguishing information about an event.
_GENERIC_PREFIX_RE = re.compile(
    r'^(Memorijal(ni)?|Međunarodni|Hrvatski|Trofej|Kup|Turnir|Nagrada|Dani|Regata)\s+',
    re.IGNORECASE,
)


def normalize_for_wiki(naziv: str) -> str:
    """Turn an event name into a percent-encoded Wikipedia page slug."""
    s = naziv.strip()
    s = re.sub(r'\s+', ' ', s)
    s = s.replace(' ', '_')
    return urllib.parse.quote(s, safe="_-")


def strip_diacritics(s: str) -> str:
    """Return *s* with combining marks removed after NFKD decomposition."""
    nfkd = unicodedata.normalize('NFKD', s)
    return ''.join(c for c in nfkd if not unicodedata.combining(c))


def naziv_substr(naziv: str) -> str:
    """Pick the most distinctive 2-word substring for content verification.

    One generic leading word (e.g. "Memorijal", "Kup") is dropped unless that
    leaves fewer than 4 characters; the first two remaining words are returned.
    """
    s = naziv.strip()
    core = _GENERIC_PREFIX_RE.sub('', s).strip()
    if len(core) < 4:
        core = s
    words = core.split()
    if len(words) >= 2:
        return ' '.join(words[:2])
    return core


# ---------- HTTP ----------
def http_request(url: str, method: str = "GET", max_bytes: Optional[int] = None):
    """Perform one HTTP request; never raises.

    Returns (status_code, final_url, body_bytes_or_None). The body is only read
    for GET, truncated to *max_bytes* when given. HTTP error responses yield
    their status code with no body; any other failure yields status 0.
    """
    req = urllib.request.Request(url, method=method)
    req.add_header("User-Agent", USER_AGENT)
    req.add_header("Accept-Language", "hr,en;q=0.8")
    ctx = ssl.create_default_context()
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT, context=ctx) as resp:
            body = None
            if method == "GET":
                body = resp.read(max_bytes) if max_bytes else resp.read()
            return (resp.status, resp.geturl(), body)
    except urllib.error.HTTPError as e:
        return (e.code, url, None)
    except Exception:
        # Deliberate best-effort: network, TLS, timeout and malformed-URL
        # failures are all reported as status 0 so one bad row cannot abort
        # the whole run.
        return (0, url, None)


def head_probe(url: str):
    """Issue a HEAD request; see http_request for the return shape."""
    return http_request(url, method="HEAD")


def get_snippet(url: str, max_kb: int = 50):
    """GET at most *max_kb* kilobytes of *url*; see http_request."""
    return http_request(url, method="GET", max_bytes=max_kb * 1024)


# ---------- Verification ----------
# Sport-context keywords, matched as substrings of the lower-cased,
# diacritic-stripped page text. They must therefore be written WITHOUT
# diacritics themselves ("bocan", not "boćan"), otherwise they can never match.
_SPORT_KEYWORDS = (
    'sport', 'regat', 'rally', 'reli', 'turnir', 'memorijal', 'kup ',
    'automobiliz', 'jedrilic', 'jedren', 'auto-cross', 'autocross',
    'kosark', 'rukomet', 'odbojk', 'plivac', 'plivanj', 'sah ',
    'biciklizm', 'atletik', 'atletski', 'streljas',
    'taekwondo', 'karate', 'tenis', 'judo', 'boce', 'bocan', 'nogomet',
    'sailing', 'tournament', 'football', 'basketball', 'volleyball', 'handball',
    'swimming', 'athletics', 'fencing', 'archery', 'shooting', 'fishing',
    'ribolov', 'maraton', 'cross-country', 'speedminton', 'badminton',
    'snowboard', 'skijanj', 'vaterpolo', 'water polo',
)
# "ski" must be a whole word: as a substring it matches the ubiquitous
# Croatian adjective ending ("hrvatski", "primorski", ...), which would make
# the sport-context check pass on practically every HR page.
_SPORT_WORD_RE = re.compile(r'\bski\b')


def verify_content(url: str, naziv: str):
    """Fetch *url* and check that its content plausibly describes *naziv*.

    Returns (status, final_url, match_count, has_disambig, sport_match).
    match_count = the larger of: distinctive tokens (>= 3 chars) from
                  naziv_substr found in the first 50KB, and tokens (>= 4 chars)
                  of the full naziv found there (case+diacritic insensitive).
    has_disambig = the page carries a Wikipedia disambiguation marker.
    sport_match = any sport-related keyword appears (regatta, rally, ...).
    On a non-2xx/3xx status or an empty body the last three values are
    0, False, False.
    """
    status, final_url, body = get_snippet(url, max_kb=50)
    if status < 200 or status >= 400 or not body:
        return (status, final_url, 0, False, False)

    text = body.decode("utf-8", errors="ignore")
    text_low = strip_diacritics(text).lower()

    substr = strip_diacritics(naziv_substr(naziv)).lower()
    tokens = [t for t in re.split(r'\s+', substr) if len(t) >= 3]
    match_count = sum(1 for t in tokens if t in text_low)

    # Also check how many longer words of the full naziv appear.
    full_low = strip_diacritics(naziv).lower()
    full_tokens = [t for t in re.split(r'\s+', full_low) if len(t) >= 4]
    full_matches = sum(1 for t in full_tokens if t in text_low)

    # Disambig detection: a dedicated disambiguation page (NOT just a hatnote
    # linking to one) carries the category link or the VisualEditor flag.
    has_disambig = (
        'wgPageContentModel":"wikitext"' in text
        and (
            'Kategorija:Stranice_za_razdvajanje' in text
            or 'Category:Disambiguation_pages' in text
            or 'wgVisualEditorPageIsDisambiguation":true' in text
        )
    )

    sport_match = (
        any(k in text_low for k in _SPORT_KEYWORDS)
        or _SPORT_WORD_RE.search(text_low) is not None
    )
    return (status, final_url, max(match_count, full_matches), has_disambig, sport_match)


# ---------- Wikipedia probing ----------
def try_wikipedia(naziv: str, lang: str = "hr"):
    """Probe the page whose title equals *naziv* on the *lang* Wikipedia.

    Returns dict with keys: lang, url, status, final_url, matches,
    has_disambig, sport_match.
    """
    slug = normalize_for_wiki(naziv)
    url = f"https://{lang}.wikipedia.org/wiki/{slug}"
    status, final_url, matches, has_disambig, sport_match = verify_content(url, naziv)
    return {
        "lang": lang,
        "url": url,
        "status": status,
        "final_url": final_url,
        "matches": matches,
        "has_disambig": has_disambig,
        "sport_match": sport_match,
    }


def try_wikipedia_search(naziv: str, lang: str = "hr"):
    """Use the Wikipedia OpenSearch API to find the best title match.

    Returns {"title": str_or_None, "url": str} for the first hit, or None when
    the request fails, the response is not the expected
    [query, [titles], [descs], [urls]] shape, or there are no hits.
    """
    api = f"https://{lang}.wikipedia.org/w/api.php?action=opensearch&limit=3&format=json&search="
    url = api + urllib.parse.quote(naziv)
    status, _, body = http_request(url, method="GET", max_bytes=8192)
    if status != 200 or not body:
        return None
    try:
        data = json.loads(body.decode("utf-8", errors="ignore"))
    except ValueError:
        # Includes truncated responses (body is capped at 8 KB).
        return None
    if not isinstance(data, list) or len(data) < 4:
        return None
    titles, urls = data[1], data[3]
    if not isinstance(urls, list) or not urls:
        return None
    first_url = urls[0]
    # The URL is fetched later; only accept what the API is expected to return.
    if not isinstance(first_url, str) or not first_url.startswith("https://"):
        return None
    title = titles[0] if isinstance(titles, list) and titles else None
    return {"title": title, "url": first_url}


# ---------- Confidence scoring ----------
def score_confidence(probe: dict, naziv: str) -> float:
    """Score a Wikipedia probe outcome in the range 0.0-0.95.

    0.0 for a missing probe or non-2xx/3xx status, a flat 0.4 for a
    disambiguation page; otherwise a base score by language and token matches,
    minus 0.10 for a naziv shorter than 8 characters and minus 0.40 when the
    page shows no sport keyword.
    """
    if probe is None:
        return 0.0
    status = probe.get("status", 0)
    matches = probe.get("matches", 0)
    has_dis = probe.get("has_disambig", False)
    sport_match = probe.get("sport_match", False)
    lang = probe.get("lang", "")

    if status < 200 or status >= 400:
        return 0.0
    if has_dis:
        return 0.4

    if lang == "hr":
        base = 0.95 if matches >= 2 else (0.80 if matches >= 1 else 0.50)
    elif lang == "en":
        base = 0.85 if matches >= 2 else (0.70 if matches >= 1 else 0.45)
    else:
        base = 0.70 if matches >= 1 else 0.40

    # Penalize very short naziv (more ambiguous)
    if len(naziv) < 8:
        base = max(0.0, base - 0.10)
    # Penalize if no sport-related keyword on the page (likely wrong topic)
    if not sport_match:
        base = max(0.0, base - 0.40)
    return round(base, 2)


# ---------- DB ----------
def db_connect():
    """Open a new PostgreSQL connection using the PG_* values from ENV."""
    return psycopg2.connect(
        host=ENV["PG_HOST"],
        port=int(ENV["PG_PORT"]),
        user=ENV["PG_USER"],
        password=ENV["PG_PASS"],
        dbname=ENV["PG_DB"],
    )


def fetch_manifestacije():
    """Load candidate rows from pgz_sport.manifestacije.

    Returns (rows, has_cols). When the web/wiki_url columns exist, rows are
    those where either is empty and has_cols is True; when they do not exist,
    all rows are returned (id, naziv, mjesto, organizator) and has_cols is
    False.
    """
    conn = db_connect()
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            # Try to read web/wiki_url; if columns missing, fallback to id+naziv only
            try:
                cur.execute("""
                    SELECT id, naziv, mjesto, organizator, web, wiki_url
                    FROM pgz_sport.manifestacije
                    WHERE COALESCE(web,'') = '' OR COALESCE(wiki_url,'') = ''
                    ORDER BY id
                """)
                rows = [dict(r) for r in cur.fetchall()]
                has_cols = True
            except psycopg2.errors.UndefinedColumn:
                # The failed statement aborted the transaction; reset it first.
                conn.rollback()
                cur.execute("""
                    SELECT id, naziv, mjesto, organizator
                    FROM pgz_sport.manifestacije
                    ORDER BY id
                """)
                rows = [dict(r) for r in cur.fetchall()]
                has_cols = False
        return rows, has_cols
    finally:
        conn.close()


def fetch_summary():
    """Return table-level counts: total rows and rows with web / wiki_url set.

    The result is {"total", "ima_web", "ima_wiki", "has_cols"}; the two "ima_"
    counts are 0 and has_cols is False when the columns do not exist.
    """
    conn = db_connect()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM pgz_sport.manifestacije")
            total = cur.fetchone()[0]
            try:
                cur.execute("""
                    SELECT
                        COUNT(web) FILTER (WHERE COALESCE(web,'')<>''),
                        COUNT(wiki_url) FILTER (WHERE COALESCE(wiki_url,'')<>'')
                    FROM pgz_sport.manifestacije
                """)
                ima_web, ima_wiki = cur.fetchone()
                has_cols = True
            except psycopg2.errors.UndefinedColumn:
                conn.rollback()
                ima_web, ima_wiki = 0, 0
                has_cols = False
        return {"total": total, "ima_web": ima_web, "ima_wiki": ima_wiki, "has_cols": has_cols}
    finally:
        conn.close()


# ---------- Candidate search helpers ----------
def _better(best, cand):
    """Return whichever of *best* / *cand* has the strictly higher confidence."""
    if best is None or cand["confidence"] > best["confidence"]:
        return cand
    return best


def _needs_more(best):
    """True while no candidate has reached APPLY_THRESHOLD yet."""
    return not best or best["confidence"] < APPLY_THRESHOLD


def _probe_search(naziv, lang, stats, log):
    """Run the OpenSearch fallback for *lang*; return a candidate dict or None."""
    sr = try_wikipedia_search(naziv, lang)
    time.sleep(RATE_SLEEP)
    if not sr or not sr.get("url"):
        return None
    status, final_url, matches, has_dis, sport_match = verify_content(sr["url"], naziv)
    time.sleep(RATE_SLEEP)
    probe = {"lang": lang, "url": sr["url"], "status": status, "final_url": final_url,
             "matches": matches, "has_disambig": has_dis, "sport_match": sport_match}
    # search results are a step less reliable than direct slug match
    conf = round(max(0.0, score_confidence(probe, naziv) - 0.05), 2)
    tag = lang.upper()
    log(f" WIKI-{tag} search title={sr.get('title')!r} status={status} matches={matches} conf={conf}")
    if conf <= 0:
        return None
    stats[f"succ_search_{lang}"] += 1
    return {"url": final_url or sr["url"], "lang": f"{lang}-search", "confidence": conf,
            "razlog": f"Wikipedia {tag} opensearch '{sr.get('title')}', matches={matches}"}


def _find_best_candidate(naziv, stats, log):
    """Try HR slug, EN slug, HR search, EN search; stop once one is confident.

    Returns the best candidate dict (url, lang, confidence, razlog) or None.
    """
    best = None

    # 1. HR Wikipedia direct slug
    probe_hr = try_wikipedia(naziv, "hr")
    time.sleep(RATE_SLEEP)
    conf_hr = score_confidence(probe_hr, naziv)
    log(f" WIKI-HR slug status={probe_hr['status']} matches={probe_hr['matches']} disambig={probe_hr['has_disambig']} sport={probe_hr.get('sport_match')} conf={conf_hr}")
    if conf_hr > 0:
        stats["succ_wiki_hr"] += 1
        best = _better(best, {
            "url": probe_hr["final_url"] or probe_hr["url"], "lang": "hr",
            "confidence": conf_hr,
            "razlog": f"Wikipedia HR direct slug, matches={probe_hr['matches']}"})

    # 2. EN Wikipedia direct slug (only if HR not high-confidence)
    if _needs_more(best):
        probe_en = try_wikipedia(naziv, "en")
        time.sleep(RATE_SLEEP)
        conf_en = score_confidence(probe_en, naziv)
        log(f" WIKI-EN slug status={probe_en['status']} matches={probe_en['matches']} disambig={probe_en['has_disambig']} conf={conf_en}")
        if conf_en > 0:
            stats["succ_wiki_en"] += 1
            best = _better(best, {
                "url": probe_en["final_url"] or probe_en["url"], "lang": "en",
                "confidence": conf_en,
                "razlog": f"Wikipedia EN direct slug, matches={probe_en['matches']}"})

    # 3. + 4. OpenSearch fallbacks, HR first
    for lang in ("hr", "en"):
        if not _needs_more(best):
            break
        cand = _probe_search(naziv, lang, stats, log)
        if cand is not None:
            best = _better(best, cand)
    return best


# ---------- Output helpers ----------
def _table_row(rec, kategorija):
    """Flatten a result record into the CSV/XLSX column order."""
    return [rec["id"], rec["naziv"], rec["predlozeni_url"], rec["lang"],
            rec["confidence"], rec["razlog"], kategorija]


def _all_table_rows(apply_rows, candidate_rows):
    """All output rows: APPLY rows first, then KANDIDAT rows."""
    return ([_table_row(r, "APPLY") for r in apply_rows]
            + [_table_row(r, "KANDIDAT") for r in candidate_rows])


def _write_csv(apply_rows, candidate_rows):
    """Write the candidate list as UTF-8 CSV to KANDIDATI_CSV."""
    with open(KANDIDATI_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(OUTPUT_COLUMNS)
        w.writerows(_all_table_rows(apply_rows, candidate_rows))


def _write_xlsx(apply_rows, candidate_rows):
    """Write the candidate list to KANDIDATI_XLSX (requires openpyxl)."""
    from openpyxl import Workbook
    wb = Workbook()
    ws = wb.active
    ws.title = "manifestacije_kandidati"
    ws.append(OUTPUT_COLUMNS)
    for row in _all_table_rows(apply_rows, candidate_rows):
        ws.append(row)
    wb.save(KANDIDATI_XLSX)


def _sql_literal(value):
    """Render *value* as an SQL literal: numbers bare, anything else quoted."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def _write_apply_sql(apply_rows):
    """Write the idempotent schema additions + per-row UPDATEs to APPLY_SQL."""
    with open(APPLY_SQL, "w", encoding="utf-8") as f:
        f.write("-- sub4_manifestacije_apply.sql v1.0 - 2026-05-05\n")
        f.write("-- Run as: psql -h $PG_HOST -p $PG_PORT -U $PG_USER -d $PG_DB -f sub4_manifestacije_apply.sql\n")
        f.write(f"-- Confidence threshold: >= {APPLY_THRESHOLD} (Wikipedia HR/EN with content verification)\n\n")
        f.write("BEGIN;\n\n")
        f.write("-- Schema additions (idempotent)\n")
        f.write("ALTER TABLE pgz_sport.manifestacije ADD COLUMN IF NOT EXISTS web TEXT;\n")
        f.write("ALTER TABLE pgz_sport.manifestacije ADD COLUMN IF NOT EXISTS wiki_url TEXT;\n")
        f.write("ALTER TABLE pgz_sport.manifestacije ADD COLUMN IF NOT EXISTS enriched_at TIMESTAMPTZ;\n")
        f.write("ALTER TABLE pgz_sport.manifestacije ADD COLUMN IF NOT EXISTS enriched_confidence REAL;\n\n")
        for r in apply_rows:
            # razlog embeds a title returned by Wikipedia; collapse whitespace
            # so a stray newline cannot break out of the "--" comment.
            razlog = " ".join(str(r["razlog"]).split())
            f.write(f"-- id={r['id']} {razlog}\n")
            f.write(
                f"UPDATE pgz_sport.manifestacije "
                f"SET wiki_url={_sql_literal(str(r['predlozeni_url']))}, enriched_at=NOW(), "
                f"enriched_confidence={r['confidence']} "
                f"WHERE id={_sql_literal(r['id'])} AND COALESCE(wiki_url,'')='';\n"
            )
        f.write("\nCOMMIT;\n")


def _apply_to_db(apply_rows):
    """UPDATE wiki_url for *apply_rows* in one transaction; return rows updated."""
    conn = db_connect()
    try:
        applied = 0
        with conn.cursor() as cur:
            for r in apply_rows:
                cur.execute(
                    "UPDATE pgz_sport.manifestacije "
                    "SET wiki_url=%s, enriched_at=NOW(), enriched_confidence=%s "
                    "WHERE id=%s AND COALESCE(wiki_url,'')=''",
                    (r["predlozeni_url"], r["confidence"], r["id"]),
                )
                applied += cur.rowcount
        conn.commit()
        return applied
    finally:
        # Closing without commit discards a partially applied batch.
        conn.close()


# ---------- Main loop ----------
def _run(log):
    """Execute the enrichment pipeline, reporting progress through *log*."""
    summary_before = fetch_summary()
    log(f"BEFORE: total={summary_before['total']} ima_web={summary_before['ima_web']} ima_wiki={summary_before['ima_wiki']} has_cols={summary_before['has_cols']}")

    rows, has_cols = fetch_manifestacije()
    log(f"Fetched {len(rows)} rows for enrichment")

    # Only wiki_url is produced here, so rows that already have one (and were
    # selected merely because `web` is empty) would be probed for nothing and
    # would permanently occupy the per-run budget below.
    pending = [r for r in rows if not r.get("wiki_url")]
    if len(pending) != len(rows):
        log(f"Skipping {len(rows) - len(pending)} rows that already have wiki_url")
    rows = pending

    # Per spec: process at most MAX_ROWS_PER_RUN rows; take the first ones by id.
    if len(rows) > MAX_ROWS_PER_RUN:
        rows = rows[:MAX_ROWS_PER_RUN]
        log("Limited to first 50 rows per spec")

    stats = {
        "probano": 0,
        "succ_wiki_hr": 0,
        "succ_wiki_en": 0,
        "succ_search_hr": 0,
        "succ_search_en": 0,
        "applied": 0,
        "kandidati": 0,
        "zero_match": 0,
    }
    apply_rows = []      # confidence >= APPLY_THRESHOLD
    candidate_rows = []  # 0 < confidence < APPLY_THRESHOLD

    for i, row in enumerate(rows, 1):
        rid = row["id"]
        naziv = row["naziv"]
        log(f"--- [{i}/{len(rows)}] id={rid} naziv={naziv!r}")
        if not naziv or not naziv.strip():
            log(" -> SKIP empty naziv")
            continue
        stats["probano"] += 1

        best = _find_best_candidate(naziv, stats, log)
        if best is None:
            stats["zero_match"] += 1
            log(" -> NO match")
            continue

        log(f" -> BEST url={best['url']} lang={best['lang']} conf={best['confidence']}")
        rec = {
            "id": rid,
            "naziv": naziv,
            "predlozeni_url": best["url"],
            "lang": best["lang"],
            "confidence": best["confidence"],
            "razlog": best["razlog"],
        }
        if best["confidence"] >= APPLY_THRESHOLD:
            stats["applied"] += 1
            apply_rows.append(rec)
        else:
            stats["kandidati"] += 1
            candidate_rows.append(rec)

    log(f"STATS: {stats}")

    # ---------- Write outputs ----------
    # CSV (always)
    _write_csv(apply_rows, candidate_rows)
    log(f"Wrote CSV: {KANDIDATI_CSV} (apply={len(apply_rows)} kandidati={len(candidate_rows)})")

    # XLSX (optional: openpyxl may be missing)
    try:
        _write_xlsx(apply_rows, candidate_rows)
        log(f"Wrote XLSX: {KANDIDATI_XLSX}")
    except Exception as e:
        log(f"XLSX skipped: {e}")

    # SQL apply script (user can run after ALTER TABLE)
    _write_apply_sql(apply_rows)
    log(f"Wrote SQL apply script: {APPLY_SQL} (rows: {len(apply_rows)})")

    # Try direct DB apply (will succeed only if columns exist)
    if has_cols and apply_rows:
        try:
            applied_db = _apply_to_db(apply_rows)
            log(f"DB apply: updated {applied_db} rows in pgz_sport.manifestacije")
        except Exception as e:
            log(f"DB apply failed: {e}")
    else:
        log(f"DB apply skipped: has_cols={has_cols} apply_count={len(apply_rows)} (use SQL script)")

    summary_after = fetch_summary()
    log(f"AFTER: total={summary_after['total']} ima_web={summary_after['ima_web']} ima_wiki={summary_after['ima_wiki']} has_cols={summary_after['has_cols']}")

    # Stats JSON for MD generator
    out = {
        "before": summary_before,
        "after": summary_after,
        "stats": stats,
        "apply_rows": apply_rows,
        "candidate_rows": candidate_rows,
        "ts": datetime.now(timezone.utc).isoformat(),
    }
    with open(STATS_JSON, "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False, indent=2)
    log("Wrote stats JSON")
    return out


def main():
    """Run the enrichment and return the stats dict written to STATS_JSON.

    Every progress message goes both to stdout and to LOG_FILE; the log file
    is closed even when the run aborts with an exception.
    """
    os.makedirs(AUDIT_DIR, exist_ok=True)
    with open(LOG_FILE, "w", encoding="utf-8") as logf:

        def log(msg):
            line = f"[{datetime.now(timezone.utc).isoformat()}] {msg}"
            print(line)
            logf.write(line + "\n")
            logf.flush()

        return _run(log)


if __name__ == "__main__":
    main()