Files
pgz-sport/scripts/godisnjak_pipeline.py.tg_replace.1777923681
T

312 lines
11 KiB
Python

#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# Fajl: godisnjak_pipeline.py
# Verzija: 1.0.0
# Datum: 03.05.2026
# Autor: Damir Radulić <dradulic@outlook.com>
# Lokacija: /opt/pgz-sport/scripts/godisnjak_pipeline.py
# Svrha: Embed godisnjaci PGZ u pgz_universe + LLM ekstrakcija osoba/uloga
# Zavisi od: qdrant_client, httpx, psycopg2, rapidfuzz
# Utječe na: pgz_universe (Qdrant), pgz_sport.clanovi (insert)
# ═══════════════════════════════════════════════════════════════════
"""Godisnjak PGZ embed + LLM person extraction pipeline."""
import asyncio
import glob
import hashlib
import json
import logging
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor

import httpx
import psycopg2
from psycopg2.extras import execute_batch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
# Send every record at INFO and above both to the pipeline log file and to stderr.
_LOG_FILE = "/opt/pgz-sport/logs/godisnjak_pipeline.log"
_LOG_FORMAT = "%(asctime)s [godisnjak] %(levelname)s: %(message)s"
logging.basicConfig(
    handlers=[logging.FileHandler(_LOG_FILE), logging.StreamHandler()],
    format=_LOG_FORMAT,
    level=logging.INFO,
)
log = logging.getLogger("godisnjak")
# Database DSN. The password is deliberately not stored in source: when the
# DSN has no password, libpq reads it from PGPASSWORD or ~/.pgpass.
# PGZ_SPORT_DSN overrides the whole DSN.
# NOTE: the password that used to be hard-coded here is in version history
# and should be rotated.
DSN = os.environ.get(
    "PGZ_SPORT_DSN",
    "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet",
)
EMBED_URL = "http://localhost:9879/api/embeddings"
VLLM_URL = "http://localhost:8001/v1/chat/completions"
VLLM_MODEL = "Qwen/Qwen2.5-7B-Instruct-AWQ"
QDRANT_COLLECTION = "pgz_universe"
DATA_DIR = "/opt/pgz-sport/_data/godisnjaci"
# Maximum number of chunks being sent to the LLM at the same time (phase 2).
MAX_WORKERS = 5
# Chunk length in characters; kept below 2000 because of BGE-M3 truncation.
CHUNK_SIZE = 1500
# System prompt for person/role extraction (sent verbatim to the LLM).
EXTRACT_PROMPT = """Ekstrahiraj iz teksta SVA imena osoba i njihove uloge.
Format strogo JSON:
{"osobe": [{"ime":"X","prezime":"Y","klub":"Z","uloga":"predsjednik|igrac|trener|tajnik|fizioterapeut|lijecnik","godina_rodenja":1990}]}
Uloge ISKLJUCIVO: predsjednik, dopredsjednik, tajnik, blagajnik, clan_uprave, igrac, sportas, glavni_trener, trener, pomocni_trener, kondicioni_trener, selektor, izbornik, team_manager, voditelj, lijecnik, fizioterapeut, kineziolog, maser, sudac, volonter
Pravila:
1. Samo HRVATSKE osobe (ne strani sportasi koji su gostovali)
2. Ako klub nije jasan -> ostavi prazan string
3. NE izmisljaj imena -> samo ona JASNO IZRAZENA u tekstu
4. Vrati VALID JSON bez markdown backtick-ova"""
def _split_oversized(para, size):
    """Cut one paragraph into pieces of at most `size` characters.

    Each cut is made at the last newline or space inside the window when one
    exists (keeping words whole); otherwise the text is cut hard at `size`.
    An empty paragraph yields no pieces.
    """
    pieces = []
    while len(para) > size:
        cut = max(para.rfind("\n", 0, size), para.rfind(" ", 0, size))
        if cut <= 0:
            # No usable break point: hard cut (at least 1 char so the loop advances).
            cut = max(size, 1)
        pieces.append(para[:cut])
        para = para[cut:].lstrip()
    if para:
        pieces.append(para)
    return pieces


def chunk_text(text, size=CHUNK_SIZE):
    """Split `text` into chunks of at most `size` characters.

    Paragraphs (separated by one or more blank lines) are packed greedily
    into chunks, joined by a blank line. A paragraph that alone is longer
    than `size` is first cut by `_split_oversized`, so no returned chunk
    exceeds `size`. Chunks of 100 characters or fewer (after stripping)
    are dropped.

    Args:
        text: full document text.
        size: maximum chunk length in characters.

    Returns:
        List of stripped chunk strings, in document order.
    """
    chunks, cur = [], ""
    for para in re.split(r"\n\n+", text):
        for piece in _split_oversized(para, size):
            # +2 accounts for the "\n\n" separator added when joining.
            if cur and len(cur) + 2 + len(piece) > size:
                chunks.append(cur.strip())
                cur = piece
            else:
                cur = f"{cur}\n\n{piece}" if cur else piece
    if cur:
        chunks.append(cur.strip())
    return [c for c in chunks if len(c) > 100]
