Files
pgz-sport/scripts/hns_player_deep.py

538 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from dotenv import load_dotenv
load_dotenv('/opt/rinet-gpu/.env.master')
# auto-added by patch_scrapers_with_dotenv.sh
# -*- coding: utf-8 -*-
"""
hns_player_deep.py — SUB3 deep HNS player scraper
─────────────────────────────────────────────────
Author: dradulic@outlook.com / damir@rinet.one
Date: 2026-05-05
Version: 1.0
Scrapes semafor.hns.family/igraci/{id}/{slug}/ for every clanovi.hns_igrac_id row,
extracting:
• profil meta (datum_rodenja, mjesto_rodenja, broj_dresa, current klub)
• per-season stats per natjecanje (UPSERT pgz_sport.hns_player_seasons)
• last 30+ matches (UPSERT pgz_sport.hns_player_matches)
Server-rendered HTML — no Playwright needed → uses requests for 510× speedup.
NOTE: the --use-playwright fallback is not implemented; main() does not define that flag.
Resume-able: skips clanovi where last_scraped_at > now() - interval N days.
Usage:
python3 hns_player_deep.py [--limit 200] [--days 7] [--player HNS_ID] [--missing-matches] [--no-telegram]
"""
import os, sys, re, time, json, argparse, traceback
from datetime import datetime, date
from urllib.parse import urljoin
import requests
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
# Database DSN: an explicit RINET_DSN wins. DB_PASSWORD is only required when
# the built-in DSN has to be assembled; previously it was read eagerly (as the
# default argument of os.getenv) and raised KeyError even when RINET_DSN was set.
DSN = os.getenv("RINET_DSN")
if DSN is None:
    DSN = (
        "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet "
        f"password={os.environ['DB_PASSWORD']}"
    )

# SECURITY NOTE(review): a bot token is hard-coded as the fallback below. It is
# kept so existing deployments keep notifying, but it should be rotated and
# supplied only through TG_BOT_TOKEN in the environment.
TG_TOKEN = os.getenv("TG_BOT_TOKEN", "8535797835:AAFItT-92jzZ9NWFafLxn0dLa1_n2s-JE5Y")
TG_CHAT = os.getenv("TG_CHAT", "7969491558")

# Delay (seconds) applied between players by the driver loop.
SLEEP = float(os.getenv("SLEEP", "0.8"))
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"

# One log file per run (minute resolution), opened for append at import time
# and kept open for the lifetime of the process.
LOG_DIR = "/var/log/pgz-sport-debug"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = f"{LOG_DIR}/sub3_{datetime.now().strftime('%Y%m%d_%H%M')}.log"
LOG_FH = open(LOG_FILE, "a", encoding="utf-8")
def log(msg: str, telegram: bool = False) -> None:
    """Emit *msg* with a timestamp to stdout and the run log file.

    When *telegram* is true and both TG_TOKEN and TG_CHAT are set, the raw
    message (truncated to 4000 characters) is also posted to Telegram.
    """
    stamp = datetime.now().isoformat(timespec='seconds')
    line = f"[{stamp}] {msg}"
    print(line, flush=True)
    LOG_FH.write(line + "\n")
    LOG_FH.flush()

    if not (telegram and TG_TOKEN and TG_CHAT):
        return
    try:
        requests.post(
            f"https://api.telegram.org/bot{TG_TOKEN}/sendMessage",
            data={"chat_id": TG_CHAT, "text": msg[:4000]},
            timeout=8,
        )
    except Exception:
        # Notifications are best-effort: a Telegram failure must not stop the run.
        pass
# ── HTTP session ──────────────────────────────────────────────────────────
# Module-wide requests session; fetch_html() sends every request through it
# with a browser-like User-Agent and a Croatian-first Accept-Language.
SESSION = requests.Session()
SESSION.headers.update({
    "User-Agent": UA,
    "Accept-Language": "hr,en;q=0.7",
})
def fetch_html(url: str, timeout: int = 20) -> str | None:
    """Fetch *url* through the shared session.

    Returns the response text on HTTP 200. Any other status, or any exception
    raised while fetching, is logged and yields None.
    """
    try:
        resp = SESSION.get(url, timeout=timeout)
        if resp.status_code == 200:
            return resp.text
        log(f" HTTP {resp.status_code} {url}")
    except Exception as exc:
        log(f" fetch fail {url}: {exc}")
    return None
