#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# File:       godisnjak_pipeline.py
# Version:    1.0.0
# Date:       03.05.2026
# Author:     Damir Radulić
# Location:   /opt/pgz-sport/scripts/godisnjak_pipeline.py
# Purpose:    Embed PGZ yearbooks (godisnjaci) into pgz_universe + LLM
#             extraction of persons and their roles
# Depends on: qdrant_client, httpx, psycopg2, rapidfuzz
# Affects:    pgz_universe (Qdrant), pgz_sport.clanovi (insert)
#
# Configuration (environment variables):
#   GODISNJAK_DSN         libpq DSN *without* a password (default below);
#                         supply the password via PGPASSWORD or ~/.pgpass.
#   GODISNJAK_TG_TOKEN    Telegram bot token for the completion message.
#   GODISNJAK_TG_CHAT_ID  Telegram chat id (default below).
# NOTE(review): the DB password and bot token that were previously
# committed in this file must be treated as leaked and rotated.
# ═══════════════════════════════════════════════════════════════════
"""Godisnjak PGZ embed + LLM person extraction pipeline."""

import asyncio
import glob
import hashlib
import json
import logging
import os
import re
import sys

# Kept before the third-party imports so module resolution order is unchanged.
sys.path.insert(0, '/opt/rinet-gpu/lib')
try:
    from tg_notify import notify as _tg_notify
except ImportError:
    _tg_notify = None
import time
from concurrent.futures import ThreadPoolExecutor

import httpx
import psycopg2
from psycopg2.extras import execute_batch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

log = logging.getLogger("godisnjak")

LOG_FILE = "/opt/pgz-sport/logs/godisnjak_pipeline.log"
LOG_FORMAT = "%(asctime)s [godisnjak] %(levelname)s: %(message)s"

# Credentials are taken from the environment, never from source control.
# When the DSN has no password, libpq reads it from PGPASSWORD or ~/.pgpass.
DSN = os.environ.get(
    "GODISNJAK_DSN", "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet"
)
EMBED_URL = "http://localhost:9879/api/embeddings"
VLLM_URL = "http://localhost:8001/v1/chat/completions"
VLLM_MODEL = "Qwen/Qwen2.5-7B-Instruct-AWQ"
QDRANT_COLLECTION = "pgz_universe"
DATA_DIR = "/opt/pgz-sport/_data/godisnjaci"
MAX_WORKERS = 5
CHUNK_SIZE = 1500  # < 2000 because of BGE-M3 truncation
MIN_CHUNK_LEN = 100  # chunks of this length or shorter are discarded
EMBED_BATCH = 32  # texts per embedding request / Qdrant upsert
EMBED_RETRIES = 3  # attempts per embedding batch before it is skipped
EMBED_PROGRESS_EVERY = 200  # log embedding progress roughly every N chunks
KLUB_MATCH_THRESHOLD = 75  # a club match needs token_set_ratio strictly above this

TG_BOT_TOKEN = os.environ.get("GODISNJAK_TG_TOKEN", "")
TG_CHAT_ID = os.environ.get("GODISNJAK_TG_CHAT_ID", "7969491558")

# Roles accepted into pgz_sport.clanovi.uloga; anything else becomes "igrac".
VALID_ULOGE = frozenset({
    "predsjednik", "dopredsjednik", "tajnik", "blagajnik", "clan_uprave",
    "igrac", "sportas", "glavni_trener", "trener", "pomocni_trener",
    "kondicioni_trener", "selektor", "izbornik", "team_manager", "voditelj",
    "lijecnik", "fizioterapeut", "kineziolog", "maser", "sudac", "volonter",
})

EXTRACT_PROMPT = """Ekstrahiraj iz teksta SVA imena osoba i njihove uloge.
Format strogo JSON: {"osobe": [{"ime":"X","prezime":"Y","klub":"Z","uloga":"predsjednik|igrac|trener|tajnik|fizioterapeut|lijecnik","godina_rodenja":1990}]}
Uloge ISKLJUCIVO: predsjednik, dopredsjednik, tajnik, blagajnik, clan_uprave, igrac, sportas, glavni_trener, trener, pomocni_trener, kondicioni_trener, selektor, izbornik, team_manager, voditelj, lijecnik, fizioterapeut, kineziolog, maser, sudac, volonter
Pravila:
1. Samo HRVATSKE osobe (ne strani sportasi koji su gostovali)
2. Ako klub nije jasan -> ostavi prazan string
3. NE izmisljaj imena -> samo ona JASNO IZRAZENA u tekstu
4. Vrati VALID JSON bez markdown backtick-ova"""


