#!/usr/bin/env python3
# ──────────────────────────────────────────────────────────────────────────────
# hns_watchdog.py — PGŽ Sport HNS pipeline watchdog (SUB7)
# Author : Damir Radulić /
# Date   : 2026-05-05
# Version: 1.0.0
# Purpose: Periodically poll DB progress for the HNS scraping pipeline,
#          detect stalls, restart fallen worker processes and send Telegram
#          status updates every 30 minutes. Fires a special "ALL DONE" alert
#          once the mission goal is reached.
#
# Modes  : --once    run a single check and exit (cron-friendly)
#          --daemon  loop forever, sleeping CHECK_INTERVAL_SEC between checks
#
# Goal   : 59/59 PGŽ-funded clubs with hns_klub_id, ≥80% of players with
#          profile_complete=true (visina_cm IS NOT NULL), ≥1000 matches.
# ──────────────────────────────────────────────────────────────────────────────

import os
import sys
import time
import json
import argparse
import logging
import logging.handlers
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path

import psycopg2
import requests

# ── Config ────────────────────────────────────────────────────────────────────
# The fallback DSN is built only when RINET_DSN is absent, so a missing
# DB_PASSWORD no longer raises KeyError at import time when a full DSN was
# supplied through the environment.
DSN = os.environ.get("RINET_DSN")
if DSN is None:
    DSN = (
        "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet "
        f"password={os.environ['DB_PASSWORD']}"
    )

# SECURITY: a bot token is hard-coded as the fallback default. It is kept so
# existing deployments keep working, but it should be rotated and supplied
# exclusively through TG_BOT_TOKEN.
TG_TOKEN = os.getenv("TG_BOT_TOKEN", "8535797835:AAFItT-92jzZ9NWFafLxn0dLa1_n2s-JE5Y")
TG_CHAT = os.getenv("TG_CHAT", "7969491558")

LOG_DIR = Path("/var/log/pgz-sport-debug")
LOG_FILE = LOG_DIR / "hns_watchdog.log"
STATE_FILE = LOG_DIR / "hns_watchdog_state.json"

CHECK_INTERVAL_SEC = 30 * 60   # 30 min between daemon iterations
STALL_WINDOW_SEC = 30 * 60     # consider stale if no growth in 30 min
DONE_FLAG_FILE = LOG_DIR / "hns_watchdog_DONE.flag"

# Mission targets
TARGET_KLUBOVI = 59
TARGET_PROFILE_PCT = 0.80
TARGET_MATCHES = 1000

# Worker processes to keep alive (process_name : restart_command)
WORKERS = {
    "hns_master_harvester": [
        "python3",
        "/opt/pgz-sport/scripts/hns_master_harvester.py",
    ],
    "hns_season_v3": [
        "python3",
        "/opt/pgz-sport/scripts/hns_season_v3.py",
    ],
}

# ── Logging ───────────────────────────────────────────────────────────────────
LOG_DIR.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("hns_watchdog")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.handlers.RotatingFileHandler(
        LOG_FILE, maxBytes=5_000_000, backupCount=5
    )
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    ))
    logger.addHandler(handler)
    logger.addHandler(logging.StreamHandler(sys.stdout))

# ── Helpers ───────────────────────────────────────────────────────────────────
PROGRESS_SQL = """
SELECT
    (SELECT COUNT(*) FROM pgz_sport.klubovi
      WHERE sport='nogomet' AND pgz_sufinanciran=true
        AND hns_klub_id IS NOT NULL) AS klubovi_hns,
    (SELECT COUNT(DISTINCT klub_id) FROM pgz_sport.hns_klub_roster) AS roster_klubovi,
    (SELECT COUNT(*) FROM pgz_sport.clanovi
      WHERE hns_igrac_id IS NOT NULL) AS igraci_hns,
    (SELECT COUNT(*) FROM pgz_sport.clanovi
      WHERE hns_igrac_id IS NOT NULL AND visina_cm IS NOT NULL) AS igraci_profil,
    (SELECT COUNT(*) FROM pgz_sport.hns_player_seasons) AS seasons_rec,
    (SELECT COUNT(*) FROM pgz_sport.hns_player_matches) AS matches_rec
;
"""