# ── Parsers ───────────────────────────────────────────────────────────────
def _strip_html(s: str) -> str:
    """Replace every tag with a space, collapse whitespace runs, and trim."""
    s = re.sub(r"<[^>]+>", " ", s)
    return re.sub(r"\s+", " ", s).strip()


def parse_profile(html: str) -> dict:
    """Extract player profile meta from a player page.

    Returns a dict with keys ``broj_dresa`` (int), ``datum_rodenja`` (date),
    ``mjesto_rodenja`` (str), ``klub_hns_id`` (str) and ``klub_naziv`` (str);
    any value that cannot be found stays None.
    """
    out = {
        "broj_dresa": None,
        "datum_rodenja": None,
        "mjesto_rodenja": None,
        "klub_hns_id": None,
        "klub_naziv": None,
    }

    # Work on the playerHeader block (up to the first HTML comment); fall back
    # to the whole page when that block cannot be isolated.
    m = re.search(r'<div class="block playerHeader"[^>]*>(.*?)<!--', html, re.DOTALL)
    header_html = m.group(1) if m else html
    text = _strip_html(header_html)

    # Jersey number: dedicated <span>, else a leading 1-2 digit number that is
    # followed by a capitalised word in the plain text.
    m = re.search(r'<span class="number"[^>]*>(\d+)</span>', header_html)
    if m is None:
        m = re.match(r'^\s*(\d{1,2})\s+[A-ZČĆŠŽĐ]', text)
    if m:
        out["broj_dresa"] = int(m.group(1))

    # Current club: first /klubovi/ link in the header.
    m = re.search(r'<a[^>]+href="/klubovi/(\d+)/([\w-]+)/?"[^>]*>([^<]+)<', header_html)
    if m:
        out["klub_hns_id"] = m.group(1)
        out["klub_naziv"] = m.group(3).strip()

    # Birth date (dd.mm.yyyy.), tried from strictest to loosest pattern. All
    # three patterns expose day / month / year as groups 1 / 2 / 3.
    m = re.search(
        r'(\d{1,2})\.(\d{1,2})\.(\d{4})\.\s*(?:</[^>]+>\s*)?(?:<[^>]+>\s*)*\(?\s*\d+\s*godin',
        header_html,
    )
    if not m:
        m = re.search(
            r'<div[^>]*class="[^"]*birth[^"]*"[^>]*>(\d{1,2})\.(\d{1,2})\.(\d{4})',
            header_html,
        )
    if not m:
        m = re.search(
            r'(\d{1,2})\.(\d{1,2})\.(\d{4})\.\s*\(?\s*\d+\s*godin[ae]?\)?\s*Datum rođenja',
            text,
        )
    if m:
        try:
            out["datum_rodenja"] = date(int(m.group(3)), int(m.group(2)), int(m.group(1)))
        except ValueError:
            # Digits matched but do not form a real calendar date; leave None.
            pass

    # Birthplace: the text right before the "Mjesto rođenja" label.
    mm = re.search(r'([A-ZČĆŠŽĐ][\w\sčćšžđČĆŠŽĐ\-]{1,80}?)\s+Mjesto rođenja', text)
    if mm:
        # The match starts at the leftmost capital letter that can reach the
        # label, so a preceding "Datum rođenja" label ends up inside the
        # capture. Keep only what follows the last such label.
        place = mm.group(1).rpartition("Datum rođenja")[2].strip()
        if place:
            out["mjesto_rodenja"] = place
    return out
