Invoice POC — ETL Pipeline Architecture
Current-state architecture of the invoice-poc ETL pipeline after the TECH-569 fixes. Shows the full data flow from file selection through extraction, AI-powered transformation, and master data output.
System Overview
graph TD
subgraph Frontend["Frontend (React + Vite)"]
ETL["ETLPipeline.tsx<br/>Phase Orchestrator"]
EP["ExtractPhase"]
TP["TransformPhase"]
MDV["MasterDataView"]
ETL --> EP
ETL --> TP
ETL --> MDV
end
subgraph Backend["Backend (Express)"]
PR["parse.routes.ts<br/>All ETL endpoints"]
ST["store.ts<br/>KV Store"]
LG["logger.ts<br/>VictoriaLogs"]
end
subgraph External["External Services"]
DOC["docling-server.py<br/>ML Table Extraction"]
OR["OpenRouter API<br/>Gemini 3 Flash"]
VL["VictoriaLogs"]
end
EP -->|"POST /docling"| PR
TP -->|"POST /transform"| PR
PR -->|"POST /convert"| DOC
PR -->|"LLM prompt"| OR
PR --> ST
PR --> LG
LG -->|fire-and-forget| VL
style Frontend fill:#e1f5fe
style Backend fill:#e8f5e9
style External fill:#fff3e0
Phase Progression
The ETLPipeline orchestrator manages a strict phase gate — each phase must complete before the next unlocks.
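As a minimal sketch, the gate can be modelled as a check on which phase outputs exist so far. The names `Phase`, `PipelineState`, and `isUnlocked` are hypothetical and not taken from ETLPipeline.tsx; the output fields follow the transitions in the state diagram.

```typescript
// Hypothetical model of the phase gate: a phase unlocks only when every
// earlier phase has produced its output.
type Phase = "extract" | "transform" | "masterData";

interface ExtractOutput { tables: unknown[]; courier: string; file: string; }
interface TransformOutput { lineItems: unknown[]; mappings: unknown[]; }

interface PipelineState {
  extract?: ExtractOutput;     // set when the Extract phase completes
  transform?: TransformOutput; // set when the Transform phase completes
}

function isUnlocked(phase: Phase, state: PipelineState): boolean {
  if (phase === "extract") return true;
  if (phase === "transform") return state.extract !== undefined;
  // masterData needs both earlier outputs
  return state.extract !== undefined && state.transform !== undefined;
}
```

Keeping the gate as a pure function of collected outputs means the UI can derive which tabs are enabled without tracking a separate "current phase" flag.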
stateDiagram-v2
[*] --> Extract
Extract --> Transform: ExtractOutput\n(tables + courier + file)
Transform --> MasterData: TransformOutput\n(lineItems + mappings)
MasterData --> [*]
state Extract {
[*] --> FileSelect
FileSelect --> PageCount: PDF selected
FileSelect --> RunDocling: Non-PDF selected
PageCount --> RunDocling: Auto-run
RunDocling --> TableSelect: Tables extracted
TableSelect --> Done: Tables selected
}
state Transform {
[*] --> TryCacheHit
TryCacheHit --> ApplyCache: Valid mapper found
TryCacheHit --> CallLLM: No cache hit
CallLLM --> ValidateLLM: JSON parsed
ValidateLLM --> ApplyRules: Structure valid
ApplyRules --> CacheMapper: Line items produced
ApplyCache --> Done2: Line items returned
CacheMapper --> Done2
}
state MasterData {
[*] --> Display: Filter, sort, search
}
Detailed Endpoint Chain
sequenceDiagram
participant U as User
participant FE as Frontend
participant BE as Express Server
participant DL as Docling (Python)
participant KV as KV Store
participant LLM as OpenRouter/Gemini
Note over U,LLM: Phase 1 — Extract
U->>FE: Select invoice file
FE->>BE: GET /api/parse/files
BE-->>FE: { files: [{ courier, file, path, size }] }
FE->>BE: GET /api/parse/docling/page-count?file=...
BE->>DL: GET /page-count?file=... (pypdfium2, no ML)
DL-->>BE: { pages: 450 }
BE-->>FE: { pages: 450 }
FE->>BE: POST /api/parse/docling { file }
Note right of FE: "Extract All" mode:<br/>no startPage/endPage sent
BE->>DL: POST /convert { file }
Note right of DL: ML table detection<br/>~2min for 450 pages
DL-->>BE: { markdown, tables[], totalPages, processingMs }
BE-->>FE: Same + logged to VictoriaLogs
Note over U,LLM: Phase 2 — Transform
U->>FE: Click "Map to Schema"
FE->>BE: POST /api/parse/transform { tables, courier, fileName }
BE->>KV: list('mappers/evri/')
KV-->>BE: [mapper1, mapper2, ...]
Note right of BE: Filter by schemaVersion hash<br/>Sort: exact FP first, then by usage
alt Cache Hit
BE->>BE: validateMappingResult(tables, mapper.rules)
BE-->>FE: { lineItems, model: "cached (gemini-3-flash)", processingMs: 12 }
else Cache Miss — LLM Call
BE->>LLM: POST /chat/completions<br/>{ headers + 8 stratified sample rows }
LLM-->>BE: { tableMappings, unmappedTables, currency }
BE->>BE: validateLLMResponse() → structure check
BE->>BE: applyMappings() → all rows deterministically
BE->>KV: set('mappers/evri/{fp}', mapper + schemaVersion)
BE-->>FE: { lineItems, model: "gemini-3-flash", processingMs: 4200 }
end
Note over U,LLM: Phase 3 — Master Data View
FE->>U: Sortable/filterable table of normalized LineItems
Example 1 — Happy Path: First-Time Evri Invoice (450 pages)
A user processes a new Evri invoice format that has never been seen before.
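On a cache miss the backend sends a single chat request to OpenRouter. The sketch below builds that request for OpenRouter's OpenAI-compatible chat completions endpoint, using the `temperature` and `max_tokens` values quoted in this example. The prompt layout, the `SampledTable` shape, and sending the schema prompt as a system message are assumptions; the model slug is passed in rather than guessed.

```typescript
// Hypothetical request builder for the cache-miss LLM call.
interface SampledTable {
  headers: string[];
  sampleRows: string[][]; // already reduced to at most 8 stratified rows
}

const OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions";

// Assumed prompt layout: one block per table with headers, then sample rows.
function buildUserPrompt(tables: SampledTable[]): string {
  return tables
    .map((t, i) =>
      [
        `Table ${i}`,
        `Headers: ${t.headers.join(" | ")}`,
        ...t.sampleRows.map((r) => r.join(" | ")),
      ].join("\n"),
    )
    .join("\n\n");
}

function buildChatRequest(
  tables: SampledTable[],
  schemaPrompt: string,
  model: string, // the OpenRouter slug for the configured model
  apiKey: string,
) {
  return {
    url: OPENROUTER_URL,
    init: {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        model,
        temperature: 0.1,
        max_tokens: 8000,
        messages: [
          { role: "system", content: schemaPrompt },
          { role: "user", content: buildUserPrompt(tables) },
        ],
      }),
    },
  };
}
```

The result can be sent with `fetch(req.url, req.init)`. Keeping request construction separate from the network call makes the prompt easy to inspect and test.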
graph TD
A["User selects:<br/><b>evri/march-2026.pdf</b><br/>4.2MB, 450 pages"] --> B
B["GET /docling/page-count<br/>→ 450 pages<br/>(pypdfium2, instant)"] --> C
C["POST /docling<br/>mode: <b>Extract All</b><br/>no startPage/endPage<br/>⏱ ~120s for 450 pages"] --> D
D["Result: 47 tables extracted<br/>12,400 total rows<br/>Labels: 42 structured + 5 key_value_area"] --> E
E["User selects 42 structured tables<br/>(deselects 5 key_value headers)"] --> F
F["POST /transform<br/>fingerprint: <b>a3f8c1e2...</b><br/>courier: evri"] --> G
G{"Cached mappers<br/>for 'evri'?"}
G -->|"0 mappers<br/>(first time)"| H
H["Build LLM prompt<br/>Headers + 8 stratified rows per table<br/>~4,200 chars prompt"] --> I
I["OpenRouter → Gemini 3 Flash<br/>temp: 0.1, max_tokens: 8000<br/>⏱ ~3.5s"] --> J
J["validateLLMResponse()<br/>✓ tableMappings is array<br/>✓ each has tableIndex + columnMappings<br/>✓ optional arrays normalized"] --> K
K["applyMappings() to all 12,400 rows<br/>→ 11,850 line items<br/>(550 skipped: totals, headers, metadata)"] --> L
L["Per-item confidence scored<br/>avg 87%, 12 items below 80%<br/>Key field penalties applied"] --> M
M["Cache mapper:<br/>mappers/evri/a3f8c1e2<br/>schemaVersion: b7e2f1a3c9d0"] --> N
N["Response to frontend<br/>11,850 items, £142,380.50<br/>model: gemini-3-flash<br/>processingMs: 4,200"]
style A fill:#e1f5fe
style G fill:#fff9c4
style H fill:#fff3e0
style I fill:#fff3e0
style N fill:#e8f5e9
Example 2 — Cache Hit: Same Evri Format, Different Month
The same user processes April's Evri invoice. Same table structure, different data.
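Before a cached mapper's output is accepted, `validateMappingResult()` checks that the rules still produce sensible line items. A minimal sketch of such a gate follows; the `passesQualityGate` name and both threshold values are hypothetical, since the real thresholds are not documented here.

```typescript
// Hypothetical cache-hit quality gate in the spirit of validateMappingResult().
interface GateItem { description: string; totalAmount: number; }

interface GateThresholds {
  minYield: number;     // fraction of source rows that must yield an item
  minDescribed: number; // fraction of items that must have a description
}

// Placeholder thresholds, not the values used in parse.routes.ts.
const DEFAULT_THRESHOLDS: GateThresholds = { minYield: 0.5, minDescribed: 0.5 };

function passesQualityGate(
  items: GateItem[],
  sourceRowCount: number,
  t: GateThresholds = DEFAULT_THRESHOLDS,
): boolean {
  if (items.length === 0 || sourceRowCount === 0) return false;
  const yieldRatio = items.length / sourceRowCount;
  const describedRatio =
    items.filter((i) => i.description.trim() !== "").length / items.length;
  const hasNonZeroAmount = items.some((i) => i.totalAmount !== 0);
  return yieldRatio >= t.minYield && describedRatio >= t.minDescribed && hasNonZeroAmount;
}
```

If the gate fails, the request can fall through to a fresh LLM call instead of returning suspect cached output.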
graph TD
A["User selects:<br/><b>evri/april-2026.pdf</b><br/>Same format as March"] --> B
B["Extract All → 45 tables<br/>11,200 rows"] --> C
C["POST /transform<br/>fingerprint: <b>a3f8c1e2...</b><br/>(same headers = same FP)"] --> D
D{"Load cached mappers<br/>for 'evri'"}
D --> E["Found 1 mapper:<br/>a3f8c1e2 (EXACT match)<br/>schemaVersion: b7e2f1a3c9d0 ✓<br/>usageCount: 1"]
E --> F["validateMappingResult()<br/>✓ 10,800 items produced<br/>✓ 96% of rows yielded items<br/>✓ 94% have descriptions<br/>✓ non-zero amounts present"]
F --> G["CACHE HIT<br/>Skip LLM entirely<br/>⏱ 12ms apply time"]
G --> H["Bump usage:<br/>usageCount: 2<br/>lastUsedAt: now()"]
H --> I["Response: 10,800 items<br/>£128,450.20<br/>model: cached (gemini-3-flash)<br/>processingMs: 12"]
style A fill:#e1f5fe
style E fill:#fff9c4
style G fill:#e8f5e9
style I fill:#e8f5e9
Cache hits return in milliseconds, versus seconds for an LLM call. The fingerprint is a SHA-256 hash of each table's sorted headers, so the same column structure produces the same fingerprint regardless of row data.
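A fingerprint along these lines can be sketched with Node's `crypto` module. The `tableFingerprint` name, the separator characters, and keeping table order significant are assumptions for illustration; only "SHA-256 over sorted headers" comes from the text above.

```typescript
import { createHash } from "node:crypto";

// Hypothetical fingerprint: SHA-256 over each table's sorted headers.
// Row data is deliberately ignored.
function tableFingerprint(tables: { headers: string[]; rows?: unknown[][] }[]): string {
  const canonical = tables
    // Sort a copy so the caller's header order is not mutated.
    .map((t) => [...t.headers].sort().join("\u001f")) // unit separator between headers
    .join("\u001e"); // record separator between tables
  return createHash("sha256").update(canonical).digest("hex");
}
```

Using control characters as separators avoids collisions such as `["a,b"]` versus `["a", "b"]` that a comma join would produce.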
Example 3 — Schema Change Invalidates Cache
A developer updates MASTER_SCHEMA_PROMPT to add a new field. All cached mappers are automatically invalidated.
graph TD
A["Developer adds field to<br/>MASTER_SCHEMA_PROMPT:<br/><code>fuelSurchargeRate?: number</code>"] --> B
B["Server restart<br/>SCHEMA_VERSION rehashed:<br/>b7e2f1a3c9d0 → <b>d4a1c8f2e7b3</b>"] --> C
C["User processes same Evri invoice"] --> D
D["loadCachedMappers('evri')<br/>Found 1 mapper on disk"] --> E
E{"mapper.schemaVersion<br/>=== SCHEMA_VERSION?"}
E -->|"b7e2f1a3c9d0 ≠ d4a1c8f2e7b3<br/>MISMATCH"| F
F["Mapper skipped<br/>Log: 'schema version mismatch'<br/>0 valid mappers remaining"] --> G
G["No cache hit → fresh LLM call<br/>New prompt includes fuelSurchargeRate"] --> H
H["New mapper cached with<br/>schemaVersion: d4a1c8f2e7b3"] --> I
I["Future requests use new mapper<br/>with updated schema"]
style A fill:#f3e5f5
style E fill:#fff9c4
style F fill:#fce4ec
style G fill:#fff3e0
style I fill:#e8f5e9
Mappers without a `schemaVersion` field (legacy mappers written before Fix 2) are also rejected, because `undefined !== hash`. No manual cache clearing is needed.
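The version check, combined with the ordering from the endpoint chain (exact fingerprint first, then by usage), can be sketched as follows. `schemaVersionOf` and `rankMappers` are hypothetical names, and truncating the hash to 12 hex characters only mirrors the example values above.

```typescript
import { createHash } from "node:crypto";

interface StoredMapper {
  courier: string;
  fingerprint: string;
  schemaVersion?: string; // absent on legacy mappers
  usageCount: number;
  lastUsedAt: string;
}

// Computed once at startup from the schema prompt text.
function schemaVersionOf(prompt: string): string {
  return createHash("sha256").update(prompt).digest("hex").slice(0, 12);
}

// Keep only mappers built against the current schema (legacy mappers with no
// schemaVersion fail the strict equality), then rank exact fingerprint
// matches first and break ties by usage.
function rankMappers(mappers: StoredMapper[], currentVersion: string, fp: string): StoredMapper[] {
  return mappers
    .filter((m) => m.schemaVersion === currentVersion)
    .sort((a, b) => {
      const exactA = a.fingerprint === fp ? 1 : 0;
      const exactB = b.fingerprint === fp ? 1 : 0;
      if (exactA !== exactB) return exactB - exactA;
      return b.usageCount - a.usageCount;
    });
}
```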
Example 4 — Concurrent Duplicate Requests
Two browser tabs process the same DPD invoice format simultaneously.
sequenceDiagram
participant T1 as Tab 1
participant T2 as Tab 2
participant BE as Backend
participant LLM as OpenRouter
T1->>BE: POST /transform { tables, courier: "dpd" }
Note right of BE: lockKey = "dpd:f7a2b8c1"<br/>inFlightLLM.has()? No<br/>Create promise, store in map
T2->>BE: POST /transform { tables, courier: "dpd" }
Note right of BE: lockKey = "dpd:f7a2b8c1"<br/>inFlightLLM.has()? YES<br/>Await existing promise
BE->>LLM: Single LLM call (Tab 1's promise)
LLM-->>BE: { tableMappings, ... }
Note right of BE: Both requests resolve<br/>from same LLM result
BE-->>T1: { runId: "abc1", lineItems, processingMs: 3800 }
BE-->>T2: { runId: "def2", lineItems, processingMs: 3850 }
Note over T1,T2: Same mapping rules applied<br/>Different runIds + timing<br/>Only 1 LLM call billed
The `inFlightLLM` map uses `{courier}:{fingerprint}` as the lock key. The `finally` block ensures the entry is cleaned up even if the LLM call fails. Each request still gets its own `runId` for independent logging.
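A minimal single-flight sketch of this pattern follows. `dedupedLLMCall` is a hypothetical name, and `callLLM` stands in for the real OpenRouter request. Deferring the call with `Promise.resolve().then(...)` means the map entry is always stored before `.finally()` can remove it, even if `callLLM` throws synchronously.

```typescript
// One in-flight LLM promise per `${courier}:${fingerprint}`.
const inFlightLLM = new Map<string, Promise<unknown>>();

async function dedupedLLMCall<T>(
  courier: string,
  fingerprint: string,
  callLLM: () => Promise<T>, // stand-in for the real OpenRouter request
): Promise<T> {
  const lockKey = `${courier}:${fingerprint}`;

  // A concurrent request for the same format awaits the existing call.
  const existing = inFlightLLM.get(lockKey);
  if (existing) return existing as Promise<T>;

  // Defer the call so the entry is registered first; the finally handler
  // removes it on success or failure so errors are never cached.
  const promise = Promise.resolve()
    .then(() => callLLM())
    .finally(() => {
      inFlightLLM.delete(lockKey);
    });
  inFlightLLM.set(lockKey, promise);
  return promise;
}
```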
Example 5 — LLM Returns Invalid Response
The LLM returns malformed JSON or a structurally invalid response.
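The checks in this example can be sketched as one parse-and-validate function. `parseLLMResponse` and the type names are hypothetical; the checked fields, the 2,000-character excerpt, and the normalized optional arrays follow the flowchart.

```typescript
interface ColumnMapping { sourceColumn: string; targetField: string; }
interface TableMapping {
  tableIndex: number;
  columnMappings: ColumnMapping[];
  classificationRules: unknown[];
  skipRules: unknown[];
}
interface LLMResponse { tableMappings: TableMapping[]; unmappedTables: unknown[]; }

function parseLLMResponse(raw: string): LLMResponse {
  let parsed: any;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Include an excerpt of the raw output for debugging.
    throw new Error(`LLM returned invalid JSON: ${raw.slice(0, 2000)}`);
  }
  if (!Array.isArray(parsed?.tableMappings)) {
    throw new Error("tableMappings is not an array");
  }
  parsed.tableMappings.forEach((tm: any, i: number) => {
    if (typeof tm?.tableIndex !== "number") {
      throw new Error(`tableMappings[${i}].tableIndex is not a number`);
    }
    if (!Array.isArray(tm.columnMappings)) {
      throw new Error(`tableMappings[${i}].columnMappings is not an array`);
    }
    tm.columnMappings.forEach((cm: any, j: number) => {
      for (const key of ["sourceColumn", "targetField"]) {
        if (typeof cm?.[key] !== "string") {
          throw new Error(`tableMappings[${i}].columnMappings[${j}].${key} is not a string`);
        }
      }
    });
    // Normalize optional arrays so applyMappings() can iterate safely.
    tm.classificationRules ??= [];
    tm.skipRules ??= [];
  });
  parsed.unmappedTables ??= [];
  return parsed as LLMResponse;
}
```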
graph TD
A["POST /transform<br/>No cache hit → call LLM"] --> B
B["OpenRouter responds<br/>(3.2s)"] --> C
C{"JSON.parse<br/>succeeds?"}
C -->|"No — garbled output<br/>or markdown fences"| D
D["Throw: 'LLM returned invalid JSON'<br/>+ first 2000 chars of raw response"] --> ERR
C -->|"Yes — parsed object"| E
E{"validateLLMResponse()"}
E -->|"tableMappings missing<br/>or not an array"| F
F["Throw: 'tableMappings is not an array'"] --> ERR
E -->|"columnMapping missing<br/>sourceColumn or targetField"| G
G["Throw: 'tableMappings[0].columnMappings[2]<br/>.targetField is not a string'"] --> ERR
E -->|"All checks pass"| H
H["Normalize optional arrays<br/>classificationRules ??= []<br/>skipRules ??= []<br/>unmappedTables ??= []"] --> OK
ERR["catch block:<br/>500 { error: detail }<br/>logEvent('etl:transform:error')"]
OK["Continue to applyMappings()"]
style C fill:#fff9c4
style E fill:#fff9c4
style ERR fill:#fce4ec
style OK fill:#e8f5e9
Example 6 — Subset Header Matching (Re-indexed Tables)
A cached mapper was built from tables with indices 0-5. A new extraction of the same format produces tables with indices 3-8 (different page range or table ordering).
graph TD
A["Cached mapper expects:<br/>tableIndex: 0<br/>columnMappings: [Weight, Price, Service]"] --> B
B["New extraction tables:<br/>Table 3: headers [Weight, Price, Service, <b>Notes</b>]<br/>Table 4: headers [Date, Ref, Amount]"] --> C
C{"Find table by index 0?"}
C -->|"Not found"| D
D{"Subset header match:<br/>Every mapped sourceColumn<br/>exists in table's headers?"}
D -->|"Table 3: headerSet has<br/>Weight ✓ Price ✓ Service ✓<br/>(extra 'Notes' column OK)"| E
D -->|"Table 4: headerSet missing<br/>Weight ✗ — skip"| F
E["Remap: tableIndex 0 → 3<br/>Apply mappings to Table 3<br/>Extra 'Notes' column ignored"]
F["Table 4 not matched<br/>→ skipped for this mapping"]
style C fill:#fff9c4
style D fill:#fff9c4
style E fill:#e8f5e9
style F fill:#fff3e0
Fix 8 changed this from an exact match (sorted headers must be identical) to a subset match (every mapped column must exist in the table), so tables with extra columns still match.
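The fallback order (recorded index first, then subset header match) can be sketched as follows. The `Table` and `CachedTableMapping` shapes and the `resolveTable` name are assumptions.

```typescript
interface Table { index: number; headers: string[]; }
interface CachedTableMapping {
  tableIndex: number;
  columnMappings: { sourceColumn: string }[];
}

function resolveTable(mapping: CachedTableMapping, tables: Table[]): Table | undefined {
  // 1. Use the table at the recorded index if it still exists.
  const byIndex = tables.find((t) => t.index === mapping.tableIndex);
  if (byIndex) return byIndex;

  // 2. Otherwise take the first table whose headers contain every mapped
  //    sourceColumn; extra columns (e.g. "Notes") are allowed.
  return tables.find((t) => {
    const headerSet = new Set(t.headers);
    return mapping.columnMappings.every((cm) => headerSet.has(cm.sourceColumn));
  });
}
```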
Example 7 — Extract Mode Toggle (Debug vs Production)
graph LR
subgraph AllMode["Extract All (Default)"]
A1["POST /docling<br/>{ file: 'invoice.pdf' }<br/>No startPage/endPage"] --> A2["Docling processes<br/>entire document<br/>All 450 pages"]
A2 --> A3["⏱ ~120s<br/>Elapsed timer shows:<br/>'Extracting... 45s<br/>(large documents may<br/>take several minutes)'"]
end
subgraph WindowMode["Page Window (Debug)"]
B1["POST /docling<br/>{ file: 'invoice.pdf',<br/>startPage: 1, endPage: 5 }"] --> B2["Docling processes<br/>pages 1-5 only"]
B2 --> B3["⏱ ~3s<br/>← → navigation<br/>to slide window"]
end
style AllMode fill:#e8f5e9
style WindowMode fill:#e1f5fe
The toggle sits in the ExtractPhase toolbar. "Extract All" is the default for production use. "Page Window" preserves the old `PAGE_WINDOW=5` navigation for debugging specific page ranges.
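One way the toggle might shape the `POST /api/parse/docling` body is sketched below. `ExtractMode` and `buildDoclingRequest` are hypothetical; the only documented behaviour reproduced is that Extract All omits `startPage`/`endPage` while Page Window sends a five-page range.

```typescript
type ExtractMode = { kind: "all" } | { kind: "window"; startPage: number };

const PAGE_WINDOW = 5;

interface DoclingRequest { file: string; startPage?: number; endPage?: number; }

function buildDoclingRequest(file: string, mode: ExtractMode, totalPages: number): DoclingRequest {
  // Extract All: no page bounds, Docling processes the whole document.
  if (mode.kind === "all") return { file };

  // Page Window: 1-based inclusive range, clamped to the document length.
  const startPage = Math.max(1, mode.startPage);
  const endPage = Math.min(totalPages, startPage + PAGE_WINDOW - 1);
  return { file, startPage, endPage };
}
```

Omitting the keys entirely, rather than sending `null`, keeps the Extract All body identical to the `{ file }` request shown in the endpoint chain.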
Per-Item Confidence Scoring
Fix 5 replaced the flat table-level confidence with a per-item multiplicative score.
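The three factors combine multiplicatively. A sketch using the constants shown in the diagram, with hypothetical input field names:

```typescript
interface ScoreInput {
  tableConfidence: number; // mapping.confidence, the LLM's table-level baseline
  populatedFields: number;
  totalFields: number;
  totalAmount: number;
  description: string;
  defaultedRules: number;  // classification rules that fell to the default
  totalRules: number;
}

function itemConfidence(x: ScoreInput): number {
  let score = x.tableConfidence;
  // Factor 1: field population, from x0.7 (nothing populated) to x1.0.
  if (x.totalFields > 0) score *= 0.7 + 0.3 * (x.populatedFields / x.totalFields);
  // Factor 2: key-field penalties.
  if (x.totalAmount === 0) score *= 0.6;
  if (x.description.trim() === "") score *= 0.5;
  // Factor 3: classification defaults, at most a 15% reduction.
  if (x.totalRules > 0) score *= 1 - 0.15 * (x.defaultedRules / x.totalRules);
  return Math.min(1, Math.max(0, score));
}
```

With the diagram's inputs (0.92 baseline, 8 of 10 fields populated, 1 of 3 rules defaulted) this returns about 0.82, and about 0.25 when the amount is zero and the description is empty.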
graph TD
A["Start: mapping.confidence<br/>(table-level baseline from LLM)<br/>e.g. 0.92"] --> B
B["Factor 1: Field Population<br/>× (0.7 + 0.3 × populated/total)<br/><br/>8/10 fields populated:<br/>× (0.7 + 0.3 × 0.8) = × 0.94"] --> C
C["Factor 2: Key Field Penalties<br/>totalAmount === 0? × 0.6<br/>description empty? × 0.5<br/><br/>Both present: × 1.0"] --> D
D["Factor 3: Classification Defaults<br/>× (1 - 0.15 × defaulted/total)<br/><br/>1 of 3 rules fell to default:<br/>× (1 - 0.15 × 0.33) = × 0.95"] --> E
E["Final: clamp(0, 1)<br/>0.92 × 0.94 × 1.0 × 0.95<br/>= <b>0.821</b> (82%)"]
F["Edge case: zero amount + empty desc<br/>0.92 × 0.94 × 0.6 × 0.5 × 0.95<br/>= <b>0.246</b> (25%)<br/>→ flagged as low confidence"]
style A fill:#e1f5fe
style E fill:#e8f5e9
style F fill:#fce4ec
Stratified Sample Row Selection
Fix 4 ensures the LLM sees diverse data instead of just the first 8 rows.
graph TD
A["Input: 500 rows<br/>max = 8 samples"] --> B
B["Take first 4 rows<br/>(header-adjacent, cleanest)"] --> C
C["From remaining 496 rows:<br/>Pick rows with unique<br/>first-column values"] --> D
D{"Found 4+ diverse rows?"}
D -->|"Yes: e.g. different<br/>tracking prefixes<br/>AB12, CD34, EF56, GH78"| E
E["Return: first 4 + 4 diverse = 8"]
D -->|"No: only 2 unique values"| F
F["Fill remaining 2 slots with<br/>evenly-spaced rows:<br/>row 124, row 248"] --> G
G["Return: first 4 + 2 diverse + 2 spaced = 8"]
style A fill:#e1f5fe
style E fill:#e8f5e9
style G fill:#e8f5e9
For small tables (≤8 rows), all rows are included and no sampling is needed.
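A sketch of the selection, assuming row arrays whose first cell is the diversity key. The `stratifiedSample` name, the spacing formula, and treating the head rows' first-column values as already seen are assumptions rather than the documented implementation.

```typescript
function stratifiedSample<T>(rows: T[][], max = 8): T[][] {
  // Small tables: no sampling needed.
  if (rows.length <= max) return rows;

  // 1. First half of the budget: header-adjacent rows, usually the cleanest.
  const headCount = Math.floor(max / 2);
  const picked = new Set<number>();
  for (let i = 0; i < headCount; i++) picked.add(i);

  // 2. Diverse rows: the first row for each first-column value not yet seen.
  const seen = new Set(rows.slice(0, headCount).map((r) => String(r[0])));
  for (let i = headCount; i < rows.length && picked.size < max; i++) {
    const key = String(rows[i][0]);
    if (!seen.has(key)) {
      seen.add(key);
      picked.add(i);
    }
  }

  // 3. Fill leftover slots with evenly spaced rows, skipping rows already taken.
  const step = (rows.length - headCount) / (max - headCount);
  for (let k = 1; picked.size < max; k++) {
    let idx = Math.floor(k * step) % rows.length;
    while (picked.has(idx)) idx = (idx + 1) % rows.length;
    picked.add(idx);
  }

  // Return the sample in original row order.
  return Array.from(picked).sort((a, b) => a - b).map((i) => rows[i]);
}
```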
Data Model Summary
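For reference, the entities in the ER diagram can be written as TypeScript shapes. Field lists come from the diagram; `mapperKey` and `bumpUsage` are hypothetical helpers for the `mappers/{courier}/{fp}` key layout and the usage bump described in Example 2.

```typescript
interface StoredMapper {
  courier: string;
  fingerprint: string;
  schemaVersion: string; // SHA-256 of the schema prompt
  model: string;         // e.g. "gemini-3-flash"
  usageCount: number;
  lastUsedAt: string;
}

interface LineItem {
  description: string;
  quantity: number;
  totalAmount: number;
  category: string;      // e.g. "base_service", "surcharge", ...
  serviceType: string;
  trackingNumber: string;
  confidence: number;    // per-item score
  isSurcharge: boolean;
  isDiscount: boolean;
}

interface EtlRun {
  id: string;            // 8-char UUID
  timestamp: string;
  courier: string;
  fileName: string;
  cacheHit: boolean;
  mapperFingerprint: string;
}

// KV key for a cached mapper: mappers/{courier}/{fp}
function mapperKey(courier: string, fingerprint: string): string {
  return `mappers/${courier}/${fingerprint}`;
}

// After a cache hit, record usage (returns a copy instead of mutating).
function bumpUsage(m: StoredMapper, now: Date = new Date()): StoredMapper {
  return { ...m, usageCount: m.usageCount + 1, lastUsedAt: now.toISOString() };
}
```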
erDiagram
KV_STORE {
string key PK "mappers/{courier}/{fp}"
string value "JSON StoredMapper"
}
ETL_RUN {
string id PK "8-char UUID"
string timestamp
string courier
string fileName
boolean cacheHit
string mapperFingerprint
}
STORED_MAPPER {
string courier
string fingerprint PK
string schemaVersion "SHA-256 of prompt"
string model "gemini-3-flash"
number usageCount
string lastUsedAt
}
LLM_RESPONSE {
number tableIndex
string tableLabel
string purpose
number confidence
}
LINE_ITEM {
string description
number quantity
number totalAmount
string category "base_service|surcharge|..."
string serviceType
string trackingNumber
number confidence "per-item scored"
boolean isSurcharge
boolean isDiscount
}
KV_STORE ||--o{ STORED_MAPPER : stores
KV_STORE ||--o{ ETL_RUN : stores
STORED_MAPPER ||--|{ LLM_RESPONSE : contains
ETL_RUN ||--|{ LINE_ITEM : produces
LLM_RESPONSE ||--|{ LINE_ITEM : maps_to
Service Map
| Service | Port | Role |
|---|---|---|
| Frontend (Vite) | 3000 | React SPA — phase orchestration + data display |
| Backend (Express) | 3001 | ETL endpoints, LLM proxy, KV cache |
| Docling Server (Python) | 3002 | ML-based document structure extraction |
| OpenRouter API | external | LLM gateway → Gemini 3 Flash |
| VictoriaLogs | 9428 | Structured event logging (optional) |
| Azurite | 10000 | Azure Blob emulator for KV store (optional) |
Environment Config
| Variable | Default | Purpose |
|---|---|---|
| `OPENROUTER_API_KEY` | none | LLM API access (required for transform) |
| `DOCLING_URL` | `http://localhost:3002` | Docling server address |
| `STORE_BACKEND` | default (filesystem) | Set to `azure` for Blob Storage |
| `VICTORIALOGS_URL` | `http://localhost:9428` | Structured logging endpoint |
| `PORT` | `3001` | Backend server port |
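A sketch of reading these variables with their documented defaults. `loadConfig` and `AppConfig` are hypothetical names, and mapping any `STORE_BACKEND` value other than `azure` to the filesystem store is an assumption. Passing the environment in (for example `loadConfig(process.env)`) keeps the function testable.

```typescript
interface AppConfig {
  openRouterApiKey?: string;
  doclingUrl: string;
  storeBackend: "azure" | "filesystem";
  victoriaLogsUrl: string;
  port: number;
}

type Env = Record<string, string | undefined>;

function loadConfig(env: Env): AppConfig {
  const port = Number(env.PORT ?? "3001");
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`Invalid PORT: ${env.PORT}`);
  }
  return {
    // Only the transform phase needs the key, so a missing key is not fatal here.
    openRouterApiKey: env.OPENROUTER_API_KEY,
    doclingUrl: env.DOCLING_URL ?? "http://localhost:3002",
    // "azure" selects Blob Storage; anything else uses the filesystem (assumed).
    storeBackend: env.STORE_BACKEND === "azure" ? "azure" : "filesystem",
    victoriaLogsUrl: env.VICTORIALOGS_URL ?? "http://localhost:9428",
    port,
  };
}
```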