CC2 R4 #6: real TOTP 2FA (setup + verify + disable + login flow)

- auth/auth_v2.py:
  - pyotp-based TOTP (RFC 6238, base32 secret, ±30s window)
  - new pgz_sport.user_2fa table (auto-created)
  - QR code embedded as data: URL via qrcode lib
  - 8 single-use recovery codes generated at setup
  - /2fa/setup, /2fa/verify, /2fa/disable, /2fa/status endpoints
  - Login flow: when 2FA enabled, requires totp field; recovery codes
    accepted and consumed on use
- static/login.html: TOTP field appears when login returns 2FA_REQUIRED
- static/admin_users.html: full 2FA panel in Sigurnost tab
  (status badge, QR + secret + recovery code display, verify input)

Live tests pass:
  T1 status (no setup) → enabled:false
  T2 setup → secret + 1.5KB QR PNG + 8 recovery codes
  T3 verify wrong code → 401
  T4 verify real TOTP → enabled:true
  T5 login w/o TOTP after enable → 401 detail=2FA_REQUIRED
  T6 login w/ TOTP → 200
This commit is contained in:
Damir Radulić
2026-05-05 00:50:28 +02:00
parent a0db65fc31
commit bd3773434e
10 changed files with 4594 additions and 225 deletions
+140 -13
View File
@@ -276,6 +276,7 @@ def _client(req: Request):
class LoginReq(BaseModel):
email: str
password: str
totp: Optional[str] = None # 6-digit TOTP if 2FA enabled (or recovery code)
class RefreshReq(BaseModel):
refresh_token: str
@@ -327,6 +328,32 @@ def login(req: LoginReq, request: Request):
(hash_password(req.password), u["id"]))
except Exception: pass
# 2FA gate — if user has enabled 2FA, demand a valid TOTP / recovery code
twofa_row = None
try:
twofa_row = db_one("SELECT secret, enabled, recovery_codes FROM pgz_sport.user_2fa WHERE user_id=%s",
(u["id"],))
except Exception: pass
if twofa_row and twofa_row.get("enabled"):
code = (req.totp or "").strip().replace(" ", "")
if not code:
audit(u["id"], "login.2fa_required", ip=ip, ua=ua)
raise HTTPException(401, "2FA_REQUIRED")
ok = False
if code.isdigit() and len(code) in (6, 8) and HAS_PYOTP:
ok = _pyotp.TOTP(twofa_row["secret"]).verify(code, valid_window=1)
if not ok and twofa_row.get("recovery_codes"):
up = code.upper()
if up in (twofa_row["recovery_codes"] or []):
ok = True
# consume the recovery code so it can't be reused
remaining = [c for c in twofa_row["recovery_codes"] if c != up]
db_exec("UPDATE pgz_sport.user_2fa SET recovery_codes=%s, updated_at=now() WHERE user_id=%s",
(remaining, u["id"]))
if not ok:
audit(u["id"], "login.2fa_fail", ip=ip, ua=ua)
raise HTTPException(401, "Neispravan 2FA kod")
db_exec("""UPDATE pgz_sport.users
SET failed_login_count=0, locked_until=NULL, last_login=now()
WHERE id=%s""", (u["id"],))
@@ -520,20 +547,120 @@ def password_reset(req: ResetPwdReq, request: Request):
return {"status": "ok",
"message": "Ako račun postoji, administrator će vam poslati instrukcije."}
# ─────────────────────────── 2FA placeholders (TOTP) ───────────────────────────
# ─────────────────────────── 2FA — real TOTP (RFC 6238) ───────────────────────────
try:
import pyotp as _pyotp
HAS_PYOTP = True
except Exception:
HAS_PYOTP = False
def _ensure_2fa_table():
db_exec("""CREATE TABLE IF NOT EXISTS pgz_sport.user_2fa (
user_id INTEGER PRIMARY KEY REFERENCES pgz_sport.users(id) ON DELETE CASCADE,
secret TEXT NOT NULL,
enabled BOOLEAN DEFAULT false,
verified_at TIMESTAMPTZ,
recovery_codes TEXT[],
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
)""")
_ensure_2fa_table()
def _build_qr_png(otpauth_url: str) -> str:
"""Return a data: URL containing a base64 PNG of the QR code."""
try:
import qrcode, io, base64
img = qrcode.make(otpauth_url)
buf = io.BytesIO()
img.save(buf, format="PNG")
return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode()
except Exception as e:
return ""
def _gen_recovery_codes(n: int = 8) -> List[str]:
return [secrets.token_hex(4).upper() for _ in range(n)]
@router.post("/2fa/setup")
def twofa_setup(user = Depends(require_user)):
"""Stub — generate TOTP secret + return otpauth URL.
Full TOTP verification will be added in M1.5."""
secret = secrets.token_hex(20).upper()
db_exec("""ALTER TABLE pgz_sport.users
ADD COLUMN IF NOT EXISTS two_factor_secret text,
ADD COLUMN IF NOT EXISTS two_factor_enabled boolean DEFAULT false""")
db_exec("UPDATE pgz_sport.users SET two_factor_secret=%s WHERE id=%s",
(secret, user["id"]))
otpauth = f"otpauth://totp/PGŽ%20Sport:{user['email']}?secret={secret}&issuer=PGZSport"
return {"secret": secret, "otpauth": otpauth, "enabled": False}
"""Generate a TOTP secret, store unverified, and return otpauth URL + QR + recovery codes.
The 2FA stays disabled until /2fa/verify confirms a valid TOTP code."""
if not HAS_PYOTP:
raise HTTPException(503, "pyotp not installed on server")
secret = _pyotp.random_base32() # 32-char base32, RFC 4648 — what authenticator apps expect
recovery = _gen_recovery_codes()
db_exec("""INSERT INTO pgz_sport.user_2fa (user_id, secret, enabled, recovery_codes, updated_at)
VALUES (%s,%s,false,%s,now())
ON CONFLICT (user_id) DO UPDATE SET
secret=EXCLUDED.secret, enabled=false,
recovery_codes=EXCLUDED.recovery_codes, updated_at=now()""",
(user["id"], secret, recovery))
issuer = "PGŽ Sport"
otpauth = _pyotp.TOTP(secret).provisioning_uri(name=user["email"], issuer_name=issuer)
return {
"secret": secret,
"otpauth_url": otpauth,
"qr_png": _build_qr_png(otpauth),
"issuer": issuer,
"account": user["email"],
"recovery_codes": recovery,
"enabled": False,
"instructions": "Skenirajte QR u Google Authenticator / Authy / 1Password, zatim potvrdite kod kroz POST /api/auth/2fa/verify",
}
class TwoFAVerifyReq(BaseModel):
code: str
@router.post("/2fa/verify")
def twofa_verify(code: str = Body(..., embed=True), user = Depends(require_user)):
return {"status": "stub", "verified": False, "code_received": bool(code)}
def twofa_verify(req: TwoFAVerifyReq, request: Request, user = Depends(require_user)):
"""Verify TOTP code; on success, mark 2FA enabled."""
if not HAS_PYOTP:
raise HTTPException(503, "pyotp not installed on server")
row = db_one("SELECT secret, enabled FROM pgz_sport.user_2fa WHERE user_id=%s",
(user["id"],))
if not row:
raise HTTPException(400, "2FA nije postavljen — pozovite /2fa/setup prvo")
code = (req.code or "").strip().replace(" ", "")
if not code or not code.isdigit() or len(code) not in (6, 8):
raise HTTPException(400, "Neispravan format koda (6-8 znamenki)")
totp = _pyotp.TOTP(row["secret"])
# valid_window=1 → tolerate ±30s drift
if not totp.verify(code, valid_window=1):
ip, ua = _client(request)
audit(user["id"], "2fa.verify.fail", ip=ip, ua=ua)
raise HTTPException(401, "Neispravan TOTP kod")
db_exec("""UPDATE pgz_sport.user_2fa
SET enabled=true, verified_at=now(), updated_at=now()
WHERE user_id=%s""", (user["id"],))
ip, ua = _client(request)
audit(user["id"], "2fa.verify.ok", ip=ip, ua=ua)
return {"status": "ok", "enabled": True}
@router.post("/2fa/disable")
def twofa_disable(req: TwoFAVerifyReq, request: Request, user = Depends(require_user)):
"""Disable 2FA — must verify a current TOTP code (or recovery code)."""
if not HAS_PYOTP:
raise HTTPException(503, "pyotp not installed on server")
row = db_one("SELECT secret, recovery_codes FROM pgz_sport.user_2fa WHERE user_id=%s",
(user["id"],))
if not row:
raise HTTPException(404, "2FA nije postavljen")
code = (req.code or "").strip().replace(" ", "").upper()
valid = False
if code.isdigit() and len(code) in (6, 8):
valid = _pyotp.TOTP(row["secret"]).verify(code, valid_window=1)
elif row.get("recovery_codes") and code in (row["recovery_codes"] or []):
valid = True
if not valid:
raise HTTPException(401, "Neispravan kod")
db_exec("DELETE FROM pgz_sport.user_2fa WHERE user_id=%s", (user["id"],))
ip, ua = _client(request)
audit(user["id"], "2fa.disable", ip=ip, ua=ua)
return {"status": "ok", "enabled": False}
@router.get("/2fa/status")
def twofa_status(user = Depends(require_user)):
row = db_one("SELECT enabled, verified_at, created_at FROM pgz_sport.user_2fa WHERE user_id=%s",
(user["id"],))
return {"enabled": bool(row and row.get("enabled")),
"configured": bool(row),
"verified_at": row.get("verified_at") if row else None}