# A season section starts with "<YYYY/YY> Statistika Utakmice"; the season
# label may be wrapped by a single tag on either side. Group 1 is the season.
# NOTE(review): no code in the visible part of this file uses this pattern.
SEASON_HEADER_RE = re.compile(
    r'(?:<[^>]+>\s*)?(20\d{2}/\d{2})(?:\s*<[^>]+>)?\s*Statistika\s+Utakmice',
    flags=re.I,
)
def _season_stat_rows(sezona: str, sec: str) -> list[dict]:
    """Return the per-competition stat rows of one season section."""
    table = re.search(
        r'<div class="block w1280 playerCompetitionStatsTable"[^>]*>(.*?)</div>\s*</div>\s*</div>',
        sec, re.DOTALL,
    )
    if not table:
        return []
    # The table is parsed from its plain text: each row reads
    # "<label> <int> <int> <int> <int> <int> <int>".
    stext = _strip_html(table.group(1))
    rows: list[dict] = []
    for rm in re.finditer(
        r'([A-ZČĆŠŽĐa-zčćšžđ0-9][^|]*?)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)(?=\s|$)',
        stext,
    ):
        label = rm.group(1).strip()
        if label.lower().startswith("ukupno"):
            continue  # keep per-competition rows only, not the season total
        if "Nastupi" in label or "Započeo" in label or "Statistika" in label:
            continue  # column-header text swallowed into the label
        # Groups 2-7 are \d+ matches, so int() cannot fail here.
        rows.append({
            "sezona": sezona,
            "natjecanje": label[:200],
            "nastupi": int(rm.group(2)),
            "startna": int(rm.group(3)),
            "zamjena": int(rm.group(4)),
            "golovi": int(rm.group(5)),
            "zuti": int(rm.group(6)),
            "crveni": int(rm.group(7)),
        })
    return rows


def _section_match_rows(sec: str) -> list[dict]:
    """Return one dict per <li data-match=...> row of a season's match list."""
    ml = re.search(
        r'<div class="matchlist style2 semafor player[^"]*"[^>]*>(.*?)</ul>',
        sec, re.DOTALL,
    )
    if not ml:
        return []
    rows: list[dict] = []
    for row in re.finditer(
        r'<li class="row[^"]*"[^>]*data-match="(\d+)"[^>]*>(.*?)</li>',
        ml.group(1), re.DOTALL,
    ):
        row_html = row.group(2)
        d = re.search(r'<div class="date">([^<]+)</div>', row_html)
        c1 = re.search(r'<div class="club1"[^>]*>\s*<a[^>]*>([^<]+?)<', row_html)
        c2 = re.search(r'<div class="club2"[^>]*>\s*<a[^>]*>([^<]+?)<', row_html)
        r1 = re.search(r'<div class="res1">(\d+)</div>', row_html)
        r2 = re.search(r'<div class="res2">(\d+)</div>', row_html)
        cr = re.search(r'<div class="competitionround">([^<]+)</div>', row_html)
        gl = re.search(r'<div class="goals">(\d+)</div>', row_html)
        # Cards are rendered as "yellow / red".
        ca = re.search(r'<div class="cards">.*?(\d+)\s*/\s*(\d+).*?</div>', row_html, re.DOTALL)
        mn = re.search(r'<div class="minutes">(\d+)</div>', row_html)

        # Date cell is "dd.mm.yyyy. HH:MM"; only the date part is kept.
        datum = None
        dm = re.search(r'(\d{1,2})\.(\d{1,2})\.(\d{4})', d.group(1)) if d else None
        if dm:
            try:
                datum = date(int(dm.group(3)), int(dm.group(2)), int(dm.group(1)))
            except ValueError:
                # Not a real calendar date; the row is kept with datum=None.
                pass

        rows.append({
            "datum": datum,
            "domacin": (c1.group(1).strip() if c1 else "")[:120],
            "gost": (c2.group(1).strip() if c2 else "")[:120],
            "rezultat": f"{r1.group(1)}:{r2.group(1)}" if r1 and r2 else None,
            "natjecanje": (cr.group(1).strip() if cr else "")[:200],
            "golovi": int(gl.group(1)) if gl else 0,
            "zuti": int(ca.group(1)) if ca else 0,
            "crveni": int(ca.group(2)) if ca else 0,
            "minute_do": int(mn.group(1)) if mn else None,
        })
    return rows