async def embed_batch(texts):
    """Embed `texts` with the embedding service at EMBED_URL.

    Args:
        texts: list of strings to embed.

    Returns:
        The service's "embeddings" list, exactly one entry per input text
        (an empty list for empty input, without calling the service).

    Raises:
        httpx.HTTPStatusError: the service answered with a non-2xx status.
        ValueError: the reply does not hold exactly len(texts) embeddings;
            raised so callers never pair chunks with missing or shifted vectors.
    """
    if not texts:
        return []
    async with httpx.AsyncClient(timeout=120.0) as c:
        r = await c.post(EMBED_URL, json={"texts": texts})
        r.raise_for_status()
        d = r.json()
    embeddings = d.get("embeddings", [])
    if len(embeddings) != len(texts):
        raise ValueError(
            f"expected {len(texts)} embeddings, got {len(embeddings)}"
        )
    return embeddings
async def extract_persons(chunk_text_str):
async with httpx.AsyncClient(timeout=120.0) as c:
r = await c.post(
VLLM_URL,
json={
"model": VLLM_MODEL,
"messages": [
{"role": "system", "content": EXTRACT_PROMPT},
{"role": "user", "content": chunk_text_str[:5500]},
],
"temperature": 0.1,
"max_tokens": 3000,
"response_format": {"type": "json_object"},
},
)
d = r.json()
try:
content = d["choices"][0]["message"]["content"]
return json.loads(content)
except Exception as e:
log.warning(f"Parse fail: {e}")
return {"osobe": []}
def fuzzy_match_klub(naziv, conn):
    """Return the pgz_sport.klubovi id whose name best matches `naziv`, or None.

    Names are compared lower-cased with rapidfuzz `token_set_ratio`. The best
    match is accepted only when its score is above 75; on ties the first row
    (by id) wins. Rows with a NULL/empty name are ignored. Any error (missing
    rapidfuzz, failed query) is logged and yields None.

    Args:
        naziv: club name as extracted from the yearbook text.
        conn: open psycopg2 connection (only read from).
    """
    try:
        from rapidfuzz import fuzz

        with conn.cursor() as cur:
            # ORDER BY makes the LIMIT select the same rows on every run.
            # NOTE(review): clubs beyond the first 1000 ids are never matched;
            # confirm the table stays below that size.
            cur.execute(
                "SELECT id, naziv FROM pgz_sport.klubovi ORDER BY id LIMIT 1000"
            )
            rows = cur.fetchall()
        target = naziv.lower()
        best_id, best_score = None, 0
        for kid, kname in rows:
            if not kname:
                # A NULL name would make .lower() raise and fail every lookup.
                continue
            score = fuzz.token_set_ratio(target, kname.lower())
            if score > best_score:
                best_id, best_score = kid, score
        return best_id if best_score > 75 else None
    except Exception as e:
        log.warning(f"Fuzzy match fail: {e}")
        return None