PENDING_SQL = """
SELECT COUNT(*) FROM pgz_sport.clanovi
 WHERE hns_igrac_id IS NOT NULL AND visina_cm IS NULL;
"""

# Column order of PROGRESS_SQL's single result row.
_PROGRESS_COLS = (
    "klubovi_hns", "roster_klubovi", "igraci_hns",
    "igraci_profil", "seasons_rec", "matches_rec",
)

# Counters whose increase counts as pipeline progress for stall detection.
_GROWTH_KEYS = ("seasons_rec", "matches_rec", "igraci_profil")


def _utcnow():
    """Return the current UTC time as a naive datetime.

    Same value `datetime.utcnow()` produced, without the deprecated call, so
    timestamps already stored in the state file stay comparable.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _parse_ts(value):
    """Parse an ISO-8601 timestamp into a naive UTC datetime, or return None.

    Timezone-aware inputs are converted to UTC and stripped of tzinfo so they
    can be subtracted from `_utcnow()` without a naive/aware TypeError.
    """
    try:
        ts = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        return None
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
    return ts


def _redact(text):
    """Return `text` with the Telegram bot token masked."""
    return text.replace(TG_TOKEN, "<redacted>") if TG_TOKEN else text


def _has_growth(prev, curr):
    """True if any progress counter in `curr` exceeds its value in `prev`."""
    return any((curr.get(k) or 0) > (prev.get(k) or 0) for k in _GROWTH_KEYS)


def db_query():
    """Return a dict of progress counters, or None on failure.

    Keys are the `_PROGRESS_COLS` names plus ``pending_players``, which is
    None when only the secondary PENDING_SQL query failed.
    """
    conn = None
    try:
        conn = psycopg2.connect(DSN, connect_timeout=10)
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(PROGRESS_SQL)
            counts = dict(zip(_PROGRESS_COLS, cur.fetchone()))
            # Best effort: a failure here must not discard the main counters.
            try:
                cur.execute(PENDING_SQL)
                counts["pending_players"] = cur.fetchone()[0]
            except Exception as e:
                logger.warning("PENDING_SQL failed: %s", e)
                counts["pending_players"] = None
        return counts
    except Exception as e:
        logger.error("DB query failed: %s", e)
        return None
    finally:
        # Close on every path; previously an error after connect() leaked
        # the connection.
        if conn is not None:
            try:
                conn.close()
            except Exception as e:
                logger.warning("DB close failed: %s", e)


def telegram(text):
    """Send `text` (truncated to 4000 chars) to TG_CHAT; return True on success."""
    try:
        r = requests.post(
            f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage",
            data={"chat_id": TG_CHAT, "text": text[:4000],
                  "parse_mode": "HTML", "disable_web_page_preview": "true"},
            timeout=10,
        )
        ok = bool(r.ok and r.json().get("ok", False))
        if not ok:
            logger.warning("Telegram returned: %s", r.text[:300])
        return ok
    except Exception as e:
        # requests' exception text contains the request URL, which embeds the
        # bot token — mask it before it reaches the log file.
        logger.error("Telegram send failed: %s", _redact(str(e)))
        return False


def load_state():
    """Load the persisted watchdog state; return {} if absent or unusable."""
    try:
        state = json.loads(STATE_FILE.read_text())
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        logger.warning("Ignoring unreadable state file %s: %s", STATE_FILE, e)
        return {}
    if not isinstance(state, dict):
        logger.warning("Ignoring state file %s: top-level value is not an object",
                       STATE_FILE)
        return {}
    return state


def save_state(state):
    """Persist `state` as JSON; failures are logged, never raised."""
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    try:
        # Write to a sibling temp file and rename, so a crash mid-write
        # cannot leave a truncated state file behind.
        tmp_path.write_text(json.dumps(state, indent=2, default=str))
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logger.warning("Cannot persist state: %s", e)


def proc_alive(name):
    """True if a process matching `name` is currently running.

    Returns True when pgrep itself fails, so an unknown status never
    triggers a respawn.
    """
    try:
        # pgrep -f returns 0 if at least one match
        r = subprocess.run(
            ["pgrep", "-f", name],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
        )
        return r.returncode == 0
    except Exception as e:
        logger.warning("pgrep failed for %s: %s", name, e)
        return True  # err on caution: do not respawn if uncertain


def restart_worker(name, cmd):
    """Start `cmd` detached, logging its output under LOG_DIR; return success."""
    log_path = LOG_DIR / f"{name}_respawn_{datetime.now():%Y%m%d_%H%M}.log"
    try:
        with open(log_path, "ab") as logf:
            subprocess.Popen(
                cmd,
                stdout=logf,
                stderr=subprocess.STDOUT,
                cwd="/opt/pgz-sport/scripts",
                start_new_session=True,
            )
        logger.info("Re-spawned worker %s -> %s", name, log_path)
        return True
    except Exception as e:
        logger.error("Failed to respawn %s: %s", name, e)
        return False


def check_workers():
    """Return list of worker names that were re-spawned."""
    respawned = []
    for name, cmd in WORKERS.items():
        if not proc_alive(name) and restart_worker(name, cmd):
            respawned.append(name)
    return respawned


def detect_stale(prev, curr):
    """True if no progress counter grew although players are still pending.

    `prev` is the previously stored counters dict (with an ISO ``ts`` key),
    `curr` the fresh counters. Returns False when either is missing, when
    nothing is pending, when ``ts`` is unparseable, or when less than
    STALL_WINDOW_SEC has elapsed since ``ts``.
    """
    if not prev or not curr:
        return False
    if curr.get("pending_players") in (None, 0):
        return False
    ts_prev = _parse_ts(prev.get("ts"))
    if ts_prev is None:
        return False
    if _utcnow() - ts_prev < timedelta(seconds=STALL_WINDOW_SEC):
        return False  # not enough time elapsed
    return not _has_growth(prev, curr)


def goal_reached(c):
    """True once club, match and player-profile targets are all met."""
    if not c:
        return False
    if c["klubovi_hns"] < TARGET_KLUBOVI:
        return False
    if c["matches_rec"] < TARGET_MATCHES:
        return False
    if c["igraci_hns"] <= 0:
        return False
    return c["igraci_profil"] / c["igraci_hns"] >= TARGET_PROFILE_PCT


def fmt_status(c, respawned, stale, suffix=""):
    """Build the Telegram status text.

    `c` is the counters dict or None (DB failure). Re-spawned workers, the
    stale marker and `suffix` are appended in both cases, so a DB-failure
    message still says which workers were restarted.
    """
    now = _utcnow()
    if not c:
        body = f"HNS watchdog\nDB query failed at {now:%Y-%m-%d %H:%M}Z"
    else:
        pct = (c["igraci_profil"] / c["igraci_hns"] * 100) if c["igraci_hns"] else 0
        body = (
            f"HNS watchdog {now:%Y-%m-%d %H:%MZ}\n"
            f"Klubovi (HNS id): {c['klubovi_hns']}/{TARGET_KLUBOVI}\n"
            f"Roster scraped: {c['roster_klubovi']}\n"
            f"Igrači (HNS id): {c['igraci_hns']}\n"
            f"Igrači s profilom: {c['igraci_profil']} ({pct:0.1f}%)\n"
            f"Sezone: {c['seasons_rec']}\n"
            f"Utakmice: {c['matches_rec']}/{TARGET_MATCHES}\n"
            f"Pending igrači: {c.get('pending_players')}\n"
        )
    if respawned:
        body += f"\nRe-spawned: {', '.join(respawned)}"
    if stale:
        # Derived from the configured window instead of a hard-coded "30".
        body += f"\nSTALE: nema rasta u zadnjih {STALL_WINDOW_SEC // 60} min"
    if suffix:
        body += f"\n{suffix}"
    return body


# ── Main check ────────────────────────────────────────────────────────────────
def run_check(send_telegram=True):
    """Run one watchdog cycle and return a JSON-serialisable summary.

    Queries progress, respawns dead workers, detects stalls, decides whether
    to notify and persists state. ``notified`` in the result means a
    notification was due; ``delivered`` means Telegram accepted it.
    """
    logger.info("=== watchdog cycle ===")
    state = load_state()
    prev = state.get("last_counts")

    counts = db_query()
    respawned = check_workers()
    stale = detect_stale(prev, counts) if counts else False
    done = goal_reached(counts)

    # The DONE flag is written only after the alert is delivered (see below),
    # so a failed send is retried on the next cycle instead of being lost.
    announce_done = done and not DONE_FLAG_FILE.exists()
    # Alert immediately when a stall is first seen; while it persists, repeat
    # only at heartbeat rate so frequent cron runs do not spam the chat.
    new_stale = stale and not state.get("stale_alerted")

    if announce_done:
        msg = fmt_status(counts, respawned, stale,
                         suffix="\nALL DONE — mission target reached!")
        notify = True
    else:
        msg = fmt_status(counts, respawned, stale)
        if respawned or new_stale:
            notify = True
        else:
            # routine 30-min heartbeat: send only if last notify >= 30 min ago
            # (29 min threshold leaves slack for scheduler jitter)
            last_dt = _parse_ts(state.get("last_notify_ts"))
            notify = (last_dt is None
                      or _utcnow() - last_dt >= timedelta(minutes=29))

    logger.info("counts=%s respawned=%s stale=%s notify=%s done=%s",
                counts, respawned, stale, notify, done)

    delivered = False
    if send_telegram and notify:
        delivered = telegram(msg)
        if delivered:
            state["last_notify_ts"] = _utcnow().isoformat()
            if stale:
                state["stale_alerted"] = True
            if announce_done:
                try:
                    DONE_FLAG_FILE.write_text(_utcnow().isoformat())
                except OSError as e:
                    logger.warning("Cannot write DONE flag: %s", e)
        else:
            logger.warning("Telegram delivery failed")

    if counts:
        if not stale:
            state.pop("stale_alerted", None)
        # Keep the old baseline timestamp while work is pending and nothing
        # grew. Resetting it on every check meant that runs more frequent
        # than STALL_WINDOW_SEC could never observe a stall.
        baseline_ts = _utcnow().isoformat()
        if (prev
                and counts.get("pending_players") not in (None, 0)
                and _parse_ts(prev.get("ts")) is not None
                and not _has_growth(prev, counts)):
            baseline_ts = prev["ts"]
        state["last_counts"] = {**counts, "ts": baseline_ts}
    save_state(state)
    return {"counts": counts, "respawned": respawned,
            "stale": stale, "done": done, "notified": notify,
            "delivered": delivered}


# ── Daemon loop ───────────────────────────────────────────────────────────────
def run_daemon(send_telegram=True):
    """Run `run_check` forever, sleeping CHECK_INTERVAL_SEC between cycles.

    A crashing cycle is logged and the loop continues.
    """
    logger.info("Starting watchdog daemon (interval=%ss)", CHECK_INTERVAL_SEC)
    while True:
        try:
            run_check(send_telegram=send_telegram)
        except Exception as e:
            logger.exception("cycle crashed: %s", e)
        time.sleep(CHECK_INTERVAL_SEC)


# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """Parse CLI arguments and run a single check or the daemon loop."""
    p = argparse.ArgumentParser(description="HNS pipeline watchdog")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--once", action="store_true",
                   help="Run a single check and exit (cron-friendly)")
    g.add_argument("--daemon", action="store_true",
                   help="Run forever, sleeping 30 min between checks")
    p.add_argument("--no-telegram", action="store_true",
                   help="Skip Telegram notifications (debug)")
    args = p.parse_args()

    send_telegram = not args.no_telegram
    if args.daemon:
        # --no-telegram used to be silently ignored in daemon mode.
        run_daemon(send_telegram=send_telegram)
    else:
        result = run_check(send_telegram=send_telegram)
        # Print compact JSON for cron / shell usage
        print(json.dumps(result, default=str, ensure_ascii=False))


if __name__ == "__main__":
    main()