#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# File:       sport_doc_embedder.py
# Version:    1.0.0
# Date:       03.05.2026
# Author:     Damir Radulić
# Location:   /opt/pgz-sport/sport_doc_embedder.py
# Purpose:    Embed rows of sport.documents and index them in the Qdrant
#             collection pgz_sport_dokumenti_v1
# Depends on: psycopg2, requests, db_config (from /opt/rinet-gpu);
#             Qdrant is reached through its REST API via requests
# Affects:    sport.documents.embedding, Qdrant pgz_sport_dokumenti_v1
# Config:     SPORT_EMBED_TG_TOKEN / SPORT_EMBED_TG_CHAT (optional,
#             Telegram completion notice)
# ═══════════════════════════════════════════════════════════════════
import hashlib
import json
import logging
import os
import sys
import time

import psycopg2
import requests
from psycopg2.extras import execute_batch

sys.path.insert(0, '/opt/rinet-gpu')
from db_config import DB_DSN  # noqa: E402  (needs the sys.path entry above)

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [sport-embed] %(levelname)s: %(message)s')
log = logging.getLogger('sport-embed')

EMBED_URL = 'http://localhost:9879/api/embeddings'
QDRANT_URL = 'http://10.10.0.2:6333'
COLLECTION = 'pgz_sport_dokumenti_v1'
BATCH_SIZE = 32
DIM = 1024
FETCH_LIMIT = 5000      # max documents processed per run
PROGRESS_EVERY = 200    # log progress each time this many more docs are done

# Telegram credentials are read from the environment so no secret lives in
# source control. NOTE(review): a bot token was previously hard-coded in this
# file; treat it as leaked and revoke/rotate it via @BotFather.
TG_TOKEN = os.environ.get('SPORT_EMBED_TG_TOKEN', '')
TG_CHAT = os.environ.get('SPORT_EMBED_TG_CHAT', '')

# Documents still lacking an embedding and with enough text to be useful.
PENDING_SQL = """
    SELECT id, title, doc_type, source, text_extracted, publish_date
    FROM sport.documents
    WHERE embedding IS NULL
      AND text_extracted IS NOT NULL
      AND length(text_extracted) > 50
    ORDER BY id
    LIMIT %s
"""


def embed_texts(texts):
    """POST *texts* to the embedding service and return its decoded JSON body.

    Raises requests.RequestException on transport/HTTP errors and ValueError
    if the body is not valid JSON.
    """
    r = requests.post(EMBED_URL, json={'texts': texts}, timeout=60)
    r.raise_for_status()
    return r.json()


def _parse_embeddings(payload):
    """Normalise an embedding-service response into a list of vectors.

    Accepts a bare list, a dict with an 'embeddings' key, or a dict with a
    single 'embedding'; anything else yields an empty list.
    """
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict):
        if 'embeddings' in payload:
            return payload['embeddings']
        if 'embedding' in payload:
            return [payload['embedding']]
    return []


def ensure_collection():
    """Create the Qdrant collection (DIM-sized, cosine distance) if missing.

    Raises requests.HTTPError if Qdrant answers with an error other than 404.
    """
    r = requests.get(f'{QDRANT_URL}/collections/{COLLECTION}', timeout=10)
    if r.status_code == 404:
        requests.put(
            f'{QDRANT_URL}/collections/{COLLECTION}',
            json={'vectors': {'size': DIM, 'distance': 'Cosine'}},
            timeout=30,
        ).raise_for_status()
        log.info(f'Created collection {COLLECTION}')
    else:
        # Surface a Qdrant outage/auth problem now rather than once per batch.
        r.raise_for_status()


def upsert_qdrant(points):
    """Upsert *points* into the collection and wait until Qdrant applied them.

    `wait=true` makes a successful response mean the points are stored, which
    is what allows the caller to mark the rows done in Postgres afterwards.
    """
    r = requests.put(
        f'{QDRANT_URL}/collections/{COLLECTION}/points',
        params={'wait': 'true'},
        json={'points': points},
        timeout=30,
    )
    r.raise_for_status()


def _build_snippet(row):
    """Return the text sent for embedding: title, type, source, first 800 chars."""
    _id, title, doc_type, source, text, _pub_date = row
    return f'{title or ""} {doc_type or ""} {source or ""} {str(text or "")[:800]}'.strip()


def _build_point(row, emb):
    """Build the Qdrant point (id = sport.documents.id) for one row."""
    id_, title, doc_type, source, text, pub_date = row
    return {
        'id': id_,
        'vector': emb,
        'payload': {
            'title': title or '',
            'doc_type': doc_type or '',
            'source': source or '',
            'pub_date': str(pub_date) if pub_date else '',
            'text_snippet': str(text or '')[:300],
        },
    }


def _embed_pending(conn):
    """Embed all pending rows batch by batch; return how many were stored."""
    with conn.cursor() as cur:
        cur.execute(PENDING_SQL, (FETCH_LIMIT,))
        rows = cur.fetchall()
        log.info(f'Pending: {len(rows)} documents')

        total_done = 0
        for i in range(0, len(rows), BATCH_SIZE):
            batch = rows[i:i + BATCH_SIZE]
            try:
                embeddings = _parse_embeddings(
                    embed_texts([_build_snippet(row) for row in batch]))
            except (requests.RequestException, ValueError) as e:
                log.error(f'Embed error batch {i}: {e}')
                time.sleep(5)
                continue

            # zip() stops at the shorter side: rows without a returned vector
            # keep embedding NULL and are retried on the next run.
            pairs = list(zip(batch, embeddings))
            if not pairs:
                continue

            # Index in Qdrant first; only mark rows done in Postgres once that
            # succeeded, otherwise they would be excluded (embedding IS NOT
            # NULL) from every later run and never reach Qdrant.
            try:
                upsert_qdrant([_build_point(row, emb) for row, emb in pairs])
            except requests.RequestException as e:
                log.warning(f'Qdrant upsert error: {e}')
                continue

            execute_batch(cur, 'UPDATE sport.documents SET embedding=%s WHERE id=%s',
                          [(emb, row[0]) for row, emb in pairs])
            conn.commit()

            prev_done = total_done
            total_done += len(pairs)
            if i == 0 or total_done // PROGRESS_EVERY > prev_done // PROGRESS_EVERY:
                log.info(f'Progress: {total_done}/{len(rows)} done')
            time.sleep(0.1)
    return total_done


def _notify_telegram(total_done):
    """Best-effort Telegram completion notice; never raises."""
    if not (TG_TOKEN and TG_CHAT):
        log.info('Telegram credentials not configured; skipping notification')
        return
    try:
        requests.post(
            f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
            data={'chat_id': TG_CHAT,
                  'text': f'✅ sport_doc_embedder: {total_done} docs embeddano u Qdrant'},
            timeout=10,
        ).raise_for_status()
    except requests.RequestException as e:
        # Log only the exception type: requests' messages embed the request
        # URL, which contains the bot token.
        log.warning(f'Telegram notification failed: {type(e).__name__}')


def run():
    """Embed up to FETCH_LIMIT pending sport documents and index them in Qdrant.

    Ensures the Qdrant collection exists, processes pending rows in batches
    of BATCH_SIZE, then sends an optional Telegram summary. The database
    connection is always closed, even on error.
    """
    ensure_collection()
    conn = psycopg2.connect(DB_DSN)
    conn.autocommit = False
    try:
        total_done = _embed_pending(conn)
    finally:
        conn.close()  # an uncommitted transaction is rolled back on close
    log.info(f'DONE: embedded {total_done} sport documents')
    _notify_telegram(total_done)


if __name__ == '__main__':
    run()