Merge agent3-payments: SEPA + CSV import + match workflow

This commit is contained in:
Damir Radulić
2026-05-05 18:35:01 +02:00
2 changed files with 615 additions and 27 deletions
+412 -4
View File
@@ -16,21 +16,25 @@
# ═══════════════════════════════════════════════════════════════════
from __future__ import annotations
import csv
import hashlib
import io
import os
import re
import uuid
from datetime import date, datetime
from decimal import Decimal
from io import BytesIO
from decimal import Decimal, InvalidOperation
from io import BytesIO, StringIO
from pathlib import Path
from typing import Optional, List
from xml.etree import ElementTree as ET
from xml.sax.saxutils import escape as xml_escape
import psycopg2
import psycopg2.extras
from fastapi import APIRouter, HTTPException, Query, Body, UploadFile, File, Depends, Header, Form
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from fastapi.responses import StreamingResponse, Response
from pydantic import BaseModel, Field
# ── Upload destination (relative to web root /uploads/...) ──────────
UPLOAD_BASE = Path("/opt/pgz-sport/uploads")
@@ -1549,6 +1553,410 @@ def payments_list(klub_id: Optional[int] = None,
return {"count": len(rows), "rows": rows}
# ── GET /payments/sepa-export — must be before /payments/{id} ───────
def _parse_ids_param(ids: Optional[str]) -> List[int]:
if not ids:
return []
out: List[int] = []
for chunk in ids.split(","):
chunk = chunk.strip()
if not chunk:
continue
try:
out.append(int(chunk))
except ValueError:
raise HTTPException(400, f"Bad id in 'ids': {chunk!r}")
return out
@router.get("/payments/sepa-export")
def payments_sepa_export(
ids: Optional[str] = Query(None, description="CSV id list, e.g. 1,2,3"),
godina: Optional[int] = None,
klub_id: Optional[int] = None,
matched_status: Optional[str] = None,
payment_method: Optional[str] = None,
):
"""Generate pain.001.001.03 SEPA Credit Transfer XML — MOCK / placeholder.
Either pass ?ids=1,2,3 OR filter (godina/klub_id/matched_status/payment_method)."""
where = ["1=1"]
params: list = []
id_list = _parse_ids_param(ids)
if id_list:
where.append("p.id = ANY(%s)")
params.append(id_list)
else:
if godina:
where.append("EXTRACT(YEAR FROM p.payment_date)=%s"); params.append(godina)
if klub_id:
where.append("p.klub_id=%s"); params.append(klub_id)
if matched_status:
where.append("p.matched_status=%s"); params.append(matched_status)
if payment_method:
where.append("p.payment_method=%s"); params.append(payment_method)
rows = db_query(
"SELECT p.id, p.klub_id, k.naziv AS klub_naziv, p.payment_date, p.amount, "
"p.currency, p.iban_from, p.iban_to, p.reference, p.description "
"FROM pgz_sport.payments p "
"LEFT JOIN pgz_sport.klubovi k ON k.id=p.klub_id "
f"WHERE {' AND '.join(where)} ORDER BY p.payment_date, p.id LIMIT 5000",
tuple(params))
debtor_name = os.environ.get("PGZ_SEPA_DEBTOR_NAME", "PGŽ Sportski savez")
debtor_iban = os.environ.get("PGZ_SEPA_DEBTOR_IBAN", "HR0000000000000000000")
debtor_bic = os.environ.get("PGZ_SEPA_DEBTOR_BIC", "NOTPROVIDED")
now = datetime.now()
msg_id = f"PGZ-{now.strftime('%Y%m%d%H%M%S')}-{uuid.uuid4().hex[:8].upper()}"
pmt_inf_id = f"{msg_id}-PMT01"
creation_dt = now.strftime("%Y-%m-%dT%H:%M:%S")
nb_of_txs = len(rows)
ctrl_sum = sum((Decimal(str(r["amount"])) for r in rows if r["amount"] is not None), Decimal("0"))
ctrl_sum_str = f"{ctrl_sum:.2f}"
requested_exec_date = (rows[0]["payment_date"] if rows and rows[0]["payment_date"] else now.date()).isoformat()
parts: List[str] = []
parts.append('<?xml version="1.0" encoding="UTF-8"?>')
parts.append('<!-- MOCK / placeholder — for future banking integration -->')
parts.append('<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pain.001.001.03" '
'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">')
parts.append(' <CstmrCdtTrfInitn>')
# Group Header
parts.append(' <GrpHdr>')
parts.append(f' <MsgId>{xml_escape(msg_id)}</MsgId>')
parts.append(f' <CreDtTm>{creation_dt}</CreDtTm>')
parts.append(f' <NbOfTxs>{nb_of_txs}</NbOfTxs>')
parts.append(f' <CtrlSum>{ctrl_sum_str}</CtrlSum>')
parts.append(' <InitgPty>')
parts.append(f' <Nm>{xml_escape(debtor_name)}</Nm>')
parts.append(' </InitgPty>')
parts.append(' </GrpHdr>')
# Payment Information
parts.append(' <PmtInf>')
parts.append(f' <PmtInfId>{xml_escape(pmt_inf_id)}</PmtInfId>')
parts.append(' <PmtMtd>TRF</PmtMtd>')
parts.append(f' <NbOfTxs>{nb_of_txs}</NbOfTxs>')
parts.append(f' <CtrlSum>{ctrl_sum_str}</CtrlSum>')
parts.append(' <PmtTpInf><SvcLvl><Cd>SEPA</Cd></SvcLvl></PmtTpInf>')
parts.append(f' <ReqdExctnDt>{requested_exec_date}</ReqdExctnDt>')
parts.append(' <Dbtr>')
parts.append(f' <Nm>{xml_escape(debtor_name)}</Nm>')
parts.append(' </Dbtr>')
parts.append(' <DbtrAcct>')
parts.append(f' <Id><IBAN>{xml_escape(debtor_iban)}</IBAN></Id>')
parts.append(' </DbtrAcct>')
parts.append(' <DbtrAgt>')
parts.append(f' <FinInstnId><BIC>{xml_escape(debtor_bic)}</BIC></FinInstnId>')
parts.append(' </DbtrAgt>')
parts.append(' <ChrgBr>SLEV</ChrgBr>')
for r in rows:
amt = r["amount"] if r["amount"] is not None else Decimal("0")
try:
amt_str = f"{Decimal(str(amt)):.2f}"
except (InvalidOperation, TypeError):
amt_str = "0.00"
ccy = (r.get("currency") or "EUR")[:3]
cdtr_name = r.get("klub_naziv") or (f"Klub #{r['klub_id']}" if r.get("klub_id") else "Beneficiary")
cdtr_iban = (r.get("iban_to") or "").strip() or "HR0000000000000000000"
end_to_end = (r.get("reference") or f"PGZ-PAY-{r['id']}")[:35]
rmt = (r.get("description") or r.get("reference") or f"Payment #{r['id']}")[:140]
parts.append(' <CdtTrfTxInf>')
parts.append(' <PmtId>')
parts.append(f' <EndToEndId>{xml_escape(end_to_end)}</EndToEndId>')
parts.append(' </PmtId>')
parts.append(' <Amt>')
parts.append(f' <InstdAmt Ccy="{xml_escape(ccy)}">{amt_str}</InstdAmt>')
parts.append(' </Amt>')
parts.append(' <Cdtr>')
parts.append(f' <Nm>{xml_escape(cdtr_name)}</Nm>')
parts.append(' </Cdtr>')
parts.append(' <CdtrAcct>')
parts.append(f' <Id><IBAN>{xml_escape(cdtr_iban)}</IBAN></Id>')
parts.append(' </CdtrAcct>')
parts.append(' <RmtInf>')
parts.append(f' <Ustrd>{xml_escape(rmt)}</Ustrd>')
parts.append(' </RmtInf>')
parts.append(' </CdtTrfTxInf>')
parts.append(' </PmtInf>')
parts.append(' </CstmrCdtTrfInitn>')
parts.append('</Document>')
xml_body = "\n".join(parts).encode("utf-8")
fname = f"sepa_export_{now.strftime('%Y%m%d_%H%M%S')}.xml"
return Response(
content=xml_body,
media_type="application/xml",
headers={"Content-Disposition": f'attachment; filename="{fname}"'},
)
# ── GET /payments/{id} — single record + linked info ────────────────
@router.get("/payments/{rid}")
def payments_get(rid: int):
row = db_one(
"SELECT p.id, p.klub_id, k.naziv AS klub_naziv, p.invoice_id, "
"p.expense_report_id, p.clanarina_id, p.payment_date, p.amount, p.currency, "
"p.payment_method, p.iban_from, p.iban_to, p.reference, p.description, "
"p.bank_statement_no, p.bank_transaction_id, p.matched_status, "
"p.matched_by, p.matched_at, p.created_at, "
"i.invoice_no, i.vendor_name, i.amount_gross AS invoice_amount, "
"er.report_no, er.purpose AS expense_purpose, er.cost_total AS expense_total "
"FROM pgz_sport.payments p "
"LEFT JOIN pgz_sport.klubovi k ON k.id=p.klub_id "
"LEFT JOIN pgz_sport.invoices i ON i.id=p.invoice_id "
"LEFT JOIN pgz_sport.expense_reports er ON er.id=p.expense_report_id "
"WHERE p.id=%s",
(rid,))
if not row:
raise HTTPException(404, f"Payment id={rid} not found")
# Try clanarina (table may exist optionally)
if row.get("clanarina_id"):
try:
cl = db_one(
"SELECT id, godina, iznos, status FROM pgz_sport.clanarine WHERE id=%s",
(row["clanarina_id"],))
if cl:
row["clanarina"] = cl
except Exception:
row["clanarina"] = None
return row
# ── POST /payments — create ─────────────────────────────────────────
class PaymentIn(BaseModel):
klub_id: Optional[int] = None
invoice_id: Optional[int] = None
expense_report_id: Optional[int] = None
clanarina_id: Optional[int] = None
payment_date: date
amount: Decimal
currency: str = "EUR"
payment_method: Optional[str] = None
iban_from: Optional[str] = None
iban_to: Optional[str] = None
reference: Optional[str] = None
description: Optional[str] = None
bank_statement_no: Optional[str] = None
bank_transaction_id: Optional[str] = None
_ALLOWED_METHODS = {"iban", "cash", "card", "sepa", "transfer"}
@router.post("/payments")
def payments_create(body: PaymentIn):
if body.amount is None or Decimal(str(body.amount)) <= 0:
raise HTTPException(400, "amount must be > 0")
if body.payment_method and body.payment_method not in _ALLOWED_METHODS:
raise HTTPException(400, f"payment_method must be one of {sorted(_ALLOWED_METHODS)}")
new_id = db_exec(
"INSERT INTO pgz_sport.payments "
"(klub_id, invoice_id, expense_report_id, clanarina_id, payment_date, amount, "
" currency, payment_method, iban_from, iban_to, reference, description, "
" bank_statement_no, bank_transaction_id, matched_status, created_at) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'unmatched',now()) "
"RETURNING id",
(body.klub_id, body.invoice_id, body.expense_report_id, body.clanarina_id,
body.payment_date, body.amount, body.currency or "EUR",
body.payment_method, body.iban_from, body.iban_to,
body.reference, body.description,
body.bank_statement_no, body.bank_transaction_id),
returning=True,
)
return payments_get(new_id)
# ── PATCH /payments/{id} — partial update + manual matching ─────────
class PaymentPatch(BaseModel):
klub_id: Optional[int] = None
invoice_id: Optional[int] = None
expense_report_id: Optional[int] = None
clanarina_id: Optional[int] = None
payment_date: Optional[date] = None
amount: Optional[Decimal] = None
currency: Optional[str] = None
payment_method: Optional[str] = None
iban_from: Optional[str] = None
iban_to: Optional[str] = None
reference: Optional[str] = None
description: Optional[str] = None
bank_statement_no: Optional[str] = None
bank_transaction_id: Optional[str] = None
matched_status: Optional[str] = None
matched_by: Optional[str] = None
@router.patch("/payments/{rid}")
def payments_patch(rid: int, body: PaymentPatch):
existing = db_one("SELECT id FROM pgz_sport.payments WHERE id=%s", (rid,))
if not existing:
raise HTTPException(404, f"Payment id={rid} not found")
data = body.model_dump(exclude_unset=True) if hasattr(body, "model_dump") else body.dict(exclude_unset=True)
if "amount" in data and data["amount"] is not None and Decimal(str(data["amount"])) <= 0:
raise HTTPException(400, "amount must be > 0")
if "payment_method" in data and data["payment_method"] and data["payment_method"] not in _ALLOWED_METHODS:
raise HTTPException(400, f"payment_method must be one of {sorted(_ALLOWED_METHODS)}")
if not data:
return payments_get(rid)
set_parts: List[str] = []
params: list = []
for col, val in data.items():
set_parts.append(f"{col}=%s")
params.append(val)
# Manual matching: when matched_status flips to 'matched', stamp matched_at
if data.get("matched_status") == "matched":
set_parts.append("matched_at=now()")
params.append(rid)
db_exec(
f"UPDATE pgz_sport.payments SET {', '.join(set_parts)} WHERE id=%s",
tuple(params),
)
return payments_get(rid)
# ── POST /payments/import-csv — batch import ────────────────────────
_CSV_REQUIRED = ["payment_date", "amount", "currency", "payment_method",
"iban_from", "iban_to", "reference", "description"]
_CSV_OPTIONAL = ["klub_id", "invoice_id", "expense_report_id",
"clanarina_id", "bank_statement_no", "bank_transaction_id"]
def _parse_csv_date(s: str) -> Optional[date]:
if not s or not s.strip():
return None
s = s.strip()
for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%d.%m.%Y.", "%d/%m/%Y", "%Y/%m/%d"):
try:
return datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
def _parse_csv_decimal(s: str) -> Optional[Decimal]:
if s is None or not str(s).strip():
return None
raw = str(s).strip().replace(" ", "")
# Croatian-style: 1.234,56 → 1234.56
if "," in raw and "." in raw:
raw = raw.replace(".", "").replace(",", ".")
elif "," in raw:
raw = raw.replace(",", ".")
try:
return Decimal(raw)
except (InvalidOperation, ValueError):
return None
def _parse_csv_int(s) -> Optional[int]:
if s is None or not str(s).strip():
return None
try:
return int(str(s).strip())
except ValueError:
return None
@router.post("/payments/import-csv")
async def payments_import_csv(file: UploadFile = File(...)):
"""Accepts CSV (UTF-8, header row). Required columns: payment_date,amount,currency,
payment_method,iban_from,iban_to,reference,description.
Optional: klub_id,invoice_id,expense_report_id,clanarina_id,bank_statement_no,
bank_transaction_id. All rows imported in a single transaction; bad rows are
skipped with an error entry."""
raw = await file.read()
if not raw:
raise HTTPException(400, "Empty file")
if len(raw) > 10 * 1024 * 1024:
raise HTTPException(413, "File > 10 MB")
text: Optional[str] = None
for enc in ("utf-8-sig", "utf-8", "cp1250", "iso-8859-2", "latin-1"):
try:
text = raw.decode(enc)
break
except UnicodeDecodeError:
continue
if text is None:
raise HTTPException(400, "Cannot decode CSV (try UTF-8)")
# Auto-detect delimiter (, vs ;)
sample = text[:4096]
try:
dialect = csv.Sniffer().sniff(sample, delimiters=",;|\t")
except csv.Error:
dialect = csv.excel
reader = csv.DictReader(StringIO(text), dialect=dialect)
if not reader.fieldnames:
raise HTTPException(400, "CSV has no header row")
missing = [c for c in _CSV_REQUIRED if c not in reader.fieldnames]
if missing:
raise HTTPException(400, f"CSV missing columns: {missing}")
errors: List[dict] = []
valid_rows: List[tuple] = []
for idx, row in enumerate(reader, start=2): # header is row 1
pd = _parse_csv_date(row.get("payment_date", ""))
if pd is None:
errors.append({"row": idx, "msg": f"bad payment_date: {row.get('payment_date')!r}"})
continue
amt = _parse_csv_decimal(row.get("amount", ""))
if amt is None or amt <= 0:
errors.append({"row": idx, "msg": f"bad amount: {row.get('amount')!r}"})
continue
method = (row.get("payment_method") or "").strip() or None
if method and method not in _ALLOWED_METHODS:
errors.append({"row": idx, "msg": f"bad payment_method: {method!r}"})
continue
valid_rows.append((
_parse_csv_int(row.get("klub_id")),
_parse_csv_int(row.get("invoice_id")),
_parse_csv_int(row.get("expense_report_id")),
_parse_csv_int(row.get("clanarina_id")),
pd,
amt,
(row.get("currency") or "EUR").strip() or "EUR",
method,
(row.get("iban_from") or "").strip() or None,
(row.get("iban_to") or "").strip() or None,
(row.get("reference") or "").strip() or None,
(row.get("description") or "").strip() or None,
(row.get("bank_statement_no") or "").strip() or None,
(row.get("bank_transaction_id") or "").strip() or None,
))
inserted = 0
if valid_rows:
def _do(cur):
nonlocal inserted
for tup in valid_rows:
cur.execute(
"INSERT INTO pgz_sport.payments "
"(klub_id, invoice_id, expense_report_id, clanarina_id, payment_date, "
" amount, currency, payment_method, iban_from, iban_to, reference, "
" description, bank_statement_no, bank_transaction_id, matched_status, "
" created_at) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'unmatched',now())",
tup,
)
inserted += 1
return inserted
db_tx(_do)
return {"ok": True, "inserted": inserted, "errors": errors,
"total_rows": inserted + len(errors)}
# ═══════════════════════════════════════════════════════════════════
# 14) HEALTH/DEBUG
# ═══════════════════════════════════════════════════════════════════