Files
pgz-sport/scripts/hns_watchdog.py
T

344 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
from dotenv import load_dotenv
load_dotenv('/opt/rinet-gpu/.env.master')
# auto-added by patch_scrapers_with_dotenv.sh
# ──────────────────────────────────────────────────────────────────────────────
# hns_watchdog.py — PGŽ Sport HNS pipeline watchdog (SUB7)
# Author : Damir Radulić <dradulic@outlook.com> / <damir@rinet.one>
# Date : 2026-05-05
# Version: 1.0.0
# Purpose: Periodically poll DB progress for the HNS scraping pipeline,
# detect stalls, restart fallen worker processes and send Telegram
# status updates every 30 minutes. Fires a special "ALL DONE" alert
# once the mission goal is reached.
#
# Modes : --once run a single check and exit (cron-friendly)
# --daemon loop forever, sleeping CHECK_INTERVAL_SEC between checks
#
# Goal : 59/59 PGŽ-funded clubs with hns_klub_id, ≥80% of players with
# profile_complete=true (visina_cm IS NOT NULL), ≥1000 matches.
# ──────────────────────────────────────────────────────────────────────────────
import os
import sys
import time
import json
import argparse
import logging
import logging.handlers
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
import psycopg2
import requests
# ── Config ────────────────────────────────────────────────────────────────────
# Connection string: RINET_DSN wins when it is set. The fallback DSN is built
# lazily so that a missing DB_PASSWORD only raises KeyError when it is actually
# needed (previously the f-string default was evaluated even if RINET_DSN was
# provided, crashing the import).
DSN = os.environ.get("RINET_DSN")
if DSN is None:
    DSN = (
        "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet "
        f"password={os.environ['DB_PASSWORD']}"
    )
# SECURITY(review): a live bot token is hard-coded as the fallback below. It is
# kept so existing deployments keep sending alerts, but it should be rotated
# and supplied only through TG_BOT_TOKEN (e.g. in the .env file).
TG_TOKEN = os.getenv("TG_BOT_TOKEN", "8535797835:AAFItT-92jzZ9NWFafLxn0dLa1_n2s-JE5Y")
TG_CHAT = os.getenv("TG_CHAT", "7969491558")
LOG_DIR = Path("/var/log/pgz-sport-debug")
LOG_FILE = LOG_DIR / "hns_watchdog.log"
STATE_FILE = LOG_DIR / "hns_watchdog_state.json"
CHECK_INTERVAL_SEC = 30 * 60  # 30 min between daemon iterations
STALL_WINDOW_SEC = 30 * 60  # consider stale if no growth in 30 min
DONE_FLAG_FILE = LOG_DIR / "hns_watchdog_DONE.flag"
# Mission targets
TARGET_KLUBOVI = 59
TARGET_PROFILE_PCT = 0.80
TARGET_MATCHES = 1000
# Worker processes to keep alive (process_name : restart_command).
# The key doubles as the `pgrep -f` pattern used to detect a running worker.
WORKERS = {
    "hns_master_harvester": [
        "python3", "/opt/pgz-sport/scripts/hns_master_harvester.py",
    ],
    "hns_season_v3": [
        "python3", "/opt/pgz-sport/scripts/hns_season_v3.py",
    ],
}
# ── Logging ───────────────────────────────────────────────────────────────────
# The log directory must exist before the rotating file handler opens its file.
LOG_DIR.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("hns_watchdog")
logger.setLevel(logging.INFO)
# Attach handlers only when none are present, so the setup does not stack
# duplicate handlers on a logger that was already configured.
if not logger.handlers:
    handler = logging.handlers.RotatingFileHandler(
        filename=LOG_FILE,
        maxBytes=5_000_000,
        backupCount=5,
    )
    handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s %(levelname)s %(message)s",
            datefmt="%Y-%m-%dT%H:%M:%S",
        )
    )
    logger.addHandler(handler)
    # Mirror every record to stdout as well (default formatting).
    logger.addHandler(logging.StreamHandler(stream=sys.stdout))
