Invoice POC — ETL Pipeline Architecture

By Patrick McCurley · Created Mar 13, 2026

Current-state architecture of the invoice-poc ETL pipeline after the TECH-569 fixes, showing the full data flow from file selection through extraction, AI-powered transformation, and master data output.

System Overview

```mermaid
graph TD
    subgraph Frontend["Frontend (React + Vite)"]
        ETL["ETLPipeline.tsx<br/>Phase Orchestrator"]
        EP["ExtractPhase"]
        TP["TransformPhase"]
        MDV["MasterDataView"]
        ETL --> EP
        ETL --> TP
        ETL --> MDV
    end

    subgraph Backend["Backend (Express)"]
        PR["parse.routes.ts<br/>All ETL endpoints"]
        ST["store.ts<br/>KV Store"]
        LG["logger.ts<br/>VictoriaLogs"]
    end

    subgraph External["External Services"]
        DOC["docling-server.py<br/>ML Table Extraction"]
        OR["OpenRouter API<br/>Gemini 3 Flash"]
        VL["VictoriaLogs"]
    end

    EP -->|"POST /docling"| PR
    TP -->|"POST /transform"| PR
    PR -->|"POST /convert"| DOC
    PR -->|"LLM prompt"| OR
    PR --> ST
    PR --> LG
    LG -->|fire-and-forget| VL

    style Frontend fill:#e1f5fe
    style Backend fill:#e8f5e9
    style External fill:#fff3e0
```

Phase Progression

The ETLPipeline orchestrator manages a strict phase gate — each phase must complete before the next unlocks.

```mermaid
stateDiagram-v2
    [*] --> Extract
    Extract --> Transform: ExtractOutput\n(tables + courier + file)
    Transform --> MasterData: TransformOutput\n(lineItems + mappings)
    MasterData --> [*]

    state Extract {
        [*] --> FileSelect
        FileSelect --> PageCount: PDF selected
        FileSelect --> RunDocling: Non-PDF selected
        PageCount --> RunDocling: Auto-run
        RunDocling --> TableSelect: Tables extracted
        TableSelect --> Done: Tables selected
    }

    state Transform {
        [*] --> TryCacheHit
        TryCacheHit --> ApplyCache: Valid mapper found
        TryCacheHit --> CallLLM: No cache hit
        CallLLM --> ValidateLLM: JSON parsed
        ValidateLLM --> ApplyRules: Structure valid
        ApplyRules --> CacheMapper: Line items produced
        ApplyCache --> Done2: Line items returned
        CacheMapper --> Done2
    }

    state MasterData {
        [*] --> Display: Filter, sort, search
    }
```
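The phase gate can be sketched as a tiny guard function; `Phase`, `PipelineState`, and `canEnter` are illustrative names, not the actual ETLPipeline.tsx implementation.

```typescript
// Illustrative phase-gate sketch (not the real ETLPipeline.tsx code):
// each phase unlocks only when the previous phase has produced its output.
type Phase = "extract" | "transform" | "masterData";

interface PipelineState {
  extractOutput?: { tables: unknown[]; courier: string; file: string };
  transformOutput?: { lineItems: unknown[] };
}

// A phase is enterable only if the phase before it has completed.
function canEnter(phase: Phase, state: PipelineState): boolean {
  switch (phase) {
    case "extract":
      return true; // always available
    case "transform":
      return state.extractOutput !== undefined;
    case "masterData":
      return state.transformOutput !== undefined;
  }
}
```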

Detailed Endpoint Chain

```mermaid
sequenceDiagram
    participant U as User
    participant FE as Frontend
    participant BE as Express Server
    participant DL as Docling (Python)
    participant KV as KV Store
    participant LLM as OpenRouter/Gemini

    Note over U,LLM: Phase 1 — Extract

    U->>FE: Select invoice file
    FE->>BE: GET /api/parse/files
    BE-->>FE: { files: [{ courier, file, path, size }] }

    FE->>BE: GET /api/parse/docling/page-count?file=...
    BE->>DL: GET /page-count?file=... (pypdfium2, no ML)
    DL-->>BE: { pages: 450 }
    BE-->>FE: { pages: 450 }

    FE->>BE: POST /api/parse/docling { file }
    Note right of FE: "Extract All" mode:<br/>no startPage/endPage sent
    BE->>DL: POST /convert { file }
    Note right of DL: ML table detection<br/>~2min for 450 pages
    DL-->>BE: { markdown, tables[], totalPages, processingMs }
    BE-->>FE: Same + logged to VictoriaLogs

    Note over U,LLM: Phase 2 — Transform

    U->>FE: Click "Map to Schema"
    FE->>BE: POST /api/parse/transform { tables, courier, fileName }

    BE->>KV: list('mappers/evri/')
    KV-->>BE: [mapper1, mapper2, ...]
    Note right of BE: Filter by schemaVersion hash<br/>Sort: exact FP first, then by usage

    alt Cache Hit
        BE->>BE: validateMappingResult(tables, mapper.rules)
        BE-->>FE: { lineItems, model: "cached (gemini-3-flash)", processingMs: 12 }
    else Cache Miss — LLM Call
        BE->>LLM: POST /chat/completions<br/>{ headers + 8 stratified sample rows }
        LLM-->>BE: { tableMappings, unmappedTables, currency }
        BE->>BE: validateLLMResponse() → structure check
        BE->>BE: applyMappings() → all rows deterministically
        BE->>KV: set('mappers/evri/{fp}', mapper + schemaVersion)
        BE-->>FE: { lineItems, model: "gemini-3-flash", processingMs: 4200 }
    end

    Note over U,LLM: Phase 3 — Master Data View
    FE->>U: Sortable/filterable table of normalized LineItems
```

Example 1 — Happy Path: First-Time Evri Invoice (450 pages)

A user processes a new Evri invoice format that has never been seen before.

```mermaid
graph TD
    A["User selects:<br/><b>evri/march-2026.pdf</b><br/>4.2MB, 450 pages"] --> B

    B["GET /docling/page-count<br/>→ 450 pages<br/>(pypdfium2, instant)"] --> C

    C["POST /docling<br/>mode: <b>Extract All</b><br/>no startPage/endPage<br/>⏱ ~120s for 450 pages"] --> D

    D["Result: 47 tables extracted<br/>12,400 total rows<br/>Labels: 42 structured + 5 key_value_area"] --> E

    E["User selects 42 structured tables<br/>(deselects 5 key_value headers)"] --> F

    F["POST /transform<br/>fingerprint: <b>a3f8c1e2...</b><br/>courier: evri"] --> G

    G{"Cached mappers<br/>for 'evri'?"}

    G -->|"0 mappers<br/>(first time)"| H

    H["Build LLM prompt<br/>Headers + 8 stratified rows per table<br/>~4,200 chars prompt"] --> I

    I["OpenRouter → Gemini 3 Flash<br/>temp: 0.1, max_tokens: 8000<br/>⏱ ~3.5s"] --> J

    J["validateLLMResponse()<br/>✓ tableMappings is array<br/>✓ each has tableIndex + columnMappings<br/>✓ optional arrays normalized"] --> K

    K["applyMappings() to all 12,400 rows<br/>→ 11,850 line items<br/>(550 skipped: totals, headers, metadata)"] --> L

    L["Per-item confidence scored<br/>avg 87%, 12 items below 80%<br/>Key field penalties applied"] --> M

    M["Cache mapper:<br/>mappers/evri/a3f8c1e2<br/>schemaVersion: b7e2f1a3c9d0"] --> N

    N["Response to frontend<br/>11,850 items, £142,380.50<br/>model: gemini-3-flash<br/>processingMs: 4,200"]

    style A fill:#e1f5fe
    style G fill:#fff9c4
    style H fill:#fff3e0
    style I fill:#fff3e0
    style N fill:#e8f5e9
```

Example 2 — Cache Hit: Same Evri Format, Different Month

The same user processes April's Evri invoice. Same table structure, different data.

```mermaid
graph TD
    A["User selects:<br/><b>evri/april-2026.pdf</b><br/>Same format as March"] --> B

    B["Extract All → 45 tables<br/>11,200 rows"] --> C

    C["POST /transform<br/>fingerprint: <b>a3f8c1e2...</b><br/>(same headers = same FP)"] --> D

    D{"Load cached mappers<br/>for 'evri'"}

    D --> E["Found 1 mapper:<br/>a3f8c1e2 (EXACT match)<br/>schemaVersion: b7e2f1a3c9d0 ✓<br/>usageCount: 1"]

    E --> F["validateMappingResult()<br/>✓ 10,800 items produced<br/>✓ 96% of rows yielded items<br/>✓ 94% have descriptions<br/>✓ non-zero amounts present"]

    F --> G["CACHE HIT<br/>Skip LLM entirely<br/>⏱ 12ms apply time"]

    G --> H["Bump usage:<br/>usageCount: 2<br/>lastUsedAt: now()"]

    H --> I["Response: 10,800 items<br/>£128,450.20<br/>model: cached (gemini-3-flash)<br/>processingMs: 12"]

    style A fill:#e1f5fe
    style E fill:#fff9c4
    style G fill:#e8f5e9
    style I fill:#e8f5e9
```

Cache hits return in milliseconds versus seconds for LLM calls. The fingerprint is a SHA-256 hash of each table's sorted headers — the same column structure yields the same fingerprint regardless of row data.
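A minimal sketch of that fingerprint, assuming Node's `crypto` module; the real canonicalisation may differ in details such as the join separators.

```typescript
import { createHash } from "node:crypto";

// Fingerprint = SHA-256 over each table's sorted headers.
// Sorting makes the hash order-insensitive within a table, so the
// same column structure always produces the same fingerprint.
function fingerprintTables(tables: { headers: string[] }[]): string {
  const canonical = tables
    .map(t => [...t.headers].sort().join("|"))
    .join("||");
  return createHash("sha256").update(canonical).digest("hex");
}
```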


Example 3 — Schema Change Invalidates Cache

A developer updates MASTER_SCHEMA_PROMPT to add a new field. All cached mappers are automatically invalidated.

```mermaid
graph TD
    A["Developer adds field to<br/>MASTER_SCHEMA_PROMPT:<br/><code>fuelSurchargeRate?: number</code>"] --> B

    B["Server restart<br/>SCHEMA_VERSION rehashed:<br/>b7e2f1a3c9d0 → <b>d4a1c8f2e7b3</b>"] --> C

    C["User processes same Evri invoice"] --> D

    D["loadCachedMappers('evri')<br/>Found 1 mapper on disk"] --> E

    E{"mapper.schemaVersion<br/>=== SCHEMA_VERSION?"}

    E -->|"b7e2f1a3c9d0 ≠ d4a1c8f2e7b3<br/>MISMATCH"| F

    F["Mapper skipped<br/>Log: 'schema version mismatch'<br/>0 valid mappers remaining"] --> G

    G["No cache hit → fresh LLM call<br/>New prompt includes fuelSurchargeRate"] --> H

    H["New mapper cached with<br/>schemaVersion: d4a1c8f2e7b3"] --> I

    I["Future requests use new mapper<br/>with updated schema"]

    style A fill:#f3e5f5
    style E fill:#fff9c4
    style F fill:#fce4ec
    style G fill:#fff3e0
    style I fill:#e8f5e9
```

Mappers without a schemaVersion field (pre-Fix-2 legacy) are also rejected, since undefined !== hash. No manual cache clearing is needed.
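The invalidation reduces to a strict-equality filter; the `StoredMapper` shape here is a simplified sketch, not the real store.ts type.

```typescript
interface StoredMapper {
  fingerprint: string;
  schemaVersion?: string; // absent on pre-Fix-2 legacy mappers
  usageCount: number;
}

// Keep only mappers whose schemaVersion matches the current hash.
// Legacy mappers with no schemaVersion fail the strict equality
// check (undefined !== hash) and are dropped automatically.
function validMappers(mappers: StoredMapper[], schemaVersion: string): StoredMapper[] {
  return mappers.filter(m => m.schemaVersion === schemaVersion);
}
```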


Example 4 — Concurrent Duplicate Requests

Two browser tabs process the same DPD invoice format simultaneously.

```mermaid
sequenceDiagram
    participant T1 as Tab 1
    participant T2 as Tab 2
    participant BE as Backend
    participant LLM as OpenRouter

    T1->>BE: POST /transform { tables, courier: "dpd" }
    Note right of BE: lockKey = "dpd:f7a2b8c1"<br/>inFlightLLM.has()? No<br/>Create promise, store in map

    T2->>BE: POST /transform { tables, courier: "dpd" }
    Note right of BE: lockKey = "dpd:f7a2b8c1"<br/>inFlightLLM.has()? YES<br/>Await existing promise

    BE->>LLM: Single LLM call (Tab 1's promise)
    LLM-->>BE: { tableMappings, ... }

    Note right of BE: Both requests resolve<br/>from same LLM result

    BE-->>T1: { runId: "abc1", lineItems, processingMs: 3800 }
    BE-->>T2: { runId: "def2", lineItems, processingMs: 3850 }

    Note over T1,T2: Same mapping rules applied<br/>Different runIds + timing<br/>Only 1 LLM call billed
```

The inFlightLLM map uses {courier}:{fingerprint} as the lock key. The finally block ensures cleanup even if the LLM call fails. Each request still gets its own runId for independent logging.
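A minimal sketch of this dedup pattern, with a generic `run` thunk standing in for the real OpenRouter call:

```typescript
// Illustrative in-flight dedup sketch. The real code keys on
// `${courier}:${fingerprint}` and wraps the OpenRouter call.
const inFlight = new Map<string, Promise<unknown>>();

function dedupe<T>(lockKey: string, run: () => Promise<T>): Promise<T> {
  const existing = inFlight.get(lockKey);
  if (existing) return existing as Promise<T>; // later callers await the first

  const p = run().finally(() => {
    // Cleanup runs whether the call resolves or rejects,
    // so a failed LLM call never wedges the lock.
    inFlight.delete(lockKey);
  });
  inFlight.set(lockKey, p);
  return p;
}
```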


Example 5 — LLM Returns Invalid Response

The LLM returns malformed JSON or a structurally invalid response.

```mermaid
graph TD
    A["POST /transform<br/>No cache hit → call LLM"] --> B

    B["OpenRouter responds<br/>(3.2s)"] --> C

    C{"JSON.parse<br/>succeeds?"}

    C -->|"No — garbled output<br/>or markdown fences"| D
    D["Throw: 'LLM returned invalid JSON'<br/>+ first 2000 chars of raw response"] --> ERR

    C -->|"Yes — parsed object"| E

    E{"validateLLMResponse()"}

    E -->|"tableMappings missing<br/>or not an array"| F
    F["Throw: 'tableMappings is not an array'"] --> ERR

    E -->|"columnMapping missing<br/>sourceColumn or targetField"| G
    G["Throw: 'tableMappings[0].columnMappings[2]<br/>.targetField is not a string'"] --> ERR

    E -->|"All checks pass"| H
    H["Normalize optional arrays<br/>classificationRules ??= []<br/>skipRules ??= []<br/>unmappedTables ??= []"] --> OK

    ERR["catch block:<br/>500 { error: detail }<br/>logEvent('etl:transform:error')"]
    OK["Continue to applyMappings()"]

    style C fill:#fff9c4
    style E fill:#fff9c4
    style ERR fill:#fce4ec
    style OK fill:#e8f5e9
```
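The structure checks can be sketched as follows; the field list is abridged and this `validateLLMResponse` is a simplified stand-in for the real function.

```typescript
// Hypothetical response shape; the real validateLLMResponse() checks
// more fields. Errors carry a path-style message so the 500 response
// pinpoints exactly which entry is malformed.
interface LLMResponse {
  tableMappings: {
    tableIndex: number;
    columnMappings: { sourceColumn: string; targetField: string }[];
  }[];
  unmappedTables?: number[];
}

function validateLLMResponse(parsed: any): LLMResponse {
  if (!Array.isArray(parsed?.tableMappings)) {
    throw new Error("tableMappings is not an array");
  }
  parsed.tableMappings.forEach((tm: any, i: number) => {
    if (typeof tm.tableIndex !== "number") {
      throw new Error(`tableMappings[${i}].tableIndex is not a number`);
    }
    if (!Array.isArray(tm.columnMappings)) {
      throw new Error(`tableMappings[${i}].columnMappings is not an array`);
    }
    tm.columnMappings.forEach((cm: any, j: number) => {
      if (typeof cm.targetField !== "string") {
        throw new Error(`tableMappings[${i}].columnMappings[${j}].targetField is not a string`);
      }
    });
  });
  parsed.unmappedTables ??= []; // normalize optional arrays
  return parsed as LLMResponse;
}
```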

Example 6 — Subset Header Matching (Re-indexed Tables)

A cached mapper was built from tables with indices 0-5. A new extraction of the same format produces tables with indices 3-8 (different page range or table ordering).

```mermaid
graph TD
    A["Cached mapper expects:<br/>tableIndex: 0<br/>columnMappings: [Weight, Price, Service]"] --> B

    B["New extraction tables:<br/>Table 3: headers [Weight, Price, Service, <b>Notes</b>]<br/>Table 4: headers [Date, Ref, Amount]"] --> C

    C{"Find table by index 0?"}
    C -->|"Not found"| D

    D{"Subset header match:<br/>Every mapped sourceColumn<br/>exists in table's headers?"}

    D -->|"Table 3: headerSet has<br/>Weight ✓ Price ✓ Service ✓<br/>(extra 'Notes' column OK)"| E

    D -->|"Table 4: headerSet missing<br/>Weight ✗ — skip"| F

    E["Remap: tableIndex 0 → 3<br/>Apply mappings to Table 3<br/>Extra 'Notes' column ignored"]

    F["Table 4 not matched<br/>→ skipped for this mapping"]

    style C fill:#fff9c4
    style D fill:#fff9c4
    style E fill:#e8f5e9
    style F fill:#fff3e0

Fix 8 changed this from exact match (sorted headers must be identical) to subset match (every mapped column must exist in the table). Tables can have extra columns and still match.
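The subset check itself is only a few lines; `subsetMatch` is an illustrative name, not the real function.

```typescript
// Subset match sketch: a cached mapping applies to a table if every
// mapped sourceColumn exists in that table's headers; extra columns
// are fine. (Fix 8 relaxed this from exact sorted-header equality.)
function subsetMatch(mappedColumns: string[], tableHeaders: string[]): boolean {
  const headerSet = new Set(tableHeaders);
  return mappedColumns.every(col => headerSet.has(col));
}
```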


Example 7 — Extract Mode Toggle (Debug vs Production)

```mermaid
graph LR
    subgraph AllMode["Extract All (Default)"]
        A1["POST /docling<br/>{ file: 'invoice.pdf' }<br/>No startPage/endPage"] --> A2["Docling processes<br/>entire document<br/>All 450 pages"]
        A2 --> A3["⏱ ~120s<br/>Elapsed timer shows:<br/>'Extracting... 45s<br/>(large documents may<br/>take several minutes)'"]
    end

    subgraph WindowMode["Page Window (Debug)"]
        B1["POST /docling<br/>{ file: 'invoice.pdf',<br/>startPage: 1, endPage: 5 }"] --> B2["Docling processes<br/>pages 1-5 only"]
        B2 --> B3["⏱ ~3s<br/>← → navigation<br/>to slide window"]
    end

    style AllMode fill:#e8f5e9
    style WindowMode fill:#e1f5fe
```

The toggle sits in the ExtractPhase toolbar. "Extract All" is the default for production use; "Page Window" preserves the old PAGE_WINDOW=5 navigation for debugging specific page ranges.
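A sketch of the request-body difference between the two modes. Only the `file`/`startPage`/`endPage` fields come from the endpoint above; `buildDoclingRequest` and the window shape are hypothetical helpers.

```typescript
// Hypothetical helper showing the payload difference between modes.
interface DoclingRequest { file: string; startPage?: number; endPage?: number }

function buildDoclingRequest(
  file: string,
  window?: { start: number; size: number },
): DoclingRequest {
  if (!window) return { file }; // Extract All: omit page bounds entirely
  return { file, startPage: window.start, endPage: window.start + window.size - 1 };
}
```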


Per-Item Confidence Scoring

Fix 5 replaced the flat table-level confidence with a per-item multiplicative score.

```mermaid
graph TD
    A["Start: mapping.confidence<br/>(table-level baseline from LLM)<br/>e.g. 0.92"] --> B

    B["Factor 1: Field Population<br/>× (0.7 + 0.3 × populated/total)<br/><br/>8/10 fields populated:<br/>× (0.7 + 0.3 × 0.8) = × 0.94"] --> C

    C["Factor 2: Key Field Penalties<br/>totalAmount === 0? × 0.6<br/>description empty? × 0.5<br/><br/>Both present: × 1.0"] --> D

    D["Factor 3: Classification Defaults<br/>× (1 - 0.15 × defaulted/total)<br/><br/>1 of 3 rules fell to default:<br/>× (1 - 0.15 × 0.33) = × 0.95"] --> E

    E["Final: clamp(0, 1)<br/>0.92 × 0.94 × 1.0 × 0.95<br/>= <b>0.821</b> (82%)"]

    F["Edge case: zero amount + empty desc<br/>0.92 × 0.94 × 0.6 × 0.5 × 0.95<br/>= <b>0.246</b> (25%)<br/>→ flagged as low confidence"]

    style A fill:#e1f5fe
    style E fill:#e8f5e9
    style F fill:#fce4ec
```
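The multiplicative scoring can be sketched as one function; `scoreItem` and the stats shape are illustrative names, not the real code.

```typescript
// Per-item confidence sketch following the three factors above.
interface ItemStats {
  populated: number;     // mapped fields that received a value
  total: number;         // total mapped fields
  totalAmount: number;
  description: string;
  defaultedRules: number; // classification rules that fell to default
  totalRules: number;
}

function scoreItem(baseline: number, s: ItemStats): number {
  let c = baseline;
  c *= 0.7 + 0.3 * (s.populated / s.total);          // Factor 1: field population
  if (s.totalAmount === 0) c *= 0.6;                 // Factor 2: key field penalties
  if (s.description.trim() === "") c *= 0.5;
  c *= 1 - 0.15 * (s.totalRules ? s.defaultedRules / s.totalRules : 0); // Factor 3
  return Math.min(1, Math.max(0, c));                // clamp(0, 1)
}
```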

Stratified Sample Row Selection

Fix 4 ensures the LLM sees diverse data instead of just the first 8 rows.

```mermaid
graph TD
    A["Input: 500 rows<br/>max = 8 samples"] --> B

    B["Take first 4 rows<br/>(header-adjacent, cleanest)"] --> C

    C["From remaining 496 rows:<br/>Pick rows with unique<br/>first-column values"] --> D

    D{"Found 4+ diverse rows?"}

    D -->|"Yes: e.g. different<br/>tracking prefixes<br/>AB12, CD34, EF56, GH78"| E
    E["Return: first 4 + 4 diverse = 8"]

    D -->|"No: only 2 unique values"| F
    F["Fill remaining 2 slots with<br/>evenly-spaced rows:<br/>row 124, row 248"] --> G
    G["Return: first 4 + 2 diverse + 2 spaced = 8"]

    style A fill:#e1f5fe
    style E fill:#e8f5e9
    style G fill:#e8f5e9
```

For small tables (≤8 rows), all rows are included — no sampling needed.
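A sketch of the sampling strategy under those rules; the exact spacing of the fill rows is illustrative and may differ from the real Fix 4 code.

```typescript
// Stratified sampling sketch: first rows plus rows with diverse
// first-column values, topped up with evenly spaced rows.
function sampleRows(rows: string[][], max = 8): string[][] {
  if (rows.length <= max) return rows; // small tables: take everything
  const head = rows.slice(0, 4);       // header-adjacent, cleanest
  const rest = rows.slice(4);

  // Prefer rows whose first-column value we haven't seen yet.
  const seen = new Set(head.map(r => r[0]));
  const diverse: string[][] = [];
  for (const row of rest) {
    if (diverse.length >= max - 4) break;
    if (!seen.has(row[0])) { seen.add(row[0]); diverse.push(row); }
  }

  // Fill any remaining slots with evenly spaced rows.
  const picked = [...head, ...diverse];
  const stride = Math.max(1, Math.floor(rest.length / (max - picked.length + 1)));
  for (let i = stride; picked.length < max && i < rest.length; i += stride) {
    if (!picked.includes(rest[i])) picked.push(rest[i]);
  }
  return picked;
}
```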


Data Model Summary

```mermaid
erDiagram
    KV_STORE {
        string key PK "mappers/{courier}/{fp}"
        string value "JSON StoredMapper"
    }

    ETL_RUN {
        string id PK "8-char UUID"
        string timestamp
        string courier
        string fileName
        boolean cacheHit
        string mapperFingerprint
    }

    STORED_MAPPER {
        string courier
        string fingerprint PK
        string schemaVersion "SHA-256 of prompt"
        string model "gemini-3-flash"
        number usageCount
        string lastUsedAt
    }

    LLM_RESPONSE {
        number tableIndex
        string tableLabel
        string purpose
        number confidence
    }

    LINE_ITEM {
        string description
        number quantity
        number totalAmount
        string category "base_service|surcharge|..."
        string serviceType
        string trackingNumber
        number confidence "per-item scored"
        boolean isSurcharge
        boolean isDiscount
    }

    KV_STORE ||--o{ STORED_MAPPER : stores
    KV_STORE ||--o{ ETL_RUN : stores
    STORED_MAPPER ||--|{ LLM_RESPONSE : contains
    ETL_RUN ||--|{ LINE_ITEM : produces
    LLM_RESPONSE ||--|{ LINE_ITEM : maps_to
```
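In TypeScript terms, the mapper record and its KV key could look like the following; field names follow the diagram, not necessarily the real store.ts types.

```typescript
// Hypothetical shapes matching the ER diagram above.
interface StoredMapper {
  courier: string;
  fingerprint: string;
  schemaVersion: string; // SHA-256 of the schema prompt
  model: string;         // e.g. "gemini-3-flash"
  usageCount: number;
  lastUsedAt: string;
}

// KV key scheme: mappers/{courier}/{fingerprint}
function mapperKey(m: Pick<StoredMapper, "courier" | "fingerprint">): string {
  return `mappers/${m.courier}/${m.fingerprint}`;
}
```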

Service Map

| Service | Port | Role |
| --- | --- | --- |
| Frontend (Vite) | 3000 | React SPA — phase orchestration + data display |
| Backend (Express) | 3001 | ETL endpoints, LLM proxy, KV cache |
| Docling Server (Python) | 3002 | ML-based document structure extraction |
| OpenRouter API | external | LLM gateway → Gemini 3 Flash |
| VictoriaLogs | 9428 | Structured event logging (optional) |
| Azurite | 10000 | Azure Blob emulator for KV store (optional) |

Environment Config

| Variable | Default | Purpose |
| --- | --- | --- |
| OPENROUTER_API_KEY | (none) | LLM API access (required for transform) |
| DOCLING_URL | http://localhost:3002 | Docling server address |
| STORE_BACKEND | default (filesystem) | azure for Blob Storage |
| VICTORIALOGS_URL | http://localhost:9428 | Structured logging endpoint |
| PORT | 3001 | Backend server port |
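A sketch of a loader applying these defaults; the variable names come from the table, but the loader itself is illustrative.

```typescript
// Config-loading sketch applying the defaults in the table above.
function loadConfig(env: Record<string, string | undefined>) {
  return {
    openRouterApiKey: env.OPENROUTER_API_KEY, // no default: required for transform
    doclingUrl: env.DOCLING_URL ?? "http://localhost:3002",
    storeBackend: env.STORE_BACKEND ?? "default", // "azure" switches to Blob Storage
    victoriaLogsUrl: env.VICTORIALOGS_URL ?? "http://localhost:9428",
    port: Number(env.PORT ?? 3001),
  };
}
```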