Files
pgz-sport/sport_doc_embedder.py

139 lines
4.7 KiB
Python

#!/usr/bin/env python3
# ═══════════════════════════════════════════════════════════════════
# Fajl: sport_doc_embedder.py
# Verzija: 1.0.0
# Datum: 03.05.2026
# Autor: Damir Radulić <dradulic@outlook.com>
# Lokacija: /opt/pgz-sport/sport_doc_embedder.py
# Svrha: Embeddanje sport.documents u Qdrant pgz_sport_dokumenti_v1
# Zavisi od: psycopg2, requests, db_config (/opt/rinet-gpu); Qdrant se poziva izravno preko REST API-ja (qdrant-client se ne koristi)
# Utječe na: sport.documents.embedding, Qdrant pgz_sport_dokumenti_v1
# ═══════════════════════════════════════════════════════════════════
import os, sys, time, logging, json, hashlib
import requests, psycopg2
from psycopg2.extras import execute_batch
sys.path.insert(0, '/opt/rinet-gpu')
from db_config import DB_DSN
logging.basicConfig(level=logging.INFO, format='%(asctime)s [sport-embed] %(levelname)s: %(message)s')
log = logging.getLogger('sport-embed')

# Service endpoints and batching parameters. Each one can be overridden via an
# environment variable so the script can be pointed at another host without
# editing the file; the defaults are the original production values.
EMBED_URL = os.environ.get('SPORT_EMBED_URL', 'http://localhost:9879/api/embeddings')
QDRANT_URL = os.environ.get('SPORT_QDRANT_URL', 'http://10.10.0.2:6333')
COLLECTION = os.environ.get('SPORT_QDRANT_COLLECTION', 'pgz_sport_dokumenti_v1')
# Number of documents sent to the embedding service in one request.
BATCH_SIZE = int(os.environ.get('SPORT_EMBED_BATCH_SIZE', '32'))
# Vector size used when the Qdrant collection is created; it has to match the
# length of the vectors returned by the embedding service.
DIM = int(os.environ.get('SPORT_EMBED_DIM', '1024'))
def embed_texts(texts):
    """Send ``texts`` to the embedding service and return its parsed JSON reply.

    The request body is ``{"texts": [...]}``. The reply is returned as decoded
    JSON without interpretation; callers normalise its shape.

    Raises:
        requests.HTTPError: if the service answers with a non-2xx status.
        requests.RequestException: on connection problems or when the
            60-second request timeout is exceeded.
    """
    request_body = {'texts': texts}
    reply = requests.post(EMBED_URL, json=request_body, timeout=60)
    reply.raise_for_status()
    return reply.json()
def ensure_collection():
    """Create the Qdrant collection ``COLLECTION`` if it does not exist yet.

    A 404 from the existence check triggers creation with ``DIM``-sized vectors
    and cosine distance. Both HTTP calls use a 30-second timeout, the same as
    the points upsert, so an unresponsive Qdrant cannot hang the script.

    Raises:
        requests.RequestException: if Qdrant is unreachable or times out, if
            the creation request fails, or if the existence check returns an
            error status other than 404.
    """
    collection_url = f'{QDRANT_URL}/collections/{COLLECTION}'
    r = requests.get(collection_url, timeout=30)
    if r.status_code == 404:
        requests.put(
            collection_url,
            json={'vectors': {'size': DIM, 'distance': 'Cosine'}},
            timeout=30,
        ).raise_for_status()
        log.info(f'Created collection {COLLECTION}')
        return
    # Any other non-2xx answer (e.g. 401, 5xx) means the collection could not
    # be confirmed; fail here instead of letting every later upsert fail.
    r.raise_for_status()
def upsert_qdrant(points):
    """Upsert ``points`` into the Qdrant collection via its REST points endpoint.

    ``points`` is a list of ``{'id', 'vector', 'payload'}`` dicts sent as-is in
    the request body. Uses a 30-second request timeout.

    Raises:
        requests.RequestException: on connection errors, timeouts, or a
            non-2xx reply (``requests.HTTPError``).
    """
    endpoint = f'{QDRANT_URL}/collections/{COLLECTION}/points'
    body = {'points': points}
    requests.put(endpoint, json=body, timeout=30).raise_for_status()
def _fetch_pending(cur):
    """Return up to 5000 documents that still lack an embedding, ordered by id.

    Each row is ``(id, title, doc_type, source, text, publish_date)``. Only the
    first 800 characters of ``text_extracted`` are fetched, because nothing
    downstream reads past that. The ``length(...) > 50`` filter is still
    applied to the full column.
    """
    cur.execute("""
        SELECT id, title, doc_type, source, left(text_extracted, 800), publish_date
        FROM sport.documents
        WHERE embedding IS NULL AND text_extracted IS NOT NULL AND length(text_extracted) > 50
        ORDER BY id
        LIMIT 5000
    """)
    return cur.fetchall()


def _build_snippet(row):
    """Build the text sent to the embedding service for one document row."""
    _id, title, doc_type, source, text, _pub_date = row
    return f'{title or ""} {doc_type or ""} {source or ""} {str(text or "")[:800]}'.strip()