# ── Helpers ───────────────────────────────────────────────────────────────────
# Single-row progress snapshot. The six sub-selects are returned in the same
# order as the `cols` list in db_query(), which zips them into a dict:
#   klubovi_hns    – football clubs flagged pgz_sufinanciran that have an hns_klub_id
#   roster_klubovi – distinct klub_id values present in hns_klub_roster
#   igraci_hns     – clanovi rows that have an hns_igrac_id
#   igraci_profil  – of those, rows whose visina_cm is filled in
#   seasons_rec    – row count of hns_player_seasons
#   matches_rec    – row count of hns_player_matches
PROGRESS_SQL = """
SELECT
(SELECT COUNT(*) FROM pgz_sport.klubovi
WHERE sport='nogomet' AND pgz_sufinanciran=true
AND hns_klub_id IS NOT NULL) AS klubovi_hns,
(SELECT COUNT(DISTINCT klub_id) FROM pgz_sport.hns_klub_roster) AS roster_klubovi,
(SELECT COUNT(*) FROM pgz_sport.clanovi
WHERE hns_igrac_id IS NOT NULL) AS igraci_hns,
(SELECT COUNT(*) FROM pgz_sport.clanovi
WHERE hns_igrac_id IS NOT NULL AND visina_cm IS NOT NULL) AS igraci_profil,
(SELECT COUNT(*) FROM pgz_sport.hns_player_seasons) AS seasons_rec,
(SELECT COUNT(*) FROM pgz_sport.hns_player_matches) AS matches_rec
;
"""
# Number of players that have an HNS id but no visina_cm yet; db_query() stores
# it as "pending_players" and detect_stale() uses it to decide whether work is
# still outstanding.
PENDING_SQL = """
SELECT COUNT(*) FROM pgz_sport.clanovi
WHERE hns_igrac_id IS NOT NULL
AND visina_cm IS NULL;
"""
def db_query():
    """Fetch the pipeline progress counters from the database.

    Returns:
        dict with the keys ``klubovi_hns``, ``roster_klubovi``, ``igraci_hns``,
        ``igraci_profil``, ``seasons_rec``, ``matches_rec`` and
        ``pending_players`` (``None`` when only PENDING_SQL failed), or
        ``None`` when the connection or PROGRESS_SQL failed.
    """
    conn = None
    try:
        conn = psycopg2.connect(DSN, connect_timeout=10)
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(PROGRESS_SQL)
            row = cur.fetchone()
            cols = ["klubovi_hns", "roster_klubovi", "igraci_hns",
                    "igraci_profil", "seasons_rec", "matches_rec"]
            counts = dict(zip(cols, row))
            # The pending count is optional: a failure here must not discard
            # the main counters.
            try:
                cur.execute(PENDING_SQL)
                counts["pending_players"] = cur.fetchone()[0]
            except Exception as e:
                logger.warning("PENDING_SQL failed: %s", e)
                counts["pending_players"] = None
        return counts
    except Exception as e:
        logger.error("DB query failed: %s", e)
        return None
    finally:
        # Always release the connection; previously it leaked whenever a
        # query raised, because close() was only reached on success.
        if conn is not None:
            try:
                conn.close()
            except Exception as e:
                logger.warning("Closing DB connection failed: %s", e)
def telegram(text):
    """Send ``text`` (HTML parse mode, cut to 4000 chars) to the Telegram chat.

    Returns:
        True when the HTTP call succeeded and the API answered ``"ok": true``,
        False otherwise. Never raises; failures are logged.
    """
    try:
        r = requests.post(
            f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage",
            data={"chat_id": TG_CHAT, "text": text[:4000],
                  "parse_mode": "HTML", "disable_web_page_preview": "true"},
            timeout=10,
        )
        ok = bool(r.ok and r.json().get("ok", False))
        if not ok:
            logger.warning("Telegram returned: %s", r.text[:300])
        return ok
    except Exception as e:
        # Connection-level errors from requests/urllib3 quote the request URL,
        # and the URL contains the bot token — keep it out of the log files.
        detail = str(e)
        if TG_TOKEN:
            detail = detail.replace(TG_TOKEN, "<redacted>")
        logger.error("Telegram send failed: %s", detail)
        return False