def insert_persons(persons_data, year, conn):
    """Insert extracted persons into pgz_sport.clanovi and commit.

    Args:
        persons_data: extractor output; only its "osobe" list is read. Entries
            that are not dicts, or that lack "ime" or "prezime", are skipped.
        year: yearbook year, stored in the row's metadata JSON.
        conn: open psycopg2 connection with autocommit off (SAVEPOINT is only
            valid inside a transaction); this function commits on it.

    Returns:
        Number of rows whose INSERT returned an id.

    Each row runs under its own SAVEPOINT. A failing row is rolled back on
    its own, and the rows inserted before it in the same call are still
    committed. A missing or unknown "uloga" is stored as "igrac".
    """
    osobe = persons_data.get("osobe") or []
    if not osobe:
        return 0
    valid_uloge = {
        "predsjednik", "dopredsjednik", "tajnik", "blagajnik", "clan_uprave",
        "igrac", "sportas", "glavni_trener", "trener", "pomocni_trener",
        "kondicioni_trener", "selektor", "izbornik", "team_manager", "voditelj",
        "lijecnik", "fizioterapeut", "kineziolog", "maser", "sudac", "volonter",
    }
    klub_ids = {}  # club name -> fuzzy-matched id (or None), one lookup per name per call
    inserted = 0
    with conn.cursor() as cur:
        for o in osobe:
            if not isinstance(o, dict):
                continue
            # str() guards against non-string values in the model's JSON.
            ime = str(o.get("ime") or "").strip()
            prezime = str(o.get("prezime") or "").strip()
            if not ime or not prezime:
                continue
            klub_naziv = str(o.get("klub") or "").strip()
            # Case-normalise so e.g. "Trener" still matches a valid role.
            uloga = str(o.get("uloga") or "igrac").strip().lower()
            if uloga not in valid_uloge:
                uloga = "igrac"
            profile_key = f"godisnjak:{year}:{ime}:{prezime}:{klub_naziv}"
            cur.execute("SAVEPOINT godisnjak_person")
            try:
                if klub_naziv not in klub_ids:
                    klub_ids[klub_naziv] = (
                        fuzzy_match_klub(klub_naziv, conn) if klub_naziv else None
                    )
                # NOTE(review): ON CONFLICT without a target only deduplicates if
                # clanovi has a unique constraint covering these values; confirm,
                # otherwise re-running the pipeline inserts duplicates.
                cur.execute("""
                    INSERT INTO pgz_sport.clanovi
                    (ime, prezime, uloga, klub_id, savez_izvor, metadata, kategorija)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT DO NOTHING
                    RETURNING id
                """, (
                    ime, prezime, uloga, klub_ids[klub_naziv],
                    "godisnjak",
                    json.dumps({"year": year, "klub_naziv": klub_naziv, "key": profile_key}),
                    "sportas",
                ))
                row_inserted = cur.fetchone() is not None
                cur.execute("RELEASE SAVEPOINT godisnjak_person")
                inserted += row_inserted
            except Exception as e:
                log.warning(f"Insert fail {ime} {prezime}: {e}")
                # Undo only this row; earlier rows of the batch stay pending.
                cur.execute("ROLLBACK TO SAVEPOINT godisnjak_person")
    conn.commit()
    return inserted
async def phase1_embed(files_layout):
    """Embed every yearbook chunk into the QDRANT_COLLECTION collection.

    Args:
        files_layout: paths of godisnjak_<year>_layout.txt files (UTF-8).

    Returns:
        Number of points upserted.

    Point ids are derived from "godisnjak:<file>:<chunk index>", so a re-run
    upserts onto the same ids. A batch whose embedding fails, or that comes
    back with the wrong number of vectors, is logged and skipped. A Qdrant
    upsert error is not caught and aborts the phase.
    """
    log.info(f"Phase 1: Embed {len(files_layout)} godisnjaka")
    qdrant = QdrantClient(host="localhost", port=6333)
    all_chunks, all_meta = [], []
    for f in files_layout:
        m = re.search(r"godisnjak_(\d{4})_layout", f)
        year = m.group(1) if m else "unknown"
        with open(f, encoding="utf-8") as fp:
            text = fp.read()
        source = f.split("/")[-1]
        for idx, c in enumerate(chunk_text(text)):
            all_chunks.append(c)
            all_meta.append({"year": year, "chunk_idx": idx, "source": source})
    log.info(f"Total chunks: {len(all_chunks)}")

    BATCH = 32
    n_points = 0
    for i in range(0, len(all_chunks), BATCH):
        # i advances by BATCH, so `i % 200 == 0` alone would only fire every
        # 800 chunks; this logs once per ~200 chunks instead.
        if i % 200 < BATCH:
            log.info(f" Embed progress: {i}/{len(all_chunks)}")
        batch = all_chunks[i : i + BATCH]
        try:
            embeddings = await embed_batch(batch)
            if len(embeddings) != len(batch):
                # zip() would silently drop or misalign chunks.
                raise ValueError(f"{len(embeddings)} vectors for {len(batch)} chunks")
            points = []
            for j, (chunk, emb) in enumerate(zip(batch, embeddings)):
                meta = all_meta[i + j]
                pid_key = f"godisnjak:{meta['source']}:{meta['chunk_idx']}"
                points.append(
                    PointStruct(
                        # First 15 hex digits (60 bits) of the MD5 of a stable key.
                        id=int(hashlib.md5(pid_key.encode()).hexdigest()[:15], 16),
                        vector=emb,
                        payload={**meta, "text": chunk[:1500], "type": "godisnjak_pgz"},
                    )
                )
        except Exception as e:
            log.warning(f"Embed batch {i} fail: {e}")
            await asyncio.sleep(2)
            continue
        # Upsert per batch: a single request with every vector can be very
        # large, and a crash later would otherwise lose all finished batches.
        qdrant.upsert(collection_name=QDRANT_COLLECTION, points=points)
        n_points += len(points)
    log.info(f"Phase 1 DONE: {n_points} chunks → {QDRANT_COLLECTION}")
    return n_points
