Files
pgz-sport/scrapers/results_multilevel.py

195 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""results_multilevel.py — daily scrape of league match results across HR sport
federations (HNS / HKS / HRS).

This file is the **scaffold**: a real production-grade per-match scraper for
each federation is per-federation HTML/embed/API work. The structure here
makes adding each federation a single function — and lays down logging,
dedup-by-external-id, klub-name → klub_id resolution, and CLI flags.

Scope today (2026-05-09):
  - HNS (nogomet): TODO — likely source https://hns-cff.hr/ + per-liga pages
  - HKS (košarka): partial — could borrow Genius Sports embed used by
    /opt/pgz-sport/scrapers/hks_scraper.py for standings; that scraper does
    *standings*, not per-match results
  - HRS (rukomet): TODO — likely source https://www.hrs.hr/

Exit 0 on partial coverage; per-federation failure logs to stderr.
"""
# NOTE: the docstring must stay the first statement and the __future__ import
# must follow it directly; anything placed above them (as the dotenv patch
# previously did) makes the module fail with a SyntaxError.
from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone, date
from typing import Optional

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv

# auto-added by patch_scrapers_with_dotenv.sh
# Load the env file before any os.environ lookup below so RINET_DSN /
# DB_PASSWORD defined there are visible.
load_dotenv('/opt/rinet-gpu/.env.master')

logging.basicConfig(level=logging.INFO, format="%(asctime)s [results_ml] %(message)s")
log = logging.getLogger("results_ml")

# Connection string: RINET_DSN wins when set. The fallback is built only when
# RINET_DSN is absent, so DB_PASSWORD is required only in that case (a missing
# DB_PASSWORD then still raises KeyError at import time, as before).
DSN = os.environ.get("RINET_DSN")
if DSN is None:
    DSN = f"host=127.0.0.1 port=6432 dbname=rinet_v3 user=rinet password={os.environ['DB_PASSWORD']}"

UA = "RiNET-Civic/1.0 (https://rinet.one) results scraper"
RATE_LIMIT_SEC = 1.2  # polite between requests within one federation
# ─── DB helpers ────────────────────────────────────────────────────────
def db_conn():
    """Open and return a new psycopg2 connection built from the module-level DSN."""
    connection = psycopg2.connect(DSN)
    return connection
def upsert_match(cur, row: dict) -> str:
    """Insert one match row, or refresh score/status if it already exists.

    Rows are keyed on (federation, external_id); a row missing either key is
    not written at all.

    The statement runs inside its own SAVEPOINT when the cursor's connection is
    in transactional (non-autocommit) mode. Without that, a single failing row
    would leave the surrounding transaction aborted, every later statement
    would fail, and the final COMMIT would silently discard all prior work.

    Args:
        cur: a cursor whose ``fetchone()`` returns a mapping (the row is read
            as ``["inserted"]``), e.g. a ``RealDictCursor``.
        row: mapping providing the named parameters used in the INSERT;
            ``raw_payload`` is serialised to JSON here (``{}`` when falsy).

    Returns:
        'inserted' | 'updated' | 'skipped' | 'error'. Never raises for
        ``Exception`` subclasses; failures are logged as warnings.
    """
    if not row.get("federation") or not row.get("external_id"):
        return "skipped"

    # SAVEPOINT is only legal inside a transaction block. In autocommit mode
    # each statement is already isolated, so no savepoint is needed (or allowed).
    use_savepoint = not getattr(getattr(cur, "connection", None), "autocommit", True)
    savepoint_open = False
    try:
        if use_savepoint:
            cur.execute("SAVEPOINT upsert_match")
            savepoint_open = True
        cur.execute(
            """
            INSERT INTO pgz_sport.results
            (sport, federation, liga, sezona, match_date, kolo,
            home_team, away_team, home_klub_id, away_klub_id,
            home_score, away_score, status, source_url, external_id, raw_payload)
            VALUES (%(sport)s, %(federation)s, %(liga)s, %(sezona)s,
            %(match_date)s, %(kolo)s,
            %(home_team)s, %(away_team)s, %(home_klub_id)s, %(away_klub_id)s,
            %(home_score)s, %(away_score)s,
            COALESCE(%(status)s,'final'),
            %(source_url)s, %(external_id)s,
            %(raw_payload)s::jsonb)
            ON CONFLICT (federation, external_id) DO UPDATE SET
            home_score = EXCLUDED.home_score,
            away_score = EXCLUDED.away_score,
            status = EXCLUDED.status,
            raw_payload= EXCLUDED.raw_payload,
            scraped_at = now()
            RETURNING (xmax = 0) AS inserted
            """,
            {**row,
             "raw_payload": json.dumps(row.get("raw_payload") or {}, ensure_ascii=False)},
        )
        # xmax = 0 on the returned tuple means it was freshly inserted rather
        # than updated through the ON CONFLICT branch.
        was_inserted = cur.fetchone()["inserted"]
        if use_savepoint:
            cur.execute("RELEASE SAVEPOINT upsert_match")
        return "inserted" if was_inserted else "updated"
    except Exception as e:
        log.warning(f"upsert_match err for {row.get('federation')}/{row.get('external_id')}: {e}")
        if savepoint_open:
            # Rewind only this row so the outer transaction stays usable.
            try:
                cur.execute("ROLLBACK TO SAVEPOINT upsert_match")
                cur.execute("RELEASE SAVEPOINT upsert_match")
            except Exception as rollback_err:
                log.warning(f"upsert_match could not roll back to savepoint: {rollback_err}")
        return "error"
def resolve_klub(cur, name: str, sport: str) -> Optional[int]:
    """Best-effort club name → pgz_sport.klubovi.id lookup.

    Performs a case-insensitive substring match on ``naziv`` (using at most the
    first 40 characters of the stripped name) restricted to ``sport``, and
    picks the shortest matching ``naziv``.

    Args:
        cur: cursor whose ``fetchone()`` returns a mapping with an ``id`` key.
        name: club name as scraped; may be empty/None.
        sport: sport value matched case-insensitively against ``klubovi.sport``.

    Returns:
        The matching club id, or None when the name is empty/blank or nothing
        matches.
    """
    needle = name.strip()[:40] if name else ""
    if not needle:
        # A blank name would otherwise become the pattern '%%', which matches
        # every club and returns an arbitrary one.
        return None
    # Escape LIKE metacharacters (backslash is the default escape character)
    # so '%' and '_' inside a scraped name are matched literally.
    escaped = needle.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    cur.execute(
        """SELECT id FROM pgz_sport.klubovi
        WHERE sport ILIKE %s AND naziv ILIKE %s
        ORDER BY length(naziv) ASC LIMIT 1""",
        (sport, f"%{escaped}%"),
    )
    r = cur.fetchone()
    return r["id"] if r else None
# ─── Per-federation scrapers ────────────────────────────────────────────
def scrape_hns(cur, sezona: str = "2025/26") -> dict:
    """Placeholder scraper for HNS (nogomet / football).

    Not implemented yet: the plan is to walk the per-liga pages on hns-cff.hr.

    Candidate sources (verified in earlier ingests):
      * https://hns-cff.hr/natjecanja/   — listing of leagues
      * https://semafor.hns.family/      — per-match drill-down, the source
        hns_master_harvester.py uses for player seasons

    A season-by-season per-match parse is non-trivial because every liga page
    has a different layout, so the implementation is deferred.

    Returns:
        A zeroed counter dict with ``status`` set to ``"stub"``; ``cur`` and
        ``sezona`` are currently unused.
    """
    outcome = dict(inserted=0, updated=0, errors=0, status="stub")
    log.warning("scrape_hns: NOT YET IMPLEMENTED — stub returns 0 rows")
    return outcome
def scrape_hks(cur, sezona: str = "2025/26") -> dict:
    """Placeholder scraper for HKS (košarka / basketball).

    The Genius Sports embed exposes JSON for fixtures. The existing
    /opt/pgz-sport/scrapers/hks_scraper.py reads the *standings* embed at:
      https://hosted.dcd.shared.geniussports.com/embednf/HKS/en/competition/{ID}/standings
    The fixtures/results embed lives under a different path
    (.../competition/{ID}/fixtures or .../matches). Per-match parsing is
    deferred.

    Returns:
        A zeroed counter dict with ``status`` set to ``"stub"``; ``cur`` and
        ``sezona`` are currently unused.
    """
    outcome = dict(inserted=0, updated=0, errors=0, status="stub")
    log.warning("scrape_hks: NOT YET IMPLEMENTED — stub returns 0 rows")
    return outcome
def scrape_hrs(cur, sezona: str = "2025/26") -> dict:
    """Placeholder scraper for HRS (rukomet / handball).

    There is no existing scraper and the source is still to be decided.
    Likely candidates:
      * https://www.hrs.hr/ (federation site)
      * per-liga pages, once located

    Returns:
        A zeroed counter dict with ``status`` set to ``"stub"``; ``cur`` and
        ``sezona`` are currently unused.
    """
    outcome = dict(inserted=0, updated=0, errors=0, status="stub")
    log.warning("scrape_hrs: NOT YET IMPLEMENTED — stub returns 0 rows")
    return outcome
# ─── orchestration ─────────────────────────────────────────────────────
# Registry of runnable federations: CLI key → (sport label, scraper callable).
# main() looks keys up here and calls the scraper as fn(cur, sezona=...).
FEDS = dict(
    hns=("nogomet", scrape_hns),
    hks=("kosarka", scrape_hks),
    hrs=("rukomet", scrape_hrs),
)
def main():
    """CLI entry point: run the selected federation scrapers and print a JSON summary.

    Flags:
        --feds     comma list of keys from FEDS (unknown keys are warned about
                   and skipped).
        --sezona   season string passed through to each scraper.
        --dry-run  run everything, then ROLLBACK instead of COMMIT.

    All scrapers share one transaction. Each federation starts behind a
    savepoint; if a scraper raises and the transaction is left in an error
    state, the transaction is rewound to that savepoint so later federations
    and the final COMMIT still work. Rows written before a non-database crash
    are kept, as before.

    Returns:
        0 always (partial coverage is not a failure); per-federation crashes
        are logged and recorded in the summary under ``status``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--feds", default="hns,hks,hrs",
                    help="comma list of federation keys to run")
    ap.add_argument("--sezona", default="2025/26")
    ap.add_argument("--dry-run", action="store_true",
                    help="run scrapers but do not COMMIT")
    args = ap.parse_args()

    started = datetime.now(timezone.utc)
    summary = {"started": started.isoformat(), "sezona": args.sezona, "feds": {}}
    requested = [f.strip().lower() for f in args.feds.split(",") if f.strip()]

    conn = db_conn()
    try:
        conn.autocommit = False
        # Created inside the try so the connection is closed even if cursor
        # creation fails.
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        for fed in requested:
            if fed not in FEDS:
                log.warning(f"unknown federation: {fed}")
                continue
            sport, fn = FEDS[fed]
            log.info(f"scraping {fed} ({sport})")
            # monotonic clock: elapsed time must not jump with wall-clock changes
            t0 = time.monotonic()
            # Re-declaring the same savepoint name simply shadows the previous
            # one; all of them are released by the final COMMIT/ROLLBACK.
            cur.execute("SAVEPOINT fed_run")
            try:
                res = fn(cur, sezona=args.sezona)
            except Exception as e:
                # The module contract is that per-federation failures reach
                # stderr; log with traceback before recording the summary.
                log.exception(f"scraper for {fed} crashed")
                res = {"inserted": 0, "updated": 0, "errors": 1,
                       "status": f"crash:{type(e).__name__}: {str(e)[:200]}"}
                in_error = (conn.get_transaction_status()
                            == psycopg2.extensions.TRANSACTION_STATUS_INERROR)
                if in_error:
                    # An aborted transaction rejects every further statement;
                    # rewind just this federation's work.
                    cur.execute("ROLLBACK TO SAVEPOINT fed_run")
            res["elapsed_s"] = round(time.monotonic() - t0, 1)
            summary["feds"][fed] = res
        if args.dry_run:
            conn.rollback()
            log.info("dry-run: rolled back")
        else:
            conn.commit()
    finally:
        conn.close()

    summary["ended"] = datetime.now(timezone.utc).isoformat()
    print(json.dumps(summary, ensure_ascii=False, indent=2))
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())