def load_state():
    """Load the persisted watchdog state from STATE_FILE.

    Returns:
        The stored dict, or an empty dict when the file is missing, cannot be
        read/parsed, or does not contain a JSON object. Problems are logged
        instead of being silently ignored.
    """
    if not STATE_FILE.exists():
        return {}
    try:
        state = json.loads(STATE_FILE.read_text())
    except Exception as e:
        logger.warning("Cannot read state file %s: %s", STATE_FILE, e)
        return {}
    # Callers use state.get(...); anything other than a JSON object would
    # crash them, so treat it like a corrupt file.
    if not isinstance(state, dict):
        logger.warning("State file %s does not hold a JSON object; ignoring it",
                       STATE_FILE)
        return {}
    return state
def save_state(state):
    """Persist ``state`` to STATE_FILE as indented JSON.

    The JSON is written to a temporary sibling file first and then moved over
    STATE_FILE with os.replace(), so a crash or a concurrent reader never sees
    a half-written state file. Failures are logged and never raised.
    """
    tmp_path = STATE_FILE.with_name(f"{STATE_FILE.name}.{os.getpid()}.tmp")
    try:
        tmp_path.write_text(json.dumps(state, indent=2, default=str))
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logger.warning("Cannot persist state: %s", e)
        try:
            tmp_path.unlink()
        except OSError:
            pass  # temp file was never created or is already gone
def proc_alive(name):
    """Report whether a process whose command line matches ``name`` is running.

    Uses ``pgrep -f``. Exit status 0 means at least one match and 1 means no
    match; any other status, a timeout, or a missing pgrep binary is treated
    as "uncertain" and reported as alive so that the caller does not respawn a
    worker that may still be running.
    """
    try:
        r = subprocess.run(
            ["pgrep", "-f", name],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            timeout=5,
        )
    except Exception as e:
        logger.warning("pgrep failed for %s: %s", name, e)
        return True  # err on caution: do not respawn if uncertain
    if r.returncode == 0:
        return True
    if r.returncode == 1:
        return False
    # Status 2/3 signal a pgrep usage or internal error, not "no such
    # process"; previously these triggered a respawn.
    logger.warning("pgrep exited with status %s for %s", r.returncode, name)
    return True
def restart_worker(name, cmd):
    """Start ``cmd`` as a detached worker process.

    The child's stdout and stderr are appended to a per-respawn log file in
    LOG_DIR, it runs from /opt/pgz-sport/scripts and is placed in its own
    session. Returns True when the process was launched, False (after
    logging the error) otherwise.
    """
    stamp = f"{datetime.now():%Y%m%d_%H%M}"
    log_path = LOG_DIR / f"{name}_respawn_{stamp}.log"
    popen_kwargs = {
        "stderr": subprocess.STDOUT,
        "cwd": "/opt/pgz-sport/scripts",
        "start_new_session": True,
    }
    try:
        with open(log_path, "ab") as sink:
            subprocess.Popen(cmd, stdout=sink, **popen_kwargs)
    except Exception as exc:
        logger.error("Failed to respawn %s: %s", name, exc)
        return False
    logger.info("Re-spawned worker %s -> %s", name, log_path)
    return True
def check_workers():
    """Respawn every configured worker that is not running.

    Returns the names of the workers that were successfully re-spawned, in
    WORKERS order.
    """
    # restart_worker() is only reached for workers proc_alive() reports dead.
    return [
        worker
        for worker, command in WORKERS.items()
        if not proc_alive(worker) and restart_worker(worker, command)
    ]
