#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# File:       godisnjak_pipeline.py
# Version:    1.0.0
# Date:       03.05.2026
# Author:     Damir Radulić
# Location:   /opt/pgz-sport/scripts/godisnjak_pipeline.py
# Purpose:    Embed the PGZ yearbooks (godisnjaci) into pgz_universe
#             + LLM extraction of persons/roles
# Depends on: qdrant_client, httpx, psycopg2, rapidfuzz
# Affects:    pgz_universe (Qdrant), pgz_sport.clanovi (insert)
# ═══════════════════════════════════════════════════════════════════
"""Godisnjak PGZ embed + LLM person extraction pipeline."""
import asyncio
import glob
import hashlib
import json
import logging
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor

import httpx
import psycopg2
from psycopg2.extras import execute_batch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [godisnjak] %(levelname)s: %(message)s",
    handlers=[
        logging.FileHandler("/opt/pgz-sport/logs/godisnjak_pipeline.log"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("godisnjak")

# NOTE(review): the literal credentials below are kept ONLY as fallbacks so
# existing deployments keep working. They are committed secrets: rotate them
# and supply the values through the environment instead.
DSN = os.environ.get(
    "GODISNJAK_DSN",
    "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet password=R1net2026!SecureDB#v7",
)
TELEGRAM_BOT_TOKEN = os.environ.get(
    "TELEGRAM_BOT_TOKEN", "8535797835:AAFItT-92jzZ9NWFafLxn0dLa1_n2s-JE5Y"
)
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "7969491558")

EMBED_URL = "http://localhost:9879/api/embeddings"
VLLM_URL = "http://localhost:8001/v1/chat/completions"
VLLM_MODEL = "Qwen/Qwen2.5-7B-Instruct-AWQ"
QDRANT_COLLECTION = "pgz_universe"
DATA_DIR = "/opt/pgz-sport/_data/godisnjaci"
MAX_WORKERS = 5
CHUNK_SIZE = 1500  # < 2000 because of BGE-M3 truncation
EMBED_BATCH = 32  # chunks sent to the embedding service per request
PROGRESS_EVERY_BATCHES = 10  # phase-1 progress log cadence
KLUB_MATCH_THRESHOLD = 75  # minimum token_set_ratio to accept a club match
DEFAULT_ULOGA = "igrac"  # role used when the LLM gives none / an unknown one

# Closed set of roles accepted for pgz_sport.clanovi.uloga (mirrors the list
# given to the LLM in EXTRACT_PROMPT).
VALID_ULOGE = frozenset({
    "predsjednik", "dopredsjednik", "tajnik", "blagajnik", "clan_uprave",
    "igrac", "sportas", "glavni_trener", "trener", "pomocni_trener",
    "kondicioni_trener", "selektor", "izbornik", "team_manager", "voditelj",
    "lijecnik", "fizioterapeut", "kineziolog", "maser", "sudac", "volonter",
})

EXTRACT_PROMPT = """Ekstrahiraj iz teksta SVA imena osoba i njihove uloge.
Format strogo JSON:
{"osobe": [{"ime":"X","prezime":"Y","klub":"Z","uloga":"predsjednik|igrac|trener|tajnik|fizioterapeut|lijecnik","godina_rodenja":1990}]}

Uloge ISKLJUCIVO: predsjednik, dopredsjednik, tajnik, blagajnik, clan_uprave, igrac, sportas, glavni_trener, trener, pomocni_trener, kondicioni_trener, selektor, izbornik, team_manager, voditelj, lijecnik, fizioterapeut, kineziolog, maser, sudac, volonter

Pravila:
1. Samo HRVATSKE osobe (ne strani sportasi koji su gostovali)
2. Ako klub nije jasan -> ostavi prazan string
3. NE izmisljaj imena -> samo ona JASNO IZRAZENA u tekstu
4. Vrati VALID JSON bez markdown backtick-ova"""

_YEAR_RE = re.compile(r"godisnjak_(\d{4})_layout")

# (id, lower-cased name) rows of pgz_sport.klubovi, loaded on first use.
_KLUBOVI_CACHE = None


def _year_from_path(path: str) -> str:
    """Return the 4-digit year embedded in a layout file name, or "unknown"."""
    m = _YEAR_RE.search(path)
    return m.group(1) if m else "unknown"


def _read_text(path: str) -> str:
    """Read a layout file as UTF-8 (explicit, so it does not depend on locale)."""
    with open(path, encoding="utf-8") as fp:
        return fp.read()


def _as_clean_str(value) -> str:
    """Coerce an LLM-provided field to a stripped string; falsy values -> ""."""
    return str(value or "").strip()


def chunk_text(text: str, size: int = CHUNK_SIZE) -> list:
    """Split *text* into paragraph-aligned chunks of roughly *size* characters.

    Paragraphs (separated by one or more blank lines) are accumulated until
    adding the next one would exceed *size*. A single paragraph longer than
    *size* is kept whole, so a chunk can exceed *size*. Chunks of 100
    characters or fewer are dropped.
    """
    paragraphs = re.split(r"\n\n+", text)
    chunks, cur = [], ""
    for p in paragraphs:
        if len(cur) + len(p) > size:
            if cur:
                chunks.append(cur.strip())
            cur = p
        else:
            # A leading separator on the very first paragraph is removed by
            # the strip() applied when the chunk is emitted.
            cur += "\n\n" + p
    if cur:
        chunks.append(cur.strip())
    return [c for c in chunks if len(c) > 100]


async def embed_batch(texts: list) -> list:
    """POST *texts* to the embedding service and return its "embeddings" list.

    Raises:
        httpx.HTTPError: on transport failure or a non-2xx response.
    """
    async with httpx.AsyncClient(timeout=120.0) as c:
        r = await c.post(EMBED_URL, json={"texts": texts})
        # Without this an error body would be parsed like a success and
        # silently produce zero embeddings.
        r.raise_for_status()
        d = r.json()
    if not isinstance(d, dict):
        return []
    return d.get("embeddings", [])


async def extract_persons(chunk_text_str: str) -> dict:
    """Ask the LLM to extract persons/roles from one text chunk.

    Returns the parsed JSON object from the model, or ``{"osobe": []}`` when
    the service answers with an error status or the reply cannot be parsed
    into a JSON object. Transport errors (connect/timeout) propagate.
    """
    empty = {"osobe": []}
    async with httpx.AsyncClient(timeout=120.0) as c:
        r = await c.post(
            VLLM_URL,
            json={
                "model": VLLM_MODEL,
                "messages": [
                    {"role": "system", "content": EXTRACT_PROMPT},
                    {"role": "user", "content": chunk_text_str[:5500]},
                ],
                "temperature": 0.1,
                "max_tokens": 3000,
                "response_format": {"type": "json_object"},
            },
        )
    if r.is_error:
        log.warning(f"Parse fail: HTTP {r.status_code}")
        return empty
    try:
        content = r.json()["choices"][0]["message"]["content"]
        parsed = json.loads(content)
    except (KeyError, IndexError, TypeError, ValueError) as e:
        log.warning(f"Parse fail: {e}")
        return empty
    if not isinstance(parsed, dict):
        # Valid JSON but not an object (e.g. a bare list): callers use .get().
        log.warning(f"Parse fail: expected JSON object, got {type(parsed).__name__}")
        return empty
    return parsed


def _load_klubovi(conn) -> list:
    """Return cached (id, lower-cased name) club rows, querying on first use."""
    global _KLUBOVI_CACHE
    if _KLUBOVI_CACHE is None:
        with conn.cursor() as cur:
            cur.execute("SELECT id, naziv FROM pgz_sport.klubovi LIMIT 1000")
            # Skip rows without a name; they cannot be matched.
            _KLUBOVI_CACHE = [
                (kid, kname.lower()) for kid, kname in cur.fetchall() if kname
            ]
    return _KLUBOVI_CACHE


def fuzzy_match_klub(naziv, conn):
    """Fuzzy match a club name to pgz_sport.klubovi.id.

    Returns the id of the best-scoring club when its token_set_ratio is above
    KLUB_MATCH_THRESHOLD, otherwise None. Never raises: failures are logged
    and reported as "no match".
    """
    if not naziv:
        return None
    try:
        from rapidfuzz import fuzz

        rows = _load_klubovi(conn)
    except ImportError as e:
        log.warning(f"Fuzzy match fail: {e}")
        return None
    except psycopg2.Error as e:
        log.warning(f"Fuzzy match fail: {e}")
        # The failed statement already aborted the transaction; reset it so
        # the caller's next statement does not fail as well.
        conn.rollback()
        return None

    needle = str(naziv).lower()
    best_id, best_score = None, 0
    for kid, kname in rows:
        score = fuzz.token_set_ratio(needle, kname)
        if score > best_score:
            best_score = score
            best_id = kid
    return best_id if best_score > KLUB_MATCH_THRESHOLD else None


def insert_persons(persons_data, year, conn) -> int:
    """Insert extracted persons into pgz_sport.clanovi.

    Each row is committed on its own, so one failing row cannot roll back
    rows that were already counted as inserted. Entries that are not JSON
    objects, or that lack a first or last name, are skipped.

    Returns the number of rows actually inserted (conflicts are not counted).
    """
    osobe = persons_data.get("osobe", []) if isinstance(persons_data, dict) else []
    if not osobe or not isinstance(osobe, list):
        return 0

    inserted = 0
    with conn.cursor() as cur:
        for o in osobe:
            if not isinstance(o, dict):
                continue
            ime = _as_clean_str(o.get("ime"))
            prezime = _as_clean_str(o.get("prezime"))
            if not ime or not prezime:
                continue

            klub_naziv = _as_clean_str(o.get("klub"))
            klub_id = fuzzy_match_klub(klub_naziv, conn) if klub_naziv else None

            # Normalise case before validating so e.g. "Trener" is not
            # downgraded to the default role.
            uloga = _as_clean_str(o.get("uloga")).lower() or DEFAULT_ULOGA
            if uloga not in VALID_ULOGE:
                uloga = DEFAULT_ULOGA

            profile_key = f"godisnjak:{year}:{ime}:{prezime}:{klub_naziv}"
            try:
                cur.execute(
                    """
                    INSERT INTO pgz_sport.clanovi
                    (ime, prezime, uloga, klub_id, savez_izvor, metadata, kategorija)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT DO NOTHING
                    RETURNING id
                    """,
                    (
                        ime,
                        prezime,
                        uloga,
                        klub_id,
                        "godisnjak",
                        json.dumps({"year": year, "klub_naziv": klub_naziv, "key": profile_key}),
                        "sportas",
                    ),
                )
                row = cur.fetchone()
                conn.commit()
            except psycopg2.Error as e:
                log.warning(f"Insert fail {ime} {prezime}: {e}")
                conn.rollback()
                continue
            if row:
                inserted += 1
    return inserted


async def phase1_embed(files_layout) -> int:
    """Embed all yearbooks into the pgz_universe Qdrant collection.

    Point ids are derived from ``source file name + chunk index`` so re-runs
    overwrite the same points. Batches whose embedding request fails (or
    returns the wrong number of vectors) are skipped with a warning; Qdrant
    errors propagate.

    Returns the number of points upserted.
    """
    log.info(f"Phase 1: Embed {len(files_layout)} godisnjaka")
    qdrant = QdrantClient(host="localhost", port=6333)

    all_chunks = []
    all_meta = []
    for f in files_layout:
        year = _year_from_path(f)
        source = f.split("/")[-1]
        for i, c in enumerate(chunk_text(_read_text(f))):
            all_chunks.append(c)
            all_meta.append({"year": year, "chunk_idx": i, "source": source})

    log.info(f"Total chunks: {len(all_chunks)}")

    n_upserted = 0
    for batch_no, start in enumerate(range(0, len(all_chunks), EMBED_BATCH)):
        batch = all_chunks[start : start + EMBED_BATCH]
        if batch_no % PROGRESS_EVERY_BATCHES == 0:
            log.info(f" Embed progress: {start}/{len(all_chunks)}")
        try:
            embeddings = await embed_batch(batch)
            if len(embeddings) != len(batch):
                # A short reply would pair texts with the wrong vectors.
                raise ValueError(
                    f"expected {len(batch)} embeddings, got {len(embeddings)}"
                )
        except Exception as e:
            log.warning(f"Embed batch {start} fail: {e}")
            await asyncio.sleep(2)
            continue

        points = []
        for j, (text, emb) in enumerate(zip(batch, embeddings)):
            meta = all_meta[start + j]
            pid_key = f"godisnjak:{meta['source']}:{meta['chunk_idx']}"
            # 15 hex digits = 60 bits, which fits an unsigned 64-bit point id.
            point_id = int(hashlib.md5(pid_key.encode()).hexdigest()[:15], 16)
            points.append(
                PointStruct(
                    id=point_id,
                    vector=emb,
                    payload={**meta, "text": text[:1500], "type": "godisnjak_pgz"},
                )
            )
        # Upsert per batch: bounded memory/request size, and no empty upsert
        # when nothing could be embedded.
        qdrant.upsert(collection_name=QDRANT_COLLECTION, points=points)
        n_upserted += len(points)

    log.info(f"Phase 1 DONE: {n_upserted} chunks → {QDRANT_COLLECTION}")
    return n_upserted


async def phase2_extract(files_layout) -> int:
    """Run LLM extraction of persons/roles over all yearbooks.

    Files are processed concurrently; at most MAX_WORKERS chunks are in
    flight at once. All tasks share one DB connection, which is safe here
    because insert_persons() is synchronous and contains no await point.

    Returns the total number of rows inserted.
    """
    log.info(f"Phase 2: LLM extract persons from {len(files_layout)} godisnjaka")
    conn = psycopg2.connect(DSN)
    conn.autocommit = False
    semaphore = asyncio.Semaphore(MAX_WORKERS)

    async def process_file(f):
        """Extract + insert persons for one yearbook; return rows inserted."""
        year = _year_from_path(f)
        chunks = chunk_text(_read_text(f))
        log.info(f" Year {year}: {len(chunks)} chunks")

        year_inserted = 0
        for i, chunk in enumerate(chunks):
            async with semaphore:
                try:
                    persons = await extract_persons(chunk)
                    n = insert_persons(persons, year, conn)
                    year_inserted += n
                    if i % 10 == 0:
                        log.info(f" {year} chunk {i}/{len(chunks)}: {n} osoba")
                except Exception as e:
                    log.warning(f"Extract/insert fail {year} chunk {i}: {e}")
            # Pace requests without holding a worker slot while idle.
            await asyncio.sleep(0.5)

        log.info(f" Year {year} DONE: {year_inserted} osoba inserted")
        return year_inserted

    try:
        per_file = await asyncio.gather(*(process_file(f) for f in files_layout))
    finally:
        conn.close()

    total_inserted = sum(per_file)
    log.info(f"Phase 2 DONE: {total_inserted} total osoba inserted")
    return total_inserted


def _count_godisnjak_clanovi() -> int:
    """Return how many pgz_sport.clanovi rows have savez_izvor='godisnjak'."""
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM pgz_sport.clanovi WHERE savez_izvor='godisnjak'")
            return cur.fetchone()[0]
    finally:
        conn.close()


def _notify_telegram(text: str) -> None:
    """Send *text* to the configured Telegram chat (best effort).

    Failures are logged without the request URL, because the URL contains
    the bot token.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    body = urllib.parse.urlencode({"chat_id": TELEGRAM_CHAT_ID, "text": text}).encode()
    try:
        with urllib.request.urlopen(urllib.request.Request(url, data=body), timeout=10) as resp:
            resp.read()
    except (urllib.error.URLError, OSError) as e:
        log.warning(f"Telegram notify fail: {e}")


async def main():
    """Run both pipeline phases, log final stats and send a notification.

    Exits with status 1 when no layout files are found in DATA_DIR.
    """
    files_layout = sorted(glob.glob(f"{DATA_DIR}/godisnjak_*_layout.txt"))
    log.info(f"Found {len(files_layout)} layout files: {[f.split('/')[-1] for f in files_layout]}")

    if not files_layout:
        log.error("Nema layout fajlova!")
        sys.exit(1)

    # Phase 1: Embed
    n_embedded = await phase1_embed(files_layout)

    # Phase 2: LLM extract
    n_persons = await phase2_extract(files_layout)

    # Final stats
    total_godisnjak = _count_godisnjak_clanovi()

    log.info(f"""
=== GODISNJAK PIPELINE COMPLETE ===
Chunks embedded: {n_embedded}
Persons extracted: {n_persons}
Total godisnjak clanovi u DB: {total_godisnjak}
""")

    # Telegram (last step; a failed notification must not fail the run)
    _notify_telegram(
        f"✅ Godisnjak pipeline DONE: {n_embedded} chunks, {n_persons} osoba, {total_godisnjak} total u DB"
    )


if __name__ == "__main__":
    asyncio.run(main())