#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# File:       godisnjak_pipeline.py
# Version:    1.0.0
# Date:       03.05.2026
# Author:     Damir Radulić <dradulic@outlook.com>
# Location:   /opt/pgz-sport/scripts/godisnjak_pipeline.py
# Purpose:    Embed the PGZ yearbooks (godisnjaci) into pgz_universe + LLM extraction of persons/roles
# Depends on: qdrant_client, httpx, psycopg2, rapidfuzz
# Affects:    pgz_universe (Qdrant), pgz_sport.clanovi (insert)
# ═══════════════════════════════════════════════════════════════════
|
|
"""Godisnjak PGZ embed + LLM person extraction pipeline."""
|
|
import asyncio
import glob
import hashlib
import json
import logging
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor

import httpx
import psycopg2
from psycopg2.extras import execute_batch
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
|
|
|
|
# Logging: always log to stderr; additionally log to the pipeline log file
# when it can be opened. A missing/unwritable log directory must not prevent
# the module from being imported, so the file handler is best-effort.
_LOG_FILE = "/opt/pgz-sport/logs/godisnjak_pipeline.log"
_log_handlers = [logging.StreamHandler()]
_log_file_error = None
try:
    # Keep the file handler first, as in the original handler order.
    _log_handlers.insert(0, logging.FileHandler(_LOG_FILE))
except OSError as _exc:
    # `_exc` is unbound after the except block, so keep a copy for the warning.
    _log_file_error = _exc

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [godisnjak] %(levelname)s: %(message)s",
    handlers=_log_handlers,
)
log = logging.getLogger("godisnjak")

if _log_file_error is not None:
    log.warning("File logging disabled (%s): %s", _LOG_FILE, _log_file_error)
|
|
|
|
# PostgreSQL connection string.
# SECURITY: the fallback literal embeds a live password that is committed in
# source. Set GODISNJAK_DSN in the environment, rotate the password, and then
# remove the fallback. The fallback is kept only so existing deployments keep
# working unchanged.
DSN = os.environ.get(
    "GODISNJAK_DSN",
    "host=10.10.0.2 port=6432 dbname=rinet_v3 user=rinet password=R1net2026!SecureDB#v7",
)
# Embedding service endpoint (receives {"texts": [...]}).
EMBED_URL = "http://localhost:9879/api/embeddings"
# OpenAI-compatible chat completions endpoint and the model served there.
VLLM_URL = "http://localhost:8001/v1/chat/completions"
VLLM_MODEL = "Qwen/Qwen2.5-7B-Instruct-AWQ"
# Qdrant collection that receives the yearbook chunks.
QDRANT_COLLECTION = "pgz_universe"
# Directory scanned for godisnjak_*_layout.txt files.
DATA_DIR = "/opt/pgz-sport/_data/godisnjaci"
# Upper bound on concurrent LLM extraction calls.
MAX_WORKERS = 5
# Target chunk length in characters.
CHUNK_SIZE = 1500  # < 2000 because of BGE-M3 truncation
|
|
|
|
# System prompt for the person/role extraction call. The text is sent to the
# model verbatim, so it must not contain stray separator lines.
EXTRACT_PROMPT = """Ekstrahiraj iz teksta SVA imena osoba i njihove uloge.
Format strogo JSON:
{"osobe": [{"ime":"X","prezime":"Y","klub":"Z","uloga":"predsjednik|igrac|trener|tajnik|fizioterapeut|lijecnik","godina_rodenja":1990}]}

Uloge ISKLJUCIVO: predsjednik, dopredsjednik, tajnik, blagajnik, clan_uprave, igrac, sportas, glavni_trener, trener, pomocni_trener, kondicioni_trener, selektor, izbornik, team_manager, voditelj, lijecnik, fizioterapeut, kineziolog, maser, sudac, volonter

Pravila:
1. Samo HRVATSKE osobe (ne strani sportasi koji su gostovali)
2. Ako klub nije jasan -> ostavi prazan string
3. NE izmisljaj imena -> samo ona JASNO IZRAZENA u tekstu
4. Vrati VALID JSON bez markdown backtick-ova"""
|
|
|
|
|
|
def _split_oversized(paragraph, size):
    """Split *paragraph* into pieces of at most *size* characters.

    Prefers to cut at the last newline, then at the last space, inside the
    window; falls back to a hard cut when the window has no whitespace.
    A paragraph that already fits is returned unchanged as a single piece.
    """
    if len(paragraph) <= size:
        return [paragraph] if paragraph else []

    pieces = []
    rest = paragraph
    while len(rest) > size:
        cut = rest.rfind("\n", 0, size + 1)
        if cut <= 0:
            cut = rest.rfind(" ", 0, size + 1)
        if cut <= 0:
            cut = size  # no usable whitespace: hard cut
        piece = rest[:cut]
        if piece.strip():
            pieces.append(piece)
        # cut > 0, so the remainder always shrinks and the loop terminates.
        rest = rest[cut:].lstrip()
    if rest.strip():
        pieces.append(rest)
    return pieces


