#Data Reconciliation Cookbook

Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook


#Overview

Data reconciliation — matching records from two or more sources to identify discrepancies — is fundamental in finance, payments, and inventory management. Ranvier's Outcome model (Matched / Unmatched / Ambiguous) maps directly to reconciliation results, and the Axon pipeline naturally models the match-classify-report workflow.

This cookbook covers matching strategies, weighted scoring, Outcome classification, report generation, and audit trail integration.


#1. Reconciliation Data Types

Define the types for source records and match results.

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceRecord {
    pub id: String,
    pub reference: String,
    pub amount: rust_decimal::Decimal,
    pub date: chrono::NaiveDate,
    pub description: String,
    pub source: String,  // "internal" or "external"
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MatchResult {
    pub internal: SourceRecord,
    pub external: Option<SourceRecord>,
    pub score: f64,           // 0.0 - 1.0
    pub classification: MatchClassification,
    pub reason: String,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum MatchClassification {
    Matched,
    Unmatched,
    Ambiguous,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconciliationReport {
    pub total_internal: usize,
    pub total_external: usize,
    pub matched: usize,
    pub unmatched: usize,
    pub ambiguous: usize,
    pub total_amount_diff: rust_decimal::Decimal,
    pub results: Vec<MatchResult>,
}

#2. Fuzzy Matching Strategies

#Exact Reference Match

fn exact_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    if internal.reference == external.reference {
        1.0
    } else {
        0.0
    }
}

#String Similarity (strsim)

use strsim::{jaro_winkler, normalized_levenshtein};

fn fuzzy_reference_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let jw = jaro_winkler(&internal.reference, &external.reference);
    let lev = normalized_levenshtein(&internal.reference, &external.reference);
    // Average of both metrics
    (jw + lev) / 2.0
}

#Amount Tolerance

use rust_decimal::Decimal;
use rust_decimal_macros::dec;

fn amount_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let diff = (internal.amount - external.amount).abs();
    let tolerance = dec!(0.01); // $0.01 tolerance

    if diff <= tolerance {
        1.0
    } else if diff <= dec!(1.00) {
        // Partial match for small differences
        1.0 - (diff / dec!(1.00)).to_f64().unwrap_or(1.0)
    } else {
        0.0
    }
}

#Date Proximity

fn date_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let diff = (internal.date - external.date).num_days().abs();
    match diff {
        0 => 1.0,
        1 => 0.9,
        2 => 0.7,
        3..=7 => 0.3,
        _ => 0.0,
    }
}

#3. Weighted Composite Scoring

Combine multiple matching criteria with configurable weights.

#[derive(Debug, Clone)]
pub struct MatchWeights {
    pub reference: f64,   // 0.0 - 1.0
    pub amount: f64,
    pub date: f64,
    pub description: f64,
}

impl Default for MatchWeights {
    fn default() -> Self {
        Self {
            reference: 0.40,
            amount: 0.30,
            date: 0.20,
            description: 0.10,
        }
    }
}

fn composite_score(
    internal: &SourceRecord,
    external: &SourceRecord,
    weights: &MatchWeights,
) -> f64 {
    let ref_score = fuzzy_reference_match(internal, external);
    let amt_score = amount_match(internal, external);
    let date_score = date_match(internal, external);
    let desc_score = strsim::jaro_winkler(&internal.description, &external.description);

    ref_score * weights.reference
        + amt_score * weights.amount
        + date_score * weights.date
        + desc_score * weights.description
}

#4. Reconciliation Transition

The core matching Transition: for each internal record, find the best external match.

use async_trait::async_trait;
use ranvier_core::prelude::*;

#[derive(Debug, Clone)]
pub struct ReconciliationInput {
    pub internal: Vec<SourceRecord>,
    pub external: Vec<SourceRecord>,
}

/// Bus type: matching configuration.
#[derive(Debug, Clone)]
pub struct ReconciliationConfig {
    pub weights: MatchWeights,
    pub match_threshold: f64,      // >= this = Matched (default: 0.85)
    pub ambiguous_threshold: f64,  // >= this but < match = Ambiguous (default: 0.60)
}

impl Default for ReconciliationConfig {
    fn default() -> Self {
        Self {
            weights: MatchWeights::default(),
            match_threshold: 0.85,
            ambiguous_threshold: 0.60,
        }
    }
}

pub struct ReconcileTransition;

#[async_trait]
impl Transition<ReconciliationInput, ReconciliationReport> for ReconcileTransition {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: ReconciliationInput,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ReconciliationReport, Self::Error> {
        let config = bus.get_cloned::<ReconciliationConfig>()
            .unwrap_or_default();

        let mut results = Vec::new();
        let mut used_external: std::collections::HashSet<String> = std::collections::HashSet::new();