def _configure_logging():
    """Log to LOG_FILE and stderr, falling back to stderr if the file can't be opened.

    Called from the ``__main__`` guard so that importing this module has no
    filesystem side effects.
    """
    handlers = [logging.StreamHandler()]
    file_error = None
    try:
        handlers.insert(0, logging.FileHandler(LOG_FILE))
    except OSError as e:
        file_error = e
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, handlers=handlers)
    if file_error is not None:
        log.warning(f"Cannot open log file {LOG_FILE}: {file_error}; logging to stderr only")


def _split_long(paragraph, size):
    """Yield pieces of ``paragraph`` no longer than ``size``.

    Breaks at the last space/newline inside the window when there is one;
    otherwise cuts hard at ``size``.
    """
    while len(paragraph) > size:
        cut = max(paragraph.rfind(" ", 0, size + 1), paragraph.rfind("\n", 0, size + 1))
        if cut <= 0:
            cut = size
        yield paragraph[:cut]
        paragraph = paragraph[cut:].lstrip()
    yield paragraph


def chunk_text(text, size=CHUNK_SIZE):
    """Split ``text`` into chunks of at most ``size`` characters.

    Paragraphs (separated by blank lines) are packed greedily, joined with
    "\\n\\n". A paragraph longer than ``size`` is split first, so no chunk
    exceeds ``size`` and none gets truncated by the embedding model.
    Chunks of MIN_CHUNK_LEN characters or fewer are dropped.
    """
    chunks, cur = [], ""
    for para in re.split(r"\n\n+", text):
        for piece in _split_long(para, size):
            # +2 accounts for the "\n\n" separator that joining would add.
            if cur and len(cur) + 2 + len(piece) > size:
                chunks.append(cur.strip())
                cur = piece
            else:
                cur = f"{cur}\n\n{piece}" if cur else piece
    if cur:
        chunks.append(cur.strip())
    return [c for c in chunks if len(c) > MIN_CHUNK_LEN]


async def embed_batch(texts):
    """Return one embedding per text from the embedding service.

    Raises:
        httpx.HTTPError: on transport failure or a non-2xx response.
        ValueError: if the reply is not JSON or has a different number of
            embeddings than ``texts``. Without this check, ``zip`` would
            silently drop chunks.
    """
    async with httpx.AsyncClient(timeout=120.0) as c:
        r = await c.post(EMBED_URL, json={"texts": texts})
        r.raise_for_status()
        d = r.json()
    embeddings = d.get("embeddings", []) if isinstance(d, dict) else []
    if len(embeddings) != len(texts):
        raise ValueError(f"expected {len(texts)} embeddings, got {len(embeddings)}")
    return embeddings