def chunk_text(text, size=None):
    """Split *text* into chunks of at most *size* characters.

    Paragraphs (separated by one or more blank lines) are packed greedily
    into chunks and re-joined with a blank line. A paragraph longer than
    *size* is split further, so no returned chunk exceeds *size*; the blank
    line used to join paragraphs is counted against the limit.

    Args:
        text: The full document text.
        size: Maximum chunk length in characters. ``None`` (the default)
            means the module-level ``CHUNK_SIZE``.

    Returns:
        The stripped chunks, in document order. Chunks of 100 characters or
        fewer are dropped.

    Raises:
        ValueError: If *size* is not positive.
    """
    if size is None:
        size = CHUNK_SIZE
    if size <= 0:
        raise ValueError(f"size must be positive, got {size!r}")

    chunks = []
    cur = ""
    for paragraph in re.split(r"\n\n+", text):
        for piece in _split_oversized(paragraph, size):
            if not cur:
                cur = piece
            elif len(cur) + 2 + len(piece) > size:  # +2 for the "\n\n" joiner
                chunks.append(cur.strip())
                cur = piece
            else:
                cur += "\n\n" + piece
    if cur:
        chunks.append(cur.strip())

    # Very short fragments are dropped, as before.
    return [c for c in chunks if len(c) > 100]
|
|
|
|
|
|
async def embed_batch(texts):
    """Request embeddings for *texts* from the embedding service.

    Args:
        texts: List of strings to embed.

    Returns:
        The ``"embeddings"`` list from the service response, one entry per
        input text and in the same order. An empty input returns ``[]``
        without making a request.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
        httpx.HTTPError: On transport-level failures.
        ValueError: If the response does not contain exactly one embedding
            per input text (otherwise texts and vectors would be misaligned).
    """
    if not texts:
        return []

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(EMBED_URL, json={"texts": texts})
        response.raise_for_status()
        payload = response.json()

    embeddings = payload.get("embeddings", []) if isinstance(payload, dict) else []
    if len(embeddings) != len(texts):
        raise ValueError(
            f"embedding service returned {len(embeddings)} vectors for {len(texts)} texts"
        )
    return embeddings
|
|
|
|
|
|
async def extract_persons(chunk_text_str):
    """Ask the LLM to extract persons and roles from one text chunk.

    Args:
        chunk_text_str: The chunk text; only the first 5500 characters are
            sent to the model.

    Returns:
        A dict with an ``"osobe"`` list. ``{"osobe": []}`` is returned for an
        empty/whitespace-only chunk, for an HTTP error status, and for a
        reply that is not valid JSON of the expected shape (best-effort: a
        bad chunk must not stop the pipeline).

    Raises:
        httpx.HTTPError: On transport-level failures (connection, timeout).
    """
    if not chunk_text_str or not chunk_text_str.strip():
        return {"osobe": []}

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(
            VLLM_URL,
            json={
                "model": VLLM_MODEL,
                "messages": [
                    {"role": "system", "content": EXTRACT_PROMPT},
                    {"role": "user", "content": chunk_text_str[:5500]},
                ],
                "temperature": 0.1,
                "max_tokens": 3000,
                "response_format": {"type": "json_object"},
            },
        )

    try:
        response.raise_for_status()
        content = response.json()["choices"][0]["message"]["content"]
        parsed = json.loads(content)
    except (httpx.HTTPStatusError, KeyError, IndexError, TypeError, ValueError) as e:
        # ValueError also covers json.JSONDecodeError.
        log.warning("Parse fail: %s", e)
        return {"osobe": []}

    # The model can return valid JSON of the wrong shape (a list, or "osobe"
    # as a string); downstream code calls .get("osobe") and iterates it.
    if not isinstance(parsed, dict) or not isinstance(parsed.get("osobe"), list):
        log.warning("Parse fail: unexpected JSON shape (%s)", type(parsed).__name__)
        return {"osobe": []}
    return parsed