        for internal in &input.internal {
            let mut best_match: Option<(&SourceRecord, f64)> = None;

            for external in &input.external {
                if used_external.contains(&external.id) {
                    continue;
                }

                let score = composite_score(internal, external, &config.weights);

                if let Some((_, best_score)) = best_match {
                    if score > best_score {
                        best_match = Some((external, score));
                    }
                } else if score > 0.0 {
                    best_match = Some((external, score));
                }
            }

            let (external, score, classification, reason) = match best_match {
                Some((ext, score)) if score >= config.match_threshold => {
                    used_external.insert(ext.id.clone());
                    (
                        Some(ext.clone()),
                        score,
                        MatchClassification::Matched,
                        format!("score {:.2} >= threshold {:.2}", score, config.match_threshold),
                    )
                }
                Some((ext, score)) if score >= config.ambiguous_threshold => {
                    (
                        Some(ext.clone()),
                        score,
                        MatchClassification::Ambiguous,
                        format!(
                            "score {:.2} between {:.2} and {:.2}",
                            score, config.ambiguous_threshold, config.match_threshold
                        ),
                    )
                }
                _ => (
                    None,
                    0.0,
                    MatchClassification::Unmatched,
                    "no match found above ambiguous threshold".into(),
                ),
            };

            results.push(MatchResult {
                internal: internal.clone(),
                external,
                score,
                classification,
                reason,
            });
        }

        let matched = results.iter().filter(|r| r.classification == MatchClassification::Matched).count();
        let unmatched = results.iter().filter(|r| r.classification == MatchClassification::Unmatched).count();
        let ambiguous = results.iter().filter(|r| r.classification == MatchClassification::Ambiguous).count();

        let total_amount_diff: rust_decimal::Decimal = results.iter()
            .filter_map(|r| {
                r.external.as_ref().map(|ext| (r.internal.amount - ext.amount).abs())
            })
            .sum();

        Outcome::next(ReconciliationReport {
            total_internal: input.internal.len(),
            total_external: input.external.len(),
            matched,
            unmatched,
            ambiguous,
            total_amount_diff,
            results,
        })
    }
}

#5. Outcome Classification Pipeline

Map reconciliation results to Ranvier Outcomes for downstream branching.

use ranvier_runtime::Axon;

let recon_pipeline = Axon::typed::<ReconciliationInput, ReconciliationReport>("reconcile")
    .then(ReconcileTransition)
    .then_fn("classify-outcome", |report, _res, _bus| async move {
        if report.unmatched == 0 && report.ambiguous == 0 {
            // All matched — success
            Outcome::next(report)
        } else if report.unmatched > 0 {
            // Has unmatched records — needs investigation
            Outcome::fault(format!(
                "reconciliation incomplete: {} unmatched, {} ambiguous",
                report.unmatched, report.ambiguous
            ))
        } else {
            // Only ambiguous — manual review needed
            Outcome::next(report)
        }
    });

#6. Report Generation

Generate JSON and CSV reports from reconciliation results.

pub struct GenerateJsonReport;

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateJsonReport {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        let json = serde_json::to_string_pretty(&report)
            .map_err(|e| e.to_string())?;
        Outcome::next(json)
    }
}

pub struct GenerateCsvReport;

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateCsvReport {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        let mut csv = String::from(
            "internal_id,internal_ref,internal_amount,external_id,external_ref,external_amount,score,classification,reason\n"
        );

        for result in &report.results {
            let (ext_id, ext_ref, ext_amount) = match &result.external {
                Some(ext) => (ext.id.as_str(), ext.reference.as_str(), ext.amount.to_string()),
                None => ("", "", "".to_string()),
            };

            csv.push_str(&format!(
                "{},{},{},{},{},{},{:.2},{:?},{}\n",
                result.internal.id,
                result.internal.reference,
                result.internal.amount,
                ext_id,
                ext_ref,
                ext_amount,
                result.score,
                result.classification,
                result.reason.replace(',', ";"),
            ));
        }

        Outcome::next(csv)
    }
}

#7. Audit Trail Integration

Record match decisions in the audit hash chain for regulatory compliance.

use ranvier_audit::prelude::*;

let audited_recon = Axon::typed::<ReconciliationInput, ReconciliationReport>("audited-recon")
    .then(ReconcileTransition)
    .then_fn("audit-results", |report, _res, bus| async move {
        if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
            // Record summary
            audit.record(AuditEntry {
                action: "reconciliation.completed".into(),
                actor: bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "system".into()),
                resource: format!(
                    "matched={}, unmatched={}, ambiguous={}, amount_diff={}",
                    report.matched, report.unmatched, report.ambiguous, report.total_amount_diff
                ),
                timestamp: chrono::Utc::now(),
            }).await;

            // Record each unmatched/ambiguous for investigation trail
            for result in &report.results {
                if result.classification != MatchClassification::Matched {
                    audit.record(AuditEntry {
                        action: format!("reconciliation.{:?}", result.classification).to_lowercase(),
                        actor: "system".into(),
                        resource: format!(
                            "internal={}, score={:.2}, reason={}",
                            result.internal.id, result.score, result.reason
                        ),
                        timestamp: chrono::Utc::now(),
                    }).await;
                }
            }
        }

        Outcome::next(report)
    });

