#!/usr/bin/env python3
"""OCR worker daemon for pgz_sport.invoice_uploads.

Polls for pending uploads, extracts their text (pdftotext for PDFs with an
embedded text layer, pdftoppm + tesseract otherwise), runs regex-based field
extraction and writes ai_invoice_no, ai_invoice_date, ai_vendor_name,
ai_vendor_oib, ai_amount_gross and ai_extracted, then flips ocr_status to
'done' or 'failed'.
"""
import contextlib
import datetime
import hashlib
import json
import os
import re
import subprocess
import tempfile
import time
import traceback
from pathlib import Path

import psycopg2
import psycopg2.extras

DB = dict(host='10.10.0.2', port=5432, dbname='rinet_v3', user='rinet',
          password=os.environ["DB_PASSWORD"])
POLL = 8  # seconds

# Errors the external-tool calls are expected to raise (missing binary,
# timeout, non-zero exit with check=True, filesystem problems).
_TOOL_ERRORS = (OSError, subprocess.SubprocessError)


def db():
    """Open a new autocommit connection using the DB settings.

    The caller owns the connection and must close it. Note that psycopg2's
    ``with connection:`` only scopes a transaction and does NOT close the
    connection, so callers here wrap it in ``contextlib.closing``.
    """
    c = psycopg2.connect(**DB)
    c.autocommit = True
    return c


def claim_one():
    """Claim one pending row by flipping it to 'processing'.

    The oldest pending upload (by uploaded_at) is selected with
    FOR UPDATE SKIP LOCKED so a row locked by another session is skipped.

    Returns:
        The claimed row as a dict-like RealDictRow, or None when nothing is
        pending.
    """
    with contextlib.closing(db()) as c:
        with c.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute("""
                UPDATE pgz_sport.invoice_uploads
                   SET ocr_status='processing', processed_at=NOW()
                 WHERE id = (SELECT id FROM pgz_sport.invoice_uploads
                              WHERE ocr_status='pending'
                              ORDER BY uploaded_at
                              LIMIT 1
                              FOR UPDATE SKIP LOCKED)
             RETURNING *""")
            return cur.fetchone()


def update_done(uid, fields):
    """Write ``fields`` (column name -> value) onto the upload row ``uid``.

    Column names are interpolated into the SQL text, so they must be trusted
    identifiers; every caller in this file passes literal keys. Values are
    sent as bound parameters. An empty ``fields`` is a no-op because it
    would otherwise produce an invalid ``SET`` clause.
    """
    if not fields:
        return
    assignments = ','.join(f"{col}=%s" for col in fields)
    args = [*fields.values(), uid]
    with contextlib.closing(db()) as c:
        with c.cursor() as cur:
            cur.execute(
                f"UPDATE pgz_sport.invoice_uploads SET {assignments} WHERE id=%s",
                args)


def fail(uid, err):
    """Mark upload ``uid`` as failed, storing the first 500 chars of ``err``."""
    update_done(uid, {'ocr_status': 'failed',
                      'ai_extracted': json.dumps({'error': str(err)[:500]})})


def _tesseract(image_path):
    """Run tesseract (hrv+eng, --psm 6) on one image and return its stdout text."""
    r = subprocess.run(
        ['tesseract', str(image_path), '-', '-l', 'hrv+eng', '--psm', '6'],
        capture_output=True, timeout=90)
    return r.stdout.decode('utf-8', 'ignore')