|
|
|
|
|
|
def fuzzy_match_klub(naziv, conn):
    """Fuzzy-match a club name to ``pgz_sport.klubovi.id``.

    Args:
        naziv: Club name as extracted from the text.
        conn: Open psycopg2 connection.

    Returns:
        The id of the best-scoring club when its ``token_set_ratio`` score is
        above 75, otherwise ``None``. ``None`` is also returned for an empty
        name and when the lookup fails for any reason (the failure is logged).
    """
    wanted = (naziv or "").strip().lower()
    if not wanted:
        return None

    try:
        from rapidfuzz import fuzz

        with conn.cursor() as cur:
            # ORDER BY keeps both the LIMIT subset and tie-breaking stable.
            cur.execute("SELECT id, naziv FROM pgz_sport.klubovi ORDER BY id LIMIT 1000")
            rows = cur.fetchall()

        best_id, best_score = None, 0
        for kid, kname in rows:
            if not kname:
                # A NULL/empty club name must not abort matching for all clubs.
                continue
            score = fuzz.token_set_ratio(wanted, kname.lower())
            if score > best_score:
                best_score = score
                best_id = kid
        return best_id if best_score > 75 else None
    except Exception as e:
        # Deliberately broad: club matching is best-effort, and a person is
        # still inserted without a klub_id when it fails.
        log.warning("Fuzzy match fail: %s", e)
        return None
|
|
|
|
|
|
# Roles accepted for pgz_sport.clanovi.uloga; anything else becomes "igrac".
_VALID_ULOGE = frozenset({
    "predsjednik", "dopredsjednik", "tajnik", "blagajnik", "clan_uprave",
    "igrac", "sportas", "glavni_trener", "trener", "pomocni_trener",
    "kondicioni_trener", "selektor", "izbornik", "team_manager", "voditelj",
    "lijecnik", "fizioterapeut", "kineziolog", "maser", "sudac", "volonter",
})

_INSERT_CLAN_SQL = """
    INSERT INTO pgz_sport.clanovi
        (ime, prezime, uloga, klub_id, savez_izvor, metadata, kategorija)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT DO NOTHING
    RETURNING id
"""


def _clean_field(value):
    """Return *value* stripped when it is a string, otherwise ``""``."""
    return value.strip() if isinstance(value, str) else ""


def _normalize_person(entry):
    """Validate one extracted entry.

    Returns ``(ime, prezime, klub_naziv, uloga)``, or ``None`` when the entry
    is not a dict or lacks a first or last name.
    """
    if not isinstance(entry, dict):
        return None
    ime = _clean_field(entry.get("ime"))
    prezime = _clean_field(entry.get("prezime"))
    if not ime or not prezime:
        return None
    klub_naziv = _clean_field(entry.get("klub"))
    uloga = _clean_field(entry.get("uloga"))
    if uloga not in _VALID_ULOGE:
        uloga = "igrac"
    return ime, prezime, klub_naziv, uloga


def insert_persons(persons_data, year, conn):
    """Insert extracted persons into ``pgz_sport.clanovi``.

    Each row is committed on its own, so a failing row is rolled back without
    discarding rows inserted earlier in the same call, and the returned count
    matches what was actually stored.

    Args:
        persons_data: Extraction result; expected to be a dict with an
            ``"osobe"`` list of dicts. Anything else is treated as empty.
        year: Yearbook year, stored in the row metadata.
        conn: Open psycopg2 connection. It is not touched when there is
            nothing valid to insert.

    Returns:
        Number of rows actually inserted (conflicting rows are not counted).
    """
    osobe = persons_data.get("osobe") if isinstance(persons_data, dict) else None
    if not isinstance(osobe, list):
        return 0

    people = [p for p in map(_normalize_person, osobe) if p is not None]
    if not people:
        return 0

    inserted = 0
    # Resolve each distinct club name once per call instead of once per
    # person. A failed lookup is cached as None for the rest of this call.
    klub_ids = {}
    with conn.cursor() as cur:
        for ime, prezime, klub_naziv, uloga in people:
            if klub_naziv and klub_naziv not in klub_ids:
                klub_ids[klub_naziv] = fuzzy_match_klub(klub_naziv, conn)
            klub_id = klub_ids.get(klub_naziv)

            profile_key = f"godisnjak:{year}:{ime}:{prezime}:{klub_naziv}"
            params = (
                ime, prezime, uloga, klub_id,
                "godisnjak",
                json.dumps({"year": year, "klub_naziv": klub_naziv, "key": profile_key}),
                "sportas",
            )
            try:
                cur.execute(_INSERT_CLAN_SQL, params)
                row = cur.fetchone()  # None when ON CONFLICT skipped the row
                conn.commit()
            except psycopg2.Error as e:
                log.warning("Insert fail %s %s: %s", ime, prezime, e)
                conn.rollback()  # only the failed row is open at this point
                continue
            if row:
                inserted += 1
    return inserted