def _parse_embeddings(reply):
    """Normalise the embedding service reply into a list of vectors.

    Accepts a bare list of vectors, a dict with an ``'embeddings'`` list, or a
    dict with a single ``'embedding'``. Anything else yields an empty list.
    """
    if isinstance(reply, list):
        return reply
    if isinstance(reply, dict):
        vectors = reply.get('embeddings')
        if isinstance(vectors, list):
            return vectors
        if 'embedding' in reply:
            return [reply['embedding']]
    return []


def _build_point(row, vector):
    """Build the Qdrant point (id, vector, payload) for one document row."""
    id_, title, doc_type, source, text, pub_date = row
    return {
        'id': id_,
        'vector': vector,
        'payload': {
            'title': title or '',
            'doc_type': doc_type or '',
            'source': source or '',
            'pub_date': str(pub_date) if pub_date else '',
            'text_snippet': str(text or '')[:300],
        },
    }


def _notify_telegram(total_done):
    """Send a best-effort Telegram summary; skipped if credentials are not configured.

    The bot token and chat id come from the TELEGRAM_BOT_TOKEN and
    TELEGRAM_CHAT_ID environment variables. They must never be hard-coded.
    NOTE(review): a bot token used to be committed in this file; treat it as
    leaked and rotate it.
    """
    tg_token = os.environ.get('TELEGRAM_BOT_TOKEN')
    tg_chat = os.environ.get('TELEGRAM_CHAT_ID')
    if not tg_token or not tg_chat:
        log.info('Telegram notification skipped: TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID not set')
        return
    try:
        requests.post(
            f'https://api.telegram.org/bot{tg_token}/sendMessage',
            data={'chat_id': tg_chat, 'text': f'✅ sport_doc_embedder: {total_done} docs embeddano u Qdrant'},
            timeout=10,
        )
    except requests.RequestException as e:
        # Log only the exception type: requests error messages embed the URL,
        # which contains the bot token.
        log.warning(f'Telegram notification failed: {type(e).__name__}')


def run():
    """Embed pending ``sport.documents`` rows and store the vectors in Qdrant and Postgres.

    Processes at most 5000 rows whose ``embedding`` is NULL, in batches of
    ``BATCH_SIZE``. For each batch the vectors are upserted into Qdrant first
    and only then written to ``sport.documents.embedding`` and committed.

    The order matters because the pending query selects
    ``embedding IS NULL``. Writing the DB first would hide a document whose
    Qdrant upsert failed from every later run. With this order a failed batch
    stays pending and is retried next time. Qdrant upserts replace points by
    id, so re-sending is harmless.

    A batch whose embedding request fails is skipped after a 5-second pause.
    At the end a best-effort Telegram notification is sent (see
    ``_notify_telegram``).
    """
    conn = psycopg2.connect(DB_DSN)
    conn.autocommit = False
    total_done = 0
    try:
        with conn.cursor() as cur:
            ensure_collection()
            rows = _fetch_pending(cur)
            log.info(f'Pending: {len(rows)} documents')
            for i in range(0, len(rows), BATCH_SIZE):
                batch = rows[i:i + BATCH_SIZE]
                try:
                    reply = embed_texts([_build_snippet(row) for row in batch])
                except (requests.RequestException, ValueError) as e:
                    # ValueError covers a reply body that is not valid JSON.
                    log.error(f'Embed error batch {i}: {e}')
                    time.sleep(5)
                    continue
                embeddings = _parse_embeddings(reply)
                if len(embeddings) < len(batch):
                    log.warning(
                        f'Batch {i}: got {len(embeddings)} embeddings for {len(batch)} documents; '
                        'the rest stay pending for the next run'
                    )
                # zip() stops at the shorter list, so documents without a
                # vector keep embedding NULL and are retried later.
                pairs = list(zip(batch, embeddings))
                if not pairs:
                    continue
                try:
                    upsert_qdrant([_build_point(row, emb) for row, emb in pairs])
                except requests.RequestException as e:
                    # Leave the DB untouched so these rows are picked up again.
                    log.warning(f'Qdrant upsert error: {e}')
                    continue
                update_rows = [(emb, row[0]) for row, emb in pairs]
                execute_batch(cur, 'UPDATE sport.documents SET embedding=%s WHERE id=%s', update_rows)
                conn.commit()
                previous = total_done
                total_done += len(update_rows)
                # Log on the first batch and whenever another multiple of 200
                # is crossed (batches rarely land exactly on a multiple).
                if i == 0 or total_done // 200 > previous // 200:
                    log.info(f'Progress: {total_done}/{len(rows)} done')
                time.sleep(0.1)
    finally:
        # Closing without commit discards any uncommitted batch on error.
        conn.close()
    log.info(f'DONE: embedded {total_done} sport documents')
    _notify_telegram(total_done)
if __name__ == '__main__':
    # Run one embedding pass only when executed directly, not when imported.
    run()