def detect_stale(prev, curr):
    """Decide whether the pipeline has stalled since the previous snapshot.

    Args:
        prev: previous counters dict including an ISO-8601 ``"ts"`` entry
            (naive timestamps are interpreted as UTC), or None.
        curr: current counters dict, or None.

    Returns:
        True when players are still pending, at least STALL_WINDOW_SEC have
        passed since ``prev["ts"]`` and none of ``seasons_rec``,
        ``matches_rec`` or ``igraci_profil`` grew. False in every other case,
        including missing input or an unparsable timestamp.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    if not prev or not curr:
        return False
    if curr.get("pending_players") in (None, 0):
        return False
    try:
        ts_prev = datetime.fromisoformat(prev.get("ts"))
    except Exception:
        return False
    # Accept both naive and timezone-aware stored timestamps; mixing them in
    # a subtraction would raise TypeError.
    if ts_prev.tzinfo is None:
        ts_prev = ts_prev.replace(tzinfo=timezone.utc)
    elapsed = datetime.now(timezone.utc) - ts_prev
    if elapsed < timedelta(seconds=STALL_WINDOW_SEC):
        return False  # not enough time elapsed
    # `or 0` also covers counters stored as null in the state file.
    grew = any(
        (curr.get(key) or 0) > (prev.get(key) or 0)
        for key in ("seasons_rec", "matches_rec", "igraci_profil")
    )
    return not grew
def goal_reached(c):
    """Return True when the counters ``c`` satisfy every mission target.

    The targets are: at least TARGET_KLUBOVI clubs with an HNS id, at least
    TARGET_MATCHES match records, and a share of profiled players
    (igraci_profil / igraci_hns) of at least TARGET_PROFILE_PCT. A missing or
    empty ``c`` never satisfies the goal.
    """
    if not c:
        return False
    # Checks short-circuit left to right, so the ratio is only computed when
    # igraci_hns is positive.
    return (
        c["klubovi_hns"] >= TARGET_KLUBOVI
        and c["matches_rec"] >= TARGET_MATCHES
        and c["igraci_hns"] > 0
        and c["igraci_profil"] / c["igraci_hns"] >= TARGET_PROFILE_PCT
    )
def fmt_status(c, respawned, stale, suffix=""):
    """Render the HTML status message that is sent to Telegram.

    Args:
        c: counters dict as produced by db_query(), or a falsy value when the
            DB query failed.
        respawned: names of workers that were re-spawned in this cycle.
        stale: whether a stall was detected.
        suffix: optional extra text appended on its own line.

    Returns:
        The message text; a short "DB query failed" notice when ``c`` is
        falsy.
    """
    if not c:
        return f"<b>HNS watchdog</b>\nDB query failed at {datetime.utcnow():%Y-%m-%d %H:%M}Z"
    total_players = c["igraci_hns"]
    if total_players:
        pct = c["igraci_profil"] / total_players * 100
    else:
        pct = 0  # avoid dividing by zero before any player is known
    pieces = [
        f"<b>HNS watchdog</b> {datetime.utcnow():%Y-%m-%d %H:%MZ}\n",
        f"Klubovi (HNS id): <b>{c['klubovi_hns']}/{TARGET_KLUBOVI}</b>\n",
        f"Roster scraped: {c['roster_klubovi']}\n",
        f"Igrači (HNS id): {c['igraci_hns']}\n",
        f"Igrači s profilom: {c['igraci_profil']} ({pct:0.1f}%)\n",
        f"Sezone: {c['seasons_rec']}\n",
        f"Utakmice: <b>{c['matches_rec']}</b>/{TARGET_MATCHES}\n",
        f"Pending igrači: {c.get('pending_players')}\n",
    ]
    # Optional trailers, each introduced by its own newline.
    if respawned:
        pieces.append(f"\nRe-spawned: {', '.join(respawned)}")
    if stale:
        pieces.append("\nSTALE: nema rasta u zadnjih 30 min")
    if suffix:
        pieces.append(f"\n{suffix}")
    return "".join(pieces)
# ── Main check ────────────────────────────────────────────────────────────────
def run_check(send_telegram=True):
    """Run one watchdog cycle.

    Loads the previous state, queries the DB counters, respawns dead workers,
    detects stalls, decides whether a Telegram message is due, sends it and
    persists the new state.

    A message is due when the mission goal is reached for the first time
    ("ALL DONE"), when a worker was re-spawned or a stall was detected, or as
    a routine heartbeat if the last delivered message is at least 29 minutes
    old.

    The DONE flag file is written only after the "ALL DONE" message was
    actually delivered, so a failed send or a run with ``send_telegram=False``
    no longer suppresses the alert permanently.

    Args:
        send_telegram: when False, nothing is sent (debug mode).

    Returns:
        dict with ``counts``, ``respawned``, ``stale``, ``done`` and
        ``notified`` (whether a notification was due in this cycle).
    """
    logger.info("=== watchdog cycle ===")
    state = load_state()
    if not isinstance(state, dict):
        state = {}  # a malformed state file must not crash the cycle
    prev = state.get("last_counts")
    counts = db_query()
    respawned = check_workers()
    stale = detect_stale(prev, counts) if counts else False
    done = goal_reached(counts)

    announce_done = done and not DONE_FLAG_FILE.exists()
    notify = False
    if announce_done:
        msg = fmt_status(counts, respawned, stale,
                         suffix="\nALL DONE — mission target reached!")
        notify = True
    else:
        msg = fmt_status(counts, respawned, stale)
        if respawned or stale:
            notify = True
        else:
            # routine 30-min heartbeat: send only if last notify >= 30 min ago
            # (29 min leaves room for scheduling jitter)
            last_ts = state.get("last_notify_ts")
            if not last_ts:
                notify = True
            else:
                try:
                    last_dt = datetime.fromisoformat(last_ts)
                    if datetime.utcnow() - last_dt >= timedelta(minutes=29):
                        notify = True
                except Exception:
                    notify = True
    logger.info("counts=%s respawned=%s stale=%s notify=%s done=%s",
                counts, respawned, stale, notify, done)
    if send_telegram and notify:
        if telegram(msg):
            # Timestamps in the state file are naive UTC.
            state["last_notify_ts"] = datetime.utcnow().isoformat()
            if announce_done:
                # Latch the alert only once it has reached the chat.
                try:
                    DONE_FLAG_FILE.write_text(datetime.utcnow().isoformat())
                except OSError as e:
                    logger.warning("Cannot write done flag %s: %s",
                                   DONE_FLAG_FILE, e)
        else:
            logger.warning("Telegram delivery failed")
    if counts:
        state["last_counts"] = {**counts, "ts": datetime.utcnow().isoformat()}
    save_state(state)
    return {"counts": counts, "respawned": respawned,
            "stale": stale, "done": done, "notified": notify}
# ── Daemon loop ───────────────────────────────────────────────────────────────
def run_daemon(send_telegram=True):
    """Run watchdog cycles forever, sleeping CHECK_INTERVAL_SEC between them.

    Args:
        send_telegram: forwarded to run_check(); False disables Telegram
            notifications for every cycle (debug mode).

    An exception escaping a cycle is logged with its traceback and the loop
    continues with the next cycle.
    """
    logger.info("Starting watchdog daemon (interval=%ss)", CHECK_INTERVAL_SEC)
    while True:
        try:
            run_check(send_telegram=send_telegram)
        except Exception as e:
            logger.exception("cycle crashed: %s", e)
        time.sleep(CHECK_INTERVAL_SEC)
# ── Entry point ───────────────────────────────────────────────────────────────
def main():
    """Parse the command line and run the watchdog in the selected mode.

    Exactly one of ``--once`` / ``--daemon`` is required. ``--no-telegram``
    applies to both modes (previously it was ignored in daemon mode). In
    ``--once`` mode the cycle result is printed as compact JSON.
    """
    p = argparse.ArgumentParser(description="HNS pipeline watchdog")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--once", action="store_true",
                   help="Run a single check and exit (cron-friendly)")
    g.add_argument("--daemon", action="store_true",
                   help="Run forever, sleeping 30 min between checks")
    p.add_argument("--no-telegram", action="store_true",
                   help="Skip Telegram notifications (debug)")
    args = p.parse_args()
    send_telegram = not args.no_telegram
    if args.daemon:
        run_daemon(send_telegram=send_telegram)
    else:
        result = run_check(send_telegram=send_telegram)
        # Print compact JSON for cron / shell usage
        print(json.dumps(result, default=str, ensure_ascii=False))


if __name__ == "__main__":
    main()