|
|
|
|
|
|
async def phase1_embed(files_layout):
    """Embed all yearbook files into the Qdrant collection.

    Every file is read as UTF-8, split with ``chunk_text`` and embedded in
    batches of 32. Each batch is upserted as soon as it is embedded, so the
    request size stays bounded. A batch whose embedding call fails is logged
    and skipped; Qdrant errors propagate to the caller.

    Args:
        files_layout: Paths of ``godisnjak_<year>_layout.txt`` files.

    Returns:
        Number of chunks stored in Qdrant.
    """
    log.info("Phase 1: Embed %d godisnjaka", len(files_layout))
    qdrant = QdrantClient(host="localhost", port=6333)

    all_chunks = []
    all_meta = []
    for path in files_layout:
        m = re.search(r"godisnjak_(\d{4})_layout", path)
        year = m.group(1) if m else "unknown"
        # Explicit encoding: the locale default is not guaranteed to be UTF-8.
        with open(path, encoding="utf-8") as fp:
            text = fp.read()
        source = path.split("/")[-1]
        for idx, chunk in enumerate(chunk_text(text)):
            all_chunks.append(chunk)
            all_meta.append({"year": year, "chunk_idx": idx, "source": source})

    log.info("Total chunks: %d", len(all_chunks))

    BATCH = 32
    LOG_EVERY_BATCHES = 10
    stored = 0
    for batch_no, start in enumerate(range(0, len(all_chunks), BATCH)):
        batch = all_chunks[start : start + BATCH]
        embeddings = None
        try:
            embeddings = await embed_batch(batch)
            if len(embeddings) != len(batch):
                # zip() would silently drop the unmatched chunks.
                raise ValueError(f"got {len(embeddings)} embeddings for {len(batch)} chunks")
        except Exception as e:
            log.warning("Embed batch %d fail: %s", start, e)
            embeddings = None
            await asyncio.sleep(2)

        if embeddings:
            points = []
            for offset, (text, emb) in enumerate(zip(batch, embeddings)):
                meta = all_meta[start + offset]
                pid_key = f"godisnjak:{meta['source']}:{meta['chunk_idx']}"
                # Stable 60-bit id derived from source + chunk index, so a
                # re-run overwrites the same points.
                point_id = int(hashlib.md5(pid_key.encode()).hexdigest()[:15], 16)
                points.append(
                    PointStruct(
                        id=point_id,
                        vector=emb,
                        payload={**meta, "text": text[:1500], "type": "godisnjak_pgz"},
                    )
                )
            qdrant.upsert(collection_name=QDRANT_COLLECTION, points=points)
            stored += len(points)

        if batch_no % LOG_EVERY_BATCHES == 0:
            log.info("  Embed progress: %d/%d", start, len(all_chunks))

    log.info("Phase 1 DONE: %d chunks → %s", stored, QDRANT_COLLECTION)
    return stored