#8. Full Server Example

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ranvier::http()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(AuthGuard::bearer(vec![
            std::env::var("API_TOKEN")?,
        ]))
        .post_typed("/api/reconcile", recon_pipeline())
        .post_typed("/api/reconcile/csv", recon_csv_pipeline())
        .run(())
        .await?;

    Ok(())
}

fn recon_pipeline() -> Axon<ReconciliationInput, ReconciliationReport, String, ()> {
    Axon::typed::<ReconciliationInput, String>("reconcile")
        .then(ReconcileTransition)
}

fn recon_csv_pipeline() -> Axon<ReconciliationInput, String, String, ()> {
    Axon::typed::<ReconciliationInput, String>("reconcile-csv")
        .then(ReconcileTransition)
        .then(GenerateCsvReport)
}

#9. Testing Reconciliation

#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    fn sample_internal() -> Vec<SourceRecord> {
        vec![
            SourceRecord {
                id: "INT-001".into(),
                reference: "INV-2026-001".into(),
                amount: dec!(100.00),
                date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
                description: "Monthly subscription".into(),
                source: "internal".into(),
            },
            SourceRecord {
                id: "INT-002".into(),
                reference: "INV-2026-002".into(),
                amount: dec!(250.50),
                date: chrono::NaiveDate::from_ymd_opt(2026, 3, 5).unwrap(),
                description: "Enterprise upgrade".into(),
                source: "internal".into(),
            },
        ]
    }

    fn sample_external() -> Vec<SourceRecord> {
        vec![
            SourceRecord {
                id: "EXT-001".into(),
                reference: "INV-2026-001".into(),
                amount: dec!(100.00),
                date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
                description: "Monthly subscription payment".into(),
                source: "external".into(),
            },
            // Slightly different reference for fuzzy matching test
            SourceRecord {
                id: "EXT-002".into(),
                reference: "INV-2026-02".into(),  // missing trailing 0
                amount: dec!(250.50),
                date: chrono::NaiveDate::from_ymd_opt(2026, 3, 6).unwrap(),  // 1 day off
                description: "Enterprise upgrade".into(),
                source: "external".into(),
            },
        ]
    }

    #[test]
    fn test_exact_match() {
        let internal = &sample_internal()[0];
        let external = &sample_external()[0];
        assert_eq!(exact_match(internal, external), 1.0);
    }

    #[test]
    fn test_fuzzy_match() {
        let internal = &sample_internal()[1];
        let external = &sample_external()[1];
        let score = fuzzy_reference_match(internal, external);
        assert!(score > 0.8);  // similar but not identical
        assert!(score < 1.0);
    }

    #[test]
    fn test_amount_match_exact() {
        let internal = &sample_internal()[0];
        let external = &sample_external()[0];
        assert_eq!(amount_match(internal, external), 1.0);
    }

    #[test]
    fn test_composite_score() {
        let internal = &sample_internal()[0];
        let external = &sample_external()[0];
        let weights = MatchWeights::default();
        let score = composite_score(internal, external, &weights);
        assert!(score > 0.9);  // near-perfect match
    }

    #[tokio::test]
    async fn test_reconciliation() {
        let input = ReconciliationInput {
            internal: sample_internal(),
            external: sample_external(),
        };

        let mut bus = Bus::new();
        bus.insert(ReconciliationConfig::default());

        let transition = ReconcileTransition;
        let result = transition.run(input, &(), &mut bus).await;

        match result {
            Outcome::Next(report) => {
                assert_eq!(report.total_internal, 2);
                assert_eq!(report.total_external, 2);
                assert!(report.matched >= 1);  // at least exact match
            }
            _ => panic!("reconciliation should succeed"),
        }
    }

    #[test]
    fn test_date_match() {
        let d1 = SourceRecord {
            date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
            ..sample_internal()[0].clone()
        };
        let d2 = SourceRecord {
            date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
            ..sample_external()[0].clone()
        };
        assert_eq!(date_match(&d1, &d2), 1.0);

        let d3 = SourceRecord {
            date: chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap(),
            ..sample_external()[0].clone()
        };
        assert_eq!(date_match(&d1, &d3), 0.9);
    }
}

#See Also

  • Saga Compensation Cookbook — compensating failed reconciliation steps
  • Multi-Tenant Isolation Cookbook — per-tenant reconciliation
  • Billing & Metering Cookbook — usage reconciliation with billing
  • Guard Patterns Cookbook — input validation Guards