async def phase2_extract(files_layout):
    """Run LLM person/role extraction over every yearbook and store the results.

    One task runs per file. Chunks within a file are processed in order, and
    at most MAX_WORKERS chunks are being extracted at the same time across
    all files. A chunk that fails is logged and skipped. A file that fails
    outside the per-chunk handling (e.g. cannot be read) is logged without
    stopping the other files. The DB connection is always closed.

    Args:
        files_layout: paths of godisnjak_<year>_layout.txt files (UTF-8).

    Returns:
        Total rows reported inserted by insert_persons.
    """
    log.info(f"Phase 2: LLM extract persons from {len(files_layout)} godisnjaka")
    conn = psycopg2.connect(DSN)
    conn.autocommit = False
    total_inserted = 0
    semaphore = asyncio.Semaphore(MAX_WORKERS)

    async def process_file(f):
        nonlocal total_inserted
        m = re.search(r"godisnjak_(\d{4})_layout", f)
        year = m.group(1) if m else "unknown"
        with open(f, encoding="utf-8") as fp:
            text = fp.read()
        chunks = chunk_text(text)
        log.info(f" Year {year}: {len(chunks)} chunks")
        year_inserted = 0
        for i, chunk in enumerate(chunks):
            async with semaphore:
                try:
                    persons = await extract_persons(chunk)
                    n = insert_persons(persons, year, conn)
                    year_inserted += n
                    if i % 10 == 0:
                        log.info(f" {year} chunk {i}/{len(chunks)}: {n} osoba")
                except Exception as e:
                    log.warning(f"Extract/insert fail {year} chunk {i}: {e}")
                    await asyncio.sleep(0.5)
        total_inserted += year_inserted
        log.info(f" Year {year} DONE: {year_inserted} osoba inserted")

    try:
        # return_exceptions=True: wait for every file even if one fails, so
        # no task is still using `conn` when it is closed below.
        results = await asyncio.gather(
            *(process_file(f) for f in files_layout), return_exceptions=True
        )
        for f, res in zip(files_layout, results):
            if isinstance(res, BaseException):
                log.warning(f"File {f} fail: {res}")
    finally:
        conn.close()
    log.info(f"Phase 2 DONE: {total_inserted} total osoba inserted")
    return total_inserted
async def main():
    """Run the whole pipeline: embed, extract, report totals, notify Telegram.

    Exits with status 1 when DATA_DIR contains no godisnjak_*_layout.txt files.

    The Telegram bot token is read from PGZ_TG_BOT_TOKEN, and the chat id from
    PGZ_TG_CHAT_ID (the previous chat id is the default). Without a token the
    notification is skipped with a warning. A failed send is only logged,
    because all pipeline work is finished at that point.
    NOTE: the bot token that used to be hard-coded here is in version
    history and should be revoked.
    """
    import os  # local import, like the script's other late imports

    files_layout = sorted(glob.glob(f"{DATA_DIR}/godisnjak_*_layout.txt"))
    log.info(f"Found {len(files_layout)} layout files: {[f.split('/')[-1] for f in files_layout]}")
    if not files_layout:
        log.error("Nema layout fajlova!")
        sys.exit(1)
    # Phase 1: embed chunks into Qdrant.
    n_embedded = await phase1_embed(files_layout)
    # Phase 2: LLM person extraction into pgz_sport.clanovi.
    n_persons = await phase2_extract(files_layout)
    # Final stats: all rows with savez_izvor='godisnjak', not only this run's.
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM pgz_sport.clanovi WHERE savez_izvor='godisnjak'")
            total_godisnjak = cur.fetchone()[0]
    finally:
        conn.close()
    log.info(f"""
=== GODISNJAK PIPELINE COMPLETE ===
Chunks embedded: {n_embedded}
Persons extracted: {n_persons}
Total godisnjak clanovi u DB: {total_godisnjak}
""")
    # Telegram notification (best effort).
    token = os.environ.get("PGZ_TG_BOT_TOKEN")
    chat_id = os.environ.get("PGZ_TG_CHAT_ID", "7969491558")
    if not token:
        log.warning("Telegram notify skipped: PGZ_TG_BOT_TOKEN not set")
        return
    try:
        httpx.post(
            f"https://api.telegram.org/bot{token}/sendMessage",
            data={"chat_id": chat_id, "text": f"✅ Godisnjak pipeline DONE: {n_embedded} chunks, {n_persons} osoba, {total_godisnjak} total u DB"},
            timeout=10,
        ).raise_for_status()
    except httpx.HTTPError as e:
        # Log only the exception type: httpx messages include the URL,
        # which contains the bot token.
        log.warning(f"Telegram notify fail: {type(e).__name__}")
def _cli():
    """Command-line entry point: run the full pipeline once on a new event loop."""
    asyncio.run(main())


if __name__ == "__main__":
    _cli()