|
|
|
|
|
|
async def phase2_extract(files_layout):
    """Run LLM extraction of persons/roles over all yearbook files.

    Files are processed concurrently; the number of simultaneous LLM calls is
    bounded by ``MAX_WORKERS``. A chunk that fails is logged and skipped, and
    a file that cannot be processed is logged without aborting the other
    files. The database connection is always closed.

    Args:
        files_layout: Paths of ``godisnjak_<year>_layout.txt`` files.

    Returns:
        Total number of persons inserted.
    """
    log.info("Phase 2: LLM extract persons from %d godisnjaka", len(files_layout))

    conn = psycopg2.connect(DSN)
    conn.autocommit = False
    semaphore = asyncio.Semaphore(MAX_WORKERS)

    async def process_file(path):
        """Extract and insert persons for one file; return the insert count."""
        m = re.search(r"godisnjak_(\d{4})_layout", path)
        year = m.group(1) if m else "unknown"

        # Explicit encoding: the locale default is not guaranteed to be UTF-8.
        with open(path, encoding="utf-8") as fp:
            text = fp.read()

        chunks = chunk_text(text)
        log.info("  Year %s: %d chunks", year, len(chunks))

        year_inserted = 0
        for i, chunk in enumerate(chunks):
            async with semaphore:
                try:
                    persons = await extract_persons(chunk)
                    # Synchronous DB work: no other task runs while it
                    # executes, so the shared connection is not interleaved.
                    n = insert_persons(persons, year, conn)
                    year_inserted += n
                    if i % 10 == 0:
                        log.info("    %s chunk %d/%d: %d osoba", year, i, len(chunks), n)
                except Exception as e:
                    log.warning("Extract/insert fail %s chunk %d: %s", year, i, e)
            # Throttle between chunks without holding a semaphore slot.
            await asyncio.sleep(0.5)

        log.info("  Year %s DONE: %d osoba inserted", year, year_inserted)
        return year_inserted

    try:
        results = await asyncio.gather(
            *(process_file(f) for f in files_layout),
            return_exceptions=True,
        )
    finally:
        conn.close()

    total_inserted = 0
    for path, result in zip(files_layout, results):
        if isinstance(result, BaseException):
            log.error("File %s failed: %r", path, result)
        else:
            total_inserted += result

    log.info("Phase 2 DONE: %d total osoba inserted", total_inserted)
    return total_inserted
|
|
|
|
|
|
async def main():
    """Run the full pipeline: embed, extract persons, report.

    Exits with status 1 when no layout files are found. After both phases it
    logs a summary and sends a best-effort Telegram notification; a failed
    notification is logged and does not fail the run.
    """
    files_layout = sorted(glob.glob(f"{DATA_DIR}/godisnjak_*_layout.txt"))
    log.info(
        "Found %d layout files: %s",
        len(files_layout),
        [f.split("/")[-1] for f in files_layout],
    )

    if not files_layout:
        log.error("Nema layout fajlova!")
        sys.exit(1)

    # Phase 1: Embed
    n_embedded = await phase1_embed(files_layout)

    # Phase 2: LLM extract
    n_persons = await phase2_extract(files_layout)

    # Final stats
    conn = psycopg2.connect(DSN)
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT count(*) FROM pgz_sport.clanovi WHERE savez_izvor='godisnjak'")
            total_godisnjak = cur.fetchone()[0]
    finally:
        conn.close()

    log.info(
        "\n=== GODISNJAK PIPELINE COMPLETE ===\n"
        "Chunks embedded: %s\n"
        "Persons extracted: %s\n"
        "Total godisnjak clanovi u DB: %s\n",
        n_embedded,
        n_persons,
        total_godisnjak,
    )

    # Telegram notification (best-effort).
    # SECURITY: the fallback token below is committed in source. Set
    # TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID in the environment, rotate the
    # token, then remove the fallbacks.
    import os

    token = os.environ.get(
        "TELEGRAM_BOT_TOKEN",
        "8535797835:AAFItT-92jzZ9NWFafLxn0dLa1_n2s-JE5Y",
    )
    chat_id = os.environ.get("TELEGRAM_CHAT_ID", "7969491558")
    try:
        import requests as req_lib

        req_lib.post(
            f"https://api.telegram.org/bot{token}/sendMessage",
            data={
                "chat_id": chat_id,
                "text": f"✅ Godisnjak pipeline DONE: {n_embedded} chunks, {n_persons} osoba, {total_godisnjak} total u DB",
            },
            timeout=10,
        )
    except Exception as e:
        # Log only the exception type: request errors embed the URL, and the
        # URL contains the bot token.
        log.warning("Telegram notification failed: %s", type(e).__name__)
|
|
|
|
|
|
# Script entry point: run the async pipeline when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
|