def parse_seasons_and_matches(html: str) -> tuple[list[dict], list[dict]]:
    """Return ``(season_rows, match_rows)`` for all seasons on a profile page.

    Only the ``player_profile_matches`` block is inspected; when it is absent
    both lists are empty. The block is cut into sections at each season <h2>.
    If no season heading is found, the block is reduced to plain text and
    handed to ``_parse_plain`` (season rows only).
    """
    m = re.search(
        r'<div class="block w1280 matchlist style1 player_profile_matches"[^>]*>(.*?)(?=<!--|<footer)',
        html, re.DOTALL,
    )
    if not m:
        return [], []
    block = m.group(1)

    # Season headings: prefer <h2 class="seasonTitle ...">, else any <h2>YYYY/YY</h2>.
    headers = list(re.finditer(
        r'<h2\s+class="seasonTitle[^"]*"[^>]*>\s*(20\d{2}/\d{2})\s*</h2>',
        block,
    ))
    if not headers:
        headers = list(re.finditer(r'<h2[^>]*>\s*(20\d{2}/\d{2})\s*</h2>', block))
    if not headers:
        plain = re.sub(r'<[^>]+>', ' ', block)
        plain = re.sub(r'\s+', ' ', plain)
        return _parse_plain(plain)

    season_rows: list[dict] = []
    match_rows: list[dict] = []
    for i, h in enumerate(headers):
        # A section runs from its heading to the next heading (or block end).
        end = headers[i + 1].start() if i + 1 < len(headers) else len(block)
        sec = block[h.start():end]
        season_rows.extend(_season_stat_rows(h.group(1), sec))
        match_rows.extend(_section_match_rows(sec))
    return season_rows, match_rows
def _parse_plain(plain_text: str) -> tuple[list[dict], list[dict]]:
"""Fallback: parse from already-stripped plain text (no match-row HTML access)."""
# Best effort: extract season totals only
season_rows: list[dict] = []
# Split by season headers
parts = re.split(r'(20\d{2}/\d{2})\s+Statistika\s+Utakmice', plain_text)
# parts: [pre, season1, body1, season2, body2, ...]
for i in range(1, len(parts), 2):
sezona = parts[i]
body = parts[i + 1] if i + 1 < len(parts) else ""
# Find the "Ukupno N N N G Y R" then per-competition lines
for rm in re.finditer(
r'([A-ZČĆŠŽĐa-zčćšžđ0-9][^|]*?)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)(?=\s|$)',
body[:3000],
):
label = rm.group(1).strip()
if label.lower().startswith("ukupno"):
continue
if "Nastupi" in label or "Statistika" in label:
continue
season_rows.append({
"sezona": sezona,
"natjecanje": label[:200],
"nastupi": int(rm.group(2)),
"startna": int(rm.group(3)),
"zamjena": int(rm.group(4)),
"golovi": int(rm.group(5)),
"zuti": int(rm.group(6)),
"crveni": int(rm.group(7)),
})
return season_rows, []
# ── DB ────────────────────────────────────────────────────────────────────
def db_conn():
    """Open a psycopg2 connection to DSN with autocommit enabled."""
    connection = psycopg2.connect(DSN)
    connection.autocommit = True
    return connection
def get_targets(conn, limit: int, days: int, force_player: str | None = None) -> list[dict]:
    """Select the clanovi rows to scrape.

    With *force_player*, returns at most the one row with that hns_igrac_id.
    Otherwise returns up to *limit* rows that have an HNS id and were never
    scraped or were last scraped more than *days* days ago; never-scraped
    rows come first, then ascending id.
    """
    if force_player:
        query = """
            SELECT id, hns_igrac_id, ime, prezime, source_url, slug
            FROM pgz_sport.clanovi
            WHERE hns_igrac_id = %s
            LIMIT 1
        """
        params = (force_player,)
    else:
        query = """
            SELECT id, hns_igrac_id, ime, prezime, source_url, slug
            FROM pgz_sport.clanovi
            WHERE hns_igrac_id IS NOT NULL
            AND (last_scraped_at IS NULL OR last_scraped_at < now() - %s::interval)
            ORDER BY (last_scraped_at IS NULL) DESC, id ASC
            LIMIT %s
        """
        params = (f"{days} days", limit)

    with conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query, params)
        return cur.fetchall()