async def extract_persons(chunk_text_str):
    """Ask the LLM to extract persons/roles from one chunk.

    Returns the parsed JSON object (expected shape ``{"osobe": [...]}``), or
    ``{"osobe": []}`` when the reply cannot be parsed into a JSON object.

    Raises:
        httpx.HTTPError: on transport failure or a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=120.0) as c:
        r = await c.post(
            VLLM_URL,
            json={
                "model": VLLM_MODEL,
                "messages": [
                    {"role": "system", "content": EXTRACT_PROMPT},
                    {"role": "user", "content": chunk_text_str[:5500]},
                ],
                "temperature": 0.1,
                "max_tokens": 3000,
                "response_format": {"type": "json_object"},
            },
        )
        r.raise_for_status()
        d = r.json()
    try:
        content = d["choices"][0]["message"]["content"]
        parsed = json.loads(content)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        log.warning(f"Parse fail: {e}")
        return {"osobe": []}
    if not isinstance(parsed, dict):
        log.warning(f"Parse fail: expected JSON object, got {type(parsed).__name__}")
        return {"osobe": []}
    return parsed


# (id, lowercased naziv) pairs from pgz_sport.klubovi, loaded once per process.
_klubovi_cache = None


def _load_klubovi(conn):
    """Return the club (id, lowercased name) list, querying the DB only once.

    The query runs inside a savepoint, so a failure does not abort the
    caller's open transaction. On failure an empty list is cached, which
    disables club matching for the rest of the run.
    """
    global _klubovi_cache
    if _klubovi_cache is not None:
        return _klubovi_cache
    rows = []
    try:
        with conn.cursor() as cur:
            cur.execute("SAVEPOINT godisnjak_klubovi")
            try:
                cur.execute("SELECT id, naziv FROM pgz_sport.klubovi")
                rows = cur.fetchall()
            except psycopg2.Error:
                cur.execute("ROLLBACK TO SAVEPOINT godisnjak_klubovi")
                raise
            cur.execute("RELEASE SAVEPOINT godisnjak_klubovi")
    except psycopg2.Error as e:
        log.warning(f"Fuzzy match fail: {e}")
        rows = []
    # Skip NULL names: previously one NULL made .lower() raise and every match fail.
    _klubovi_cache = [(kid, kname.lower()) for kid, kname in rows if kname]
    return _klubovi_cache


def fuzzy_match_klub(naziv, conn):
    """Fuzzy-match a club name to ``pgz_sport.klubovi.id``.

    Uses rapidfuzz ``token_set_ratio``. The first club with the highest
    score wins, and only if that score is strictly above
    KLUB_MATCH_THRESHOLD. Returns the club id, or ``None`` if there is no
    confident match or matching is unavailable.
    """
    try:
        from rapidfuzz import fuzz
    except ImportError as e:
        log.warning(f"Fuzzy match fail: {e}")
        return None
    query = naziv.lower()
    best_id, best_score = None, 0
    for kid, kname in _load_klubovi(conn):
        score = fuzz.token_set_ratio(query, kname)
        if score > best_score:
            best_id, best_score = kid, score
    return best_id if best_score > KLUB_MATCH_THRESHOLD else None


def _clean_str(value):
    """Return ``value`` stripped if it is a string, else "" (LLM output is untrusted)."""
    return value.strip() if isinstance(value, str) else ""


def _normalize_uloga(raw):
    """Map an LLM-supplied role onto VALID_ULOGE, defaulting to "igrac".

    Lowercases the role and turns spaces into underscores, so values such as
    "Glavni trener" map to "glavni_trener".
    """
    uloga = _clean_str(raw).lower().replace(" ", "_") or "igrac"
    return uloga if uloga in VALID_ULOGE else "igrac"


def insert_persons(persons_data, year, conn):
    """Insert LLM-extracted persons into ``pgz_sport.clanovi``.

    Args:
        persons_data: Parsed LLM output, expected as ``{"osobe": [{...}, ...]}``.
            Malformed entries (non-dicts, missing ime/prezime) are skipped.
        year: Yearbook year taken from the file name (a string, or "unknown").
        conn: Open psycopg2 connection with autocommit disabled.

    Returns:
        Number of rows actually inserted. Rows skipped by
        ``ON CONFLICT DO NOTHING`` are not counted.

    Raises:
        Exception: anything unexpected is re-raised after ``conn.rollback()``,
        so the shared connection stays usable for the next chunk.

    Each row is inserted under its own savepoint. A failing row therefore no
    longer rolls back rows inserted earlier in the same call; previously they
    were discarded while still being counted. Everything is committed once
    at the end.
    """
    osobe = persons_data.get("osobe") if isinstance(persons_data, dict) else None
    if not osobe or not isinstance(osobe, list):
        return 0

    inserted = 0
    try:
        with conn.cursor() as cur:
            for o in osobe:
                if not isinstance(o, dict):
                    continue
                ime = _clean_str(o.get("ime"))
                prezime = _clean_str(o.get("prezime"))
                if not ime or not prezime:
                    continue
                klub_naziv = _clean_str(o.get("klub"))
                klub_id = fuzzy_match_klub(klub_naziv, conn) if klub_naziv else None
                uloga = _normalize_uloga(o.get("uloga"))

                profile_key = f"godisnjak:{year}:{ime}:{prezime}:{klub_naziv}"
                metadata = {"year": year, "klub_naziv": klub_naziv, "key": profile_key}
                godina = o.get("godina_rodenja")
                # The prompt asks for the birth year; keep it when it is a plain int.
                if isinstance(godina, int) and not isinstance(godina, bool):
                    metadata["godina_rodenja"] = godina

                cur.execute("SAVEPOINT godisnjak_row")
                try:
                    # NOTE(review): which rows "conflict" depends on constraints on
                    # pgz_sport.clanovi that are not visible from this script.
                    cur.execute("""
                        INSERT INTO pgz_sport.clanovi
                        (ime, prezime, uloga, klub_id, savez_izvor, metadata, kategorija)
                        VALUES (%s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT DO NOTHING
                        RETURNING id
                    """, (
                        ime, prezime, uloga, klub_id, "godisnjak",
                        json.dumps(metadata),
                        "sportas",
                    ))
                    row = cur.fetchone()
                except psycopg2.Error as e:
                    log.warning(f"Insert fail {ime} {prezime}: {e}")
                    cur.execute("ROLLBACK TO SAVEPOINT godisnjak_row")
                    continue
                cur.execute("RELEASE SAVEPOINT godisnjak_row")
                if row:
                    inserted += 1
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return inserted


def _year_from_path(path):
    """Extract the 4-digit year from a ``godisnjak_YYYY_layout`` file name, or "unknown"."""
    m = re.search(r"godisnjak_(\d{4})_layout", path)
    return m.group(1) if m else "unknown"


def _load_chunks(path):
    """Read a layout file and return its chunks (see ``chunk_text``).

    NOTE(review): layout files are assumed to be UTF-8. The explicit encoding
    avoids depending on the process locale for Croatian diacritics.
    """
    with open(path, encoding="utf-8") as fp:
        return chunk_text(fp.read())


async def _embed_with_retry(batch, offset):
    """Embed one batch, trying up to EMBED_RETRIES times; return None if all attempts fail."""
    for _ in range(EMBED_RETRIES):
        try:
            return await embed_batch(batch)
        except (httpx.HTTPError, ValueError) as e:
            log.warning(f"Embed batch {offset} fail: {e}")
            await asyncio.sleep(2)
    log.error(f"Embed batch {offset} skipped after {EMBED_RETRIES} attempts")
    return None


async def phase1_embed(files_layout):
    """Embed all yearbook chunks into the QDRANT_COLLECTION collection.

    Point ids are an md5-derived integer of "godisnjak:<file>:<chunk_idx>",
    so re-runs overwrite the same points instead of duplicating them.
    Points are upserted one batch at a time, which keeps each request
    bounded; earlier batches stay stored if a later step fails.

    Returns:
        Number of points upserted.
    """
    log.info(f"Phase 1: Embed {len(files_layout)} godisnjaka")
    qdrant = QdrantClient(host="localhost", port=6333)

    all_chunks, all_meta = [], []
    for f in files_layout:
        year = _year_from_path(f)
        source = os.path.basename(f)
        for idx, chunk in enumerate(_load_chunks(f)):
            all_chunks.append(chunk)
            all_meta.append({"year": year, "chunk_idx": idx, "source": source})
    log.info(f"Total chunks: {len(all_chunks)}")

    n_points = 0
    for i in range(0, len(all_chunks), EMBED_BATCH):
        batch = all_chunks[i : i + EMBED_BATCH]
        embeddings = await _embed_with_retry(batch, i)
        if embeddings is not None:
            points = []
            for j, (text, emb) in enumerate(zip(batch, embeddings)):
                meta = all_meta[i + j]
                pid_key = f"godisnjak:{meta['source']}:{meta['chunk_idx']}"
                point_id = int(hashlib.md5(pid_key.encode()).hexdigest()[:15], 16)
                points.append(
                    PointStruct(
                        id=point_id,
                        vector=emb,
                        payload={**meta, "text": text[:1500], "type": "godisnjak_pgz"},
                    )
                )
            qdrant.upsert(collection_name=QDRANT_COLLECTION, points=points)
            n_points += len(points)
        # `i` advances in EMBED_BATCH steps, so `i % 200 == 0` logged only every 800 chunks.
        if i % EMBED_PROGRESS_EVERY < EMBED_BATCH:
            log.info(f" Embed progress: {i}/{len(all_chunks)}")

    log.info(f"Phase 1 DONE: {n_points} chunks → {QDRANT_COLLECTION}")
    return n_points


async def phase2_extract(files_layout):
    """Extract persons and roles from every yearbook with the LLM and store them.

    Files are processed concurrently, and at most MAX_WORKERS LLM calls run
    at once. Every file shares one DB connection. ``insert_persons`` is
    synchronous (it contains no ``await``), so inserts from different files
    never interleave inside one transaction. A file that cannot be read is
    logged and skipped instead of aborting the other files.

    Returns:
        Total number of persons inserted.
    """
    log.info(f"Phase 2: LLM extract persons from {len(files_layout)} godisnjaka")
    conn = psycopg2.connect(DSN)
    conn.autocommit = False
    total_inserted = 0
    semaphore = asyncio.Semaphore(MAX_WORKERS)

    async def process_file(f):
        nonlocal total_inserted
        year = _year_from_path(f)
        chunks = _load_chunks(f)
        log.info(f" Year {year}: {len(chunks)} chunks")

        year_inserted = 0
        for i, chunk in enumerate(chunks):
            async with semaphore:
                try:
                    persons = await extract_persons(chunk)
                    n = insert_persons(persons, year, conn)
                    year_inserted += n
                    if i % 10 == 0:
                        log.info(f" {year} chunk {i}/{len(chunks)}: {n} osoba")
                except Exception as e:
                    log.warning(f"Extract/insert fail {year} chunk {i}: {e}")
            await asyncio.sleep(0.5)

        total_inserted += year_inserted
        log.info(f" Year {year} DONE: {year_inserted} osoba inserted")

    try:
        results = await asyncio.gather(
            *(process_file(f) for f in files_layout), return_exceptions=True
        )
        for f, res in zip(files_layout, results):
            if isinstance(res, BaseException):
                log.error(f"File {os.path.basename(f)} failed: {res}")
    finally:
        conn.close()

    log.info(f"Phase 2 DONE: {total_inserted} total osoba inserted")
    return total_inserted


def _count_godisnjak_clanovi():
    """Return how many pgz_sport.clanovi rows have savez_izvor='godisnjak'."""
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM pgz_sport.clanovi WHERE savez_izvor='godisnjak'")
            return cur.fetchone()[0]
    finally:
        conn.close()


async def _notify(text):
    """Send a best-effort Telegram message; failures are logged, never raised.

    Uses the Bot API directly when GODISNJAK_TG_TOKEN is set. Otherwise it
    falls back to the optional ``tg_notify`` helper, if that was importable.
    Errors are logged by type/status only, because httpx error messages
    include the request URL, which contains the bot token.
    """
    try:
        if TG_BOT_TOKEN and TG_CHAT_ID:
            async with httpx.AsyncClient(timeout=10.0) as c:
                r = await c.post(
                    f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage",
                    data={"chat_id": TG_CHAT_ID, "text": text},
                )
            if r.status_code != 200:
                log.warning(f"Telegram notify fail: HTTP {r.status_code}")
        elif _tg_notify is not None:
            # NOTE(review): assumes tg_notify.notify takes the message text as its
            # single positional argument -- confirm against /opt/rinet-gpu/lib.
            result = _tg_notify(text)
            if asyncio.iscoroutine(result):
                await result
        else:
            log.warning("Telegram not configured (GODISNJAK_TG_TOKEN); skipping notification")
    except Exception as e:  # a notification must never fail a completed run
        log.warning(f"Telegram notify fail: {type(e).__name__}")


async def main():
    """Run phase 1 (embed) and phase 2 (LLM extract), then report totals."""
    files_layout = sorted(glob.glob(f"{DATA_DIR}/godisnjak_*_layout.txt"))
    log.info(f"Found {len(files_layout)} layout files: {[os.path.basename(f) for f in files_layout]}")
    if not files_layout:
        log.error("Nema layout fajlova!")
        sys.exit(1)

    # Phase 1: Embed
    n_embedded = await phase1_embed(files_layout)

    # Phase 2: LLM extract
    n_persons = await phase2_extract(files_layout)

    # Final stats
    total_godisnjak = _count_godisnjak_clanovi()

    log.info(f"""
=== GODISNJAK PIPELINE COMPLETE ===
Chunks embedded: {n_embedded}
Persons extracted: {n_persons}
Total godisnjak clanovi u DB: {total_godisnjak}
""")

    # Telegram
    await _notify(
        f"✅ Godisnjak pipeline DONE: {n_embedded} chunks, {n_persons} osoba, {total_godisnjak} total u DB"
    )


if __name__ == "__main__":
    _configure_logging()
    asyncio.run(main())