def extract_text_from_file(path):
    """Extract text from an uploaded file.

    Args:
        path: Filesystem path of the upload; the suffix selects the method.

    Returns:
        A ``(text, method)`` tuple. ``method`` is 'pdftotext', 'tesseract' or
        'text' on success; on failure ``text`` is '' and ``method`` is
        'missing', 'pdf_err:<error>', 'img_err:<error>' or
        'unsupported:<suffix>'.
    """
    p = Path(path)
    if not p.exists():
        return ('', 'missing')
    suf = p.suffix.lower()

    if suf == '.pdf':
        # Try pdftotext first (fast, uses the embedded text layer).
        try:
            r = subprocess.run(['pdftotext', '-layout', str(p), '-'],
                               capture_output=True, timeout=60)
            txt = r.stdout.decode('utf-8', 'ignore')
            if len(txt.strip()) > 100:
                return (txt, 'pdftotext')
        except _TOOL_ERRORS:
            # Deliberate best-effort: fall through to the raster + OCR path.
            pass
        # Fallback: rasterize + tesseract. A private temp dir avoids
        # collisions between files sharing a stem and is removed even when a
        # step fails.
        try:
            with tempfile.TemporaryDirectory(prefix='ocr_') as tmp:
                subprocess.run(['pdftoppm', '-r', '200', str(p), f'{tmp}/page'],
                               timeout=120, check=True)
                pages = sorted(Path(tmp).glob('page-*.ppm'))
                chunks = [_tesseract(img) for img in pages]
            return ('\n'.join(chunks), 'tesseract')
        except _TOOL_ERRORS as e:
            return ('', f'pdf_err:{e}')

    if suf in ('.jpg', '.jpeg', '.png', '.tiff', '.tif'):
        try:
            return (_tesseract(p), 'tesseract')
        except _TOOL_ERRORS as e:
            return ('', f'img_err:{e}')

    if suf in ('.txt', '.csv'):
        # Explicit UTF-8: the locale default would silently drop diacritics
        # (errors='ignore') when the process runs under a non-UTF-8 locale.
        return (p.read_text(encoding='utf-8', errors='ignore'), 'text')

    return ('', f'unsupported:{suf}')


# Croatian invoice patterns
RE_OIB = re.compile(r'\b(\d{11})\b')
RE_DATE_DOT = re.compile(r'\b(\d{1,2})[.\s\-]+(\d{1,2})[.\s\-]+(20\d{2})\b')
RE_DATE_ISO = re.compile(r'\b(20\d{2})[\-/](\d{1,2})[\-/](\d{1,2})\b')
# \b stops keywords matching inside other words ("no" in "Ukupno"); the
# lookahead requires a digit so a following word ("Račun broj") is not taken
# as the invoice number.
RE_INVOICE_NO = re.compile(
    r'(?i)\b(?:ra[čc]un|invoice|broj|fakture|broj fakture|no\.?|br\.?)'
    r'[\s:]+(?=[A-Z0-9\-/.]*\d)([A-Z0-9\-/.]{4,30})')
RE_AMOUNT = re.compile(
    r'(?i)(?:ukupno|to pay|total|za platiti|iznos|sveukupno|za naplatu)'
    r'[\s:€]*([\d.,]{4,15})')
# "HR" + 19 digits, tolerating the spaces/tabs used in printed grouping.
RE_IBAN = re.compile(r'\b(HR(?:[ \t]*\d){19})(?!\d)')
RE_VAT = re.compile(r'(?i)(?:pdv|vat)[\s:]*?([\d,.]+)')


def parse_amount(s):
    """Parse a money string in European or US notation into a float.

    When both '.' and ',' occur, the one appearing last is the decimal mark.
    A lone comma is a decimal mark; repeated commas or repeated dots are
    thousands separators. Whitespace and trailing separators are ignored.

    Returns:
        The parsed float, or None for empty or unparseable input.
    """
    if not s:
        return None
    # rstrip only: a leading separator (".50") is a legitimate decimal mark.
    s = re.sub(r'\s', '', s).rstrip('.,')
    if not s:
        return None
    last_dot, last_comma = s.rfind('.'), s.rfind(',')
    if last_dot >= 0 and last_comma >= 0:
        if last_comma > last_dot:      # 1.234,56
            s = s.replace('.', '').replace(',', '.')
        else:                          # 1,234.56
            s = s.replace(',', '')
    elif last_comma >= 0:
        s = s.replace(',', '.') if s.count(',') == 1 else s.replace(',', '')
    elif s.count('.') > 1:             # 1.234.567
        s = s.replace('.', '')
    try:
        return float(s)
    except ValueError:
        return None


def _first_valid_date(text):
    """Return the first real calendar date in ``text`` as 'YYYY-MM-DD', or None.

    Day-month-year matches take priority over year-month-day ones; matches
    that are not valid dates (e.g. 99.99.2024) are skipped.
    """
    for rx, order in ((RE_DATE_DOT, 'dmy'), (RE_DATE_ISO, 'ymd')):
        for m in rx.finditer(text):
            a, b, c = (int(g) for g in m.groups())
            year, month, day = (c, b, a) if order == 'dmy' else (a, b, c)
            try:
                return datetime.date(year, month, day).isoformat()
            except ValueError:
                continue
    return None