def update_clan(conn, clan_id: int, profile: dict, url: str) -> None:
    """Write scraped profile values onto one pgz_sport.clanovi row.

    Birth date, birthplace and jersey number only fill columns that are
    currently NULL (or empty, for the text columns). source_url and both
    timestamp columns are always refreshed.
    """
    sets: list[str] = []
    vals: list = []

    def put(fragment: str, value) -> None:
        # Register one "column = ... %s" assignment together with its parameter.
        sets.append(fragment)
        vals.append(value)

    # Both spellings of the birth columns (rodenja / rodjenja) are written.
    if profile.get("datum_rodenja"):
        born = profile["datum_rodenja"]
        put("datum_rodenja = COALESCE(datum_rodenja, %s)", born)
        put("datum_rodjenja = COALESCE(datum_rodjenja, %s)", born)
    if profile.get("mjesto_rodenja"):
        place = profile["mjesto_rodenja"]
        put("mjesto_rodenja = COALESCE(NULLIF(mjesto_rodenja,''), %s)", place)
        put("mjesto_rodjenja = COALESCE(NULLIF(mjesto_rodjenja,''), %s)", place)
    if profile.get("broj_dresa") is not None:
        put("broj_dresa = COALESCE(broj_dresa, %s)", profile["broj_dresa"])
    put("source_url = %s", url)

    # Parameter-free assignments.
    sets.extend([
        "source = COALESCE(NULLIF(source,''), 'hns_semafor')",
        "sport = COALESCE(NULLIF(sport,''), 'nogomet')",
        "last_scraped_at = now()",
        "source_synced_at = now()",
    ])

    vals.append(clan_id)  # parameter of the WHERE clause
    sql = f"UPDATE pgz_sport.clanovi SET {', '.join(sets)} WHERE id = %s"
    with conn.cursor() as cur:
        cur.execute(sql, tuple(vals))