def extract_fields(text):
    """Best-effort regex-based field extraction for HR invoices.

    Args:
        text: OCR/plain text of the invoice; None is treated as ''.

    Returns:
        A dict that always has 'raw_chars' and, when found, 'oibs_found',
        'vendor_oib', 'customer_oib', 'invoice_no', 'invoice_date',
        'amount_gross', 'amounts_found', 'iban' and 'vendor_name'.
    """
    text = text or ''
    out = {'raw_chars': len(text)}

    # OIBs (vendor usually appears first, in the header). Deduplicate before
    # picking so a repeated vendor OIB is not reported as the customer's.
    oibs = list(dict.fromkeys(RE_OIB.findall(text)))
    if oibs:
        out['oibs_found'] = oibs
        out['vendor_oib'] = oibs[0]
        if len(oibs) > 1:
            out['customer_oib'] = oibs[1]

    # Invoice number (drop a sentence-ending period captured with the token)
    m = RE_INVOICE_NO.search(text)
    if m:
        out['invoice_no'] = m.group(1).strip().rstrip('.')

    # Date
    invoice_date = _first_valid_date(text)
    if invoice_date:
        out['invoice_date'] = invoice_date

    # Amount
    amts = [parse_amount(raw) for raw in RE_AMOUNT.findall(text)]
    amts = [a for a in amts if a and a > 0.01]
    if amts:
        out['amount_gross'] = max(amts)  # usually total is the largest
        out['amounts_found'] = amts[:5]

    # IBAN (stored without the printed grouping spaces)
    m = RE_IBAN.search(text)
    if m:
        out['iban'] = re.sub(r'\s', '', m.group(1))

    # Vendor name guess: first of the top 8 lines that is 6-79 chars long,
    # contains no OIB and has no digit among its first three characters.
    for line in text.split('\n')[:8]:
        ln = line.strip()
        if (5 < len(ln) < 80 and not RE_OIB.search(ln)
                and not any(ch.isdigit() for ch in ln[:3])):
            out['vendor_name'] = ln
            break
    return out


def process(row):
    """OCR one claimed upload row and store the result.

    Reads 'id', 'klub_id', 'file_name' and 'file_path' from ``row``. Any
    exception is printed and recorded on the row via ``fail``; an error
    raised by ``fail`` itself propagates to the caller.
    """
    uid = row['id']
    print(f"[OCR] uid={uid} klub={row['klub_id']} file={row['file_name']}")
    try:
        text, method = extract_text_from_file(row['file_path'])
        # NUL characters cannot be stored in the database and would make the
        # whole update fail.
        text = text.replace('\x00', '')
        n_chars = len(text.strip())
        if n_chars < 20:
            fail(uid, f"OCR yielded {n_chars} chars (method={method})")
            print(f"[OCR] uid={uid} FAIL — empty")
            return
        fields = extract_fields(text)
        fields['ocr_method'] = method
        upd = {
            'ocr_status': 'done',
            'ai_invoice_no': fields.get('invoice_no'),
            'ai_invoice_date': fields.get('invoice_date'),
            'ai_vendor_name': fields.get('vendor_name'),
            'ai_vendor_oib': fields.get('vendor_oib'),
            'ai_amount_gross': fields.get('amount_gross'),
            'ai_extracted': json.dumps(fields, ensure_ascii=False, default=str),
            'ocr_text': text[:50000],
        }
        # If the ocr_text column doesn't exist, retry without it.
        try:
            update_done(uid, upd)
        except Exception as e:
            if 'ocr_text' in str(e):
                upd.pop('ocr_text', None)
                update_done(uid, upd)
            else:
                raise
        print(f"[OCR] uid={uid} OK · vendor={fields.get('vendor_name','?')[:30]} · amt={fields.get('amount_gross','?')} · oib={fields.get('vendor_oib','?')}")
    except Exception as e:
        traceback.print_exc()
        fail(uid, str(e))


def main():
    """Run the polling loop until interrupted with Ctrl-C."""
    print(f"[OCR worker] starting, poll every {POLL}s")
    idle = 0
    while True:
        try:
            row = claim_one()
            if row:
                process(row)
                idle = 0
            else:
                idle += 1
                if idle % 10 == 0:
                    print(f"[OCR] idle x{idle}")
                # NOTE(review): the source's indentation was lost; sleeping
                # only when the queue is empty is assumed — confirm.
                time.sleep(POLL)
        except KeyboardInterrupt:
            print('\n[OCR] shutdown')
            break
        except Exception as e:
            # Top-level boundary: log, back off, keep the daemon alive.
            print('[OCR] loop error:', e)
            traceback.print_exc()
            time.sleep(POLL * 2)


if __name__ == '__main__':
    main()