def upsert_seasons(conn, hns_id: str, clan_id: int, url: str, rows: list[dict]) -> int:
    """Upsert per-season, per-competition stats for one player.

    Rows are de-duplicated on (hns_igrac_id, sezona, klub_hns_id, natjecanje),
    the last occurrence winning, because a single INSERT ... ON CONFLICT
    statement cannot touch the same target row twice.

    Returns the number of rows actually sent to the database (after
    de-duplication), matching what upsert_matches reports; 0 for empty input.
    """
    if not rows:
        return 0

    # NOTE(review): klub_hns_id is always sent as NULL. With a plain UNIQUE
    # constraint PostgreSQL treats NULLs as distinct, so ON CONFLICT would
    # never fire and each run would insert fresh rows. Confirm the constraint
    # is declared NULLS NOT DISTINCT, or populate klub_hns_id.
    dedup: dict[tuple, tuple] = {}
    for r in rows:
        natjecanje = r["natjecanje"][:200]
        dedup[(hns_id, r["sezona"], None, natjecanje)] = (
            hns_id, clan_id, r["sezona"], None, None, natjecanje,
            r.get("nastupi", 0), r.get("startna", 0), r.get("zamjena", 0),
            r.get("golovi", 0), 0, r.get("zuti", 0), r.get("crveni", 0), 0, url,
        )
    data = list(dedup.values())

    with conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO pgz_sport.hns_player_seasons
            (hns_igrac_id, clan_id, sezona, klub_hns_id, klub_naziv, natjecanje,
            nastupi, startna, zamjena, golovi, asistencije, zuti, crveni, minute, source_url)
            VALUES %s
            ON CONFLICT (hns_igrac_id, sezona, klub_hns_id, natjecanje) DO UPDATE SET
            nastupi = EXCLUDED.nastupi,
            startna = EXCLUDED.startna,
            zamjena = EXCLUDED.zamjena,
            golovi = EXCLUDED.golovi,
            zuti = EXCLUDED.zuti,
            crveni = EXCLUDED.crveni,
            source_url = EXCLUDED.source_url,
            scraped_at = now()
        """, data)
    return len(data)
def upsert_matches(conn, hns_id: str, clan_id: int, url: str, rows: list[dict]) -> int:
    """Upsert per-match rows for one player and return how many were sent.

    Rows lacking a date, home side or away side are dropped. The remainder is
    de-duplicated on (hns_igrac_id, datum, domacin, gost), keeping the last
    occurrence. Returns 0 when nothing is left to write.
    """
    if not rows:
        return 0

    dedup: dict[tuple, tuple] = {}
    for r in rows:
        if not (r["datum"] and r["domacin"] and r["gost"]):
            continue
        key = (hns_id, r["datum"], r["domacin"], r["gost"])
        dedup[key] = (
            hns_id, clan_id, r["datum"], r["natjecanje"], r["domacin"], r["gost"],
            r["rezultat"], None, None, None, r.get("minute_do"),
            r.get("golovi", 0), 0, r.get("zuti", 0), r.get("crveni", 0), url,
        )
    if not dedup:
        return 0
    data = list(dedup.values())

    with conn.cursor() as cur:
        execute_values(cur, """
            INSERT INTO pgz_sport.hns_player_matches
            (hns_igrac_id, clan_id, datum, natjecanje, domacin, gost,
            rezultat, pozicija, startna, minute_od, minute_do,
            golovi, asistencije, zuti, crveni, source_url)
            VALUES %s
            ON CONFLICT (hns_igrac_id, datum, domacin, gost) DO UPDATE SET
            rezultat = EXCLUDED.rezultat,
            natjecanje = EXCLUDED.natjecanje,
            minute_do = EXCLUDED.minute_do,
            golovi = EXCLUDED.golovi,
            zuti = EXCLUDED.zuti,
            crveni = EXCLUDED.crveni,
            source_url = EXCLUDED.source_url,
            scraped_at = now()
        """, data)
    return len(data)
# ── Slug helper ───────────────────────────────────────────────────────────
def slugify(text: str) -> str:
    """Turn a name into a lowercase, hyphen-separated ASCII slug.

    Croatian diacritics are transliterated (č/ć→c, ž→z, š→s, đ→d), every other
    character outside ``a-z0-9``, whitespace and ``-`` is dropped, whitespace
    runs become single hyphens, and leading/trailing hyphens are trimmed.
    Returns "" for empty input.
    """
    if not text:
        return ""
    # The text is lower-cased first, so only lowercase letters need mapping.
    # The previous table was misaligned and mapped ž→c, š→z and đ→s.
    repl = str.maketrans("čćžšđ", "cczsd")
    t = text.lower().translate(repl)
    t = re.sub(r"[^a-z0-9\s-]", "", t)
    return re.sub(r"\s+", "-", t).strip("-")
def build_url(t: dict) -> str:
    """Return the player-page URL for a clanovi row.

    A stored source_url that already points at semafor.hns.family/igraci/ is
    reused as-is. Otherwise the URL is assembled from hns_igrac_id and the
    stored slug, falling back to a slug derived from the name, then to "x".
    """
    stored = t.get("source_url")
    if stored and "semafor.hns.family/igraci/" in stored:
        return stored
    slug = t.get("slug") or slugify(f"{t['ime']} {t['prezime']}")
    if not slug:
        slug = "x"
    return f"https://semafor.hns.family/igraci/{t['hns_igrac_id']}/{slug}/"
# ── Driver ────────────────────────────────────────────────────────────────
def process_one(conn, t: dict) -> dict:
    """Scrape one player and persist the result.

    Returns a dict with ``profile`` (whether a usable page was parsed),
    ``seasons`` and ``matches`` (counts reported by the upsert helpers) and
    ``fields`` (how many of birth date / birthplace / jersey were found).
    """
    url = build_url(t)
    html = fetch_html(url)

    page_ok = bool(html) and "playerHeader" in html
    if not page_ok:
        log(f" ✗ no playerHeader for {t['ime']} {t['prezime']} ({t['hns_igrac_id']}) → {url}")
        # Stamp the row anyway so a broken URL is not retried on every run.
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE pgz_sport.clanovi SET last_scraped_at = now() WHERE id = %s",
                (t["id"],),
            )
        return {"profile": False, "seasons": 0, "matches": 0, "fields": 0}

    profile = parse_profile(html)
    seasons, matches = parse_seasons_and_matches(html)

    update_clan(conn, t["id"], profile, url)
    found = [k for k in ("datum_rodenja", "mjesto_rodenja", "broj_dresa") if profile.get(k)]
    n_s = upsert_seasons(conn, t["hns_igrac_id"], t["id"], url, seasons)
    n_m = upsert_matches(conn, t["hns_igrac_id"], t["id"], url, matches)
    return {"profile": True, "seasons": n_s, "matches": n_m, "fields": len(found)}
def main() -> int:
    """Command-line entry point: pick targets, scrape each, report totals.

    Flags: --limit (batch size), --days (re-scrape age threshold), --player
    (single HNS id), --missing-matches (only clanovi with no rows in
    hns_player_matches; takes precedence over --days/--player) and
    --no-telegram. Returns 0; per-player failures are counted, not raised.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit", type=int, default=200)
    ap.add_argument("--days", type=int, default=7)
    ap.add_argument("--player", help="Single HNS ID (debug)")
    ap.add_argument("--missing-matches", action="store_true",
                    help="Only target clanovi without rows in hns_player_matches")
    ap.add_argument("--no-telegram", action="store_true")
    args = ap.parse_args()

    log(f"SUB3 deep scraper start | limit={args.limit} | days={args.days} | "
        f"missing_matches={args.missing_matches} | log={LOG_FILE}",
        telegram=not args.no_telegram)

    stats = {"scraped": 0, "seasons": 0, "matches": 0, "fields": 0, "errors": 0}
    conn = db_conn()
    try:
        if args.missing_matches:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT id, hns_igrac_id, ime, prezime, source_url, slug
                    FROM pgz_sport.clanovi
                    WHERE hns_igrac_id IS NOT NULL
                    AND id NOT IN (
                    SELECT clan_id FROM pgz_sport.hns_player_matches WHERE clan_id IS NOT NULL
                    )
                    ORDER BY id ASC
                    LIMIT %s
                """, (args.limit,))
                targets = cur.fetchall()
        else:
            targets = get_targets(conn, args.limit, args.days, args.player)
        log(f"Targets: {len(targets)}")

        t0 = time.time()
        for i, t in enumerate(targets, 1):
            try:
                r = process_one(conn, t)
                stats["scraped"] += 1
                stats["seasons"] += r["seasons"]
                stats["matches"] += r["matches"]
                stats["fields"] += r["fields"]
                # Progress line every 10th player, or whenever matches were stored.
                if i % 10 == 0 or r["matches"] > 0:
                    log(f" [{i}/{len(targets)}] {t['ime']} {t['prezime']} "
                        f"→ seasons +{r['seasons']} matches +{r['matches']} fields +{r['fields']} "
                        f"(totals: s={stats['seasons']} m={stats['matches']})")
            except Exception as e:
                # One bad player must not abort the batch; record and move on.
                stats["errors"] += 1
                log(f" ✗ ERROR {t['ime']} {t['prezime']} ({t['hns_igrac_id']}): {e}")
                log(traceback.format_exc()[:500])
            time.sleep(SLEEP)  # pause between players
        dur = time.time() - t0
    finally:
        # Previously the connection was never closed.
        conn.close()

    summary = (
        f"SUB3 done in {dur:.0f}s | scraped={stats['scraped']} "
        f"seasons +{stats['seasons']} matches +{stats['matches']} "
        f"fields +{stats['fields']} errors={stats['errors']}"
    )
    log(summary, telegram=not args.no_telegram)

    # Result file: one appended section per run. Make sure its directory
    # exists so a finished batch does not fail on the final write.
    res_path = "/opt/pgz-sport/cc_tasks/SUB3_RESULT.md"
    os.makedirs(os.path.dirname(res_path), exist_ok=True)
    with open(res_path, "a", encoding="utf-8") as f:
        f.write(f"\n## Run {datetime.now().isoformat(timespec='seconds')}\n")
        f.write(f"- batch_limit: {args.limit}\n")
        f.write(f"- targets: {len(targets)}\n")
        f.write(f"- scraped: {stats['scraped']}\n")
        f.write(f"- seasons +{stats['seasons']}\n")
        f.write(f"- matches +{stats['matches']}\n")
        f.write(f"- profile fields enriched: +{stats['fields']}\n")
        f.write(f"- errors: {stats['errors']}\n")
        f.write(f"- duration: {dur:.0f}s\n")
        f.write(f"- log: {LOG_FILE}\n")
    return 0


if __name__ == "__main__":
    sys.exit(main())