#Data Reconciliation Cookbook
Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook
#Overview
Data reconciliation — matching records from two or more sources to identify
discrepancies — is fundamental in finance, payments, and inventory management.
Ranvier's Outcome model (Matched / Unmatched / Ambiguous) maps directly to
reconciliation results, and the Axon pipeline naturally models the match-classify-report
workflow.
This cookbook covers matching strategies, weighted scoring, Outcome classification, report generation, and audit trail integration.
#1. Reconciliation Data Types
Define the types for source records and match results.
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceRecord {
pub id: String,
pub reference: String,
pub amount: rust_decimal::Decimal,
pub date: chrono::NaiveDate,
pub description: String,
pub source: String, // "internal" or "external"
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MatchResult {
pub internal: SourceRecord,
pub external: Option<SourceRecord>,
pub score: f64, // 0.0 - 1.0
pub classification: MatchClassification,
pub reason: String,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum MatchClassification {
Matched,
Unmatched,
Ambiguous,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconciliationReport {
pub total_internal: usize,
pub total_external: usize,
pub matched: usize,
pub unmatched: usize,
pub ambiguous: usize,
pub total_amount_diff: rust_decimal::Decimal,
pub results: Vec<MatchResult>,
}#2. Fuzzy Matching Strategies
#Exact Reference Match
fn exact_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
if internal.reference == external.reference {
1.0
} else {
0.0
}
}#String Similarity (strsim)
use strsim::{jaro_winkler, normalized_levenshtein};
fn fuzzy_reference_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
let jw = jaro_winkler(&internal.reference, &external.reference);
let lev = normalized_levenshtein(&internal.reference, &external.reference);
// Average of both metrics
(jw + lev) / 2.0
}#Amount Tolerance
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
fn amount_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
let diff = (internal.amount - external.amount).abs();
let tolerance = dec!(0.01); // $0.01 tolerance
if diff <= tolerance {
1.0
} else if diff <= dec!(1.00) {
// Partial match for small differences
1.0 - (diff / dec!(1.00)).to_f64().unwrap_or(1.0)
} else {
0.0
}
}#Date Proximity
fn date_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
let diff = (internal.date - external.date).num_days().abs();
match diff {
0 => 1.0,
1 => 0.9,
2 => 0.7,
3..=7 => 0.3,
_ => 0.0,
}
}#3. Weighted Composite Scoring
Combine multiple matching criteria with configurable weights.
#[derive(Debug, Clone)]
pub struct MatchWeights {
pub reference: f64, // 0.0 - 1.0
pub amount: f64,
pub date: f64,
pub description: f64,
}
impl Default for MatchWeights {
fn default() -> Self {
Self {
reference: 0.40,
amount: 0.30,
date: 0.20,
description: 0.10,
}
}
}
fn composite_score(
internal: &SourceRecord,
external: &SourceRecord,
weights: &MatchWeights,
) -> f64 {
let ref_score = fuzzy_reference_match(internal, external);
let amt_score = amount_match(internal, external);
let date_score = date_match(internal, external);
let desc_score = strsim::jaro_winkler(&internal.description, &external.description);
ref_score * weights.reference
+ amt_score * weights.amount
+ date_score * weights.date
+ desc_score * weights.description
}#4. Reconciliation Transition
The core matching Transition: for each internal record, find the best external match.
use async_trait::async_trait;
use ranvier_core::prelude::*;
#[derive(Debug, Clone)]
pub struct ReconciliationInput {
pub internal: Vec<SourceRecord>,
pub external: Vec<SourceRecord>,
}
/// Bus type: matching configuration.
#[derive(Debug, Clone)]
pub struct ReconciliationConfig {
pub weights: MatchWeights,
pub match_threshold: f64, // >= this = Matched (default: 0.85)
pub ambiguous_threshold: f64, // >= this but < match = Ambiguous (default: 0.60)
}
impl Default for ReconciliationConfig {
fn default() -> Self {
Self {
weights: MatchWeights::default(),
match_threshold: 0.85,
ambiguous_threshold: 0.60,
}
}
}
pub struct ReconcileTransition;
#[async_trait]
impl Transition<ReconciliationInput, ReconciliationReport> for ReconcileTransition {
type Error = String;
type Resources = ();
async fn run(
&self,
input: ReconciliationInput,
_resources: &Self::Resources,
bus: &mut Bus,
) -> Outcome<ReconciliationReport, Self::Error> {
let config = bus.get_cloned::<ReconciliationConfig>()
.unwrap_or_default();
let mut results = Vec::new();
let mut used_external: std::collections::HashSet<String> = std::collections::HashSet::new();
for internal in &input.internal {
let mut best_match: Option<(&SourceRecord, f64)> = None;
for external in &input.external {
if used_external.contains(&external.id) {
continue;
}
let score = composite_score(internal, external, &config.weights);
if let Some((_, best_score)) = best_match {
if score > best_score {
best_match = Some((external, score));
}
} else if score > 0.0 {
best_match = Some((external, score));
}
}
let (external, score, classification, reason) = match best_match {
Some((ext, score)) if score >= config.match_threshold => {
used_external.insert(ext.id.clone());
(
Some(ext.clone()),
score,
MatchClassification::Matched,
format!("score {:.2} >= threshold {:.2}", score, config.match_threshold),
)
}
Some((ext, score)) if score >= config.ambiguous_threshold => {
(
Some(ext.clone()),
score,
MatchClassification::Ambiguous,
format!(
"score {:.2} between {:.2} and {:.2}",
score, config.ambiguous_threshold, config.match_threshold
),
)
}
_ => (
None,
0.0,
MatchClassification::Unmatched,
"no match found above ambiguous threshold".into(),
),
};
results.push(MatchResult {
internal: internal.clone(),
external,
score,
classification,
reason,
});
}
let matched = results.iter().filter(|r| r.classification == MatchClassification::Matched).count();
let unmatched = results.iter().filter(|r| r.classification == MatchClassification::Unmatched).count();
let ambiguous = results.iter().filter(|r| r.classification == MatchClassification::Ambiguous).count();
let total_amount_diff: rust_decimal::Decimal = results.iter()
.filter_map(|r| {
r.external.as_ref().map(|ext| (r.internal.amount - ext.amount).abs())
})
.sum();
Outcome::next(ReconciliationReport {
total_internal: input.internal.len(),
total_external: input.external.len(),
matched,
unmatched,
ambiguous,
total_amount_diff,
results,
})
}
}#5. Outcome Classification Pipeline
Map reconciliation results to Ranvier Outcomes for downstream branching.
use ranvier_runtime::Axon;
let recon_pipeline = Axon::typed::<ReconciliationInput, ReconciliationReport>("reconcile")
.then(ReconcileTransition)
.then_fn("classify-outcome", |report, _res, _bus| async move {
if report.unmatched == 0 && report.ambiguous == 0 {
// All matched — success
Outcome::next(report)
} else if report.unmatched > 0 {
// Has unmatched records — needs investigation
Outcome::fault(format!(
"reconciliation incomplete: {} unmatched, {} ambiguous",
report.unmatched, report.ambiguous
))
} else {
// Only ambiguous — manual review needed
Outcome::next(report)
}
});#6. Report Generation
Generate JSON and CSV reports from reconciliation results.
pub struct GenerateJsonReport;
#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateJsonReport {
type Error = String;
type Resources = ();
async fn run(
&self,
report: ReconciliationReport,
_resources: &Self::Resources,
_bus: &mut Bus,
) -> Outcome<String, Self::Error> {
let json = serde_json::to_string_pretty(&report)
.map_err(|e| e.to_string())?;
Outcome::next(json)
}
}
pub struct GenerateCsvReport;
#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateCsvReport {
type Error = String;
type Resources = ();
async fn run(
&self,
report: ReconciliationReport,
_resources: &Self::Resources,
_bus: &mut Bus,
) -> Outcome<String, Self::Error> {
let mut csv = String::from(
"internal_id,internal_ref,internal_amount,external_id,external_ref,external_amount,score,classification,reason\n"
);
for result in &report.results {
let (ext_id, ext_ref, ext_amount) = match &result.external {
Some(ext) => (ext.id.as_str(), ext.reference.as_str(), ext.amount.to_string()),
None => ("", "", "".to_string()),
};
csv.push_str(&format!(
"{},{},{},{},{},{},{:.2},{:?},{}\n",
result.internal.id,
result.internal.reference,
result.internal.amount,
ext_id,
ext_ref,
ext_amount,
result.score,
result.classification,
result.reason.replace(',', ";"),
));
}
Outcome::next(csv)
}
}#7. Audit Trail Integration
Record match decisions in the audit hash chain for regulatory compliance.
use ranvier_audit::prelude::*;
let audited_recon = Axon::typed::<ReconciliationInput, ReconciliationReport>("audited-recon")
.then(ReconcileTransition)
.then_fn("audit-results", |report, _res, bus| async move {
if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
// Record summary
audit.record(AuditEntry {
action: "reconciliation.completed".into(),
actor: bus.get_cloned::<super::TenantContext>()
.map(|t| t.tenant_id)
.unwrap_or_else(|_| "system".into()),
resource: format!(
"matched={}, unmatched={}, ambiguous={}, amount_diff={}",
report.matched, report.unmatched, report.ambiguous, report.total_amount_diff
),
timestamp: chrono::Utc::now(),
}).await;
// Record each unmatched/ambiguous for investigation trail
for result in &report.results {
if result.classification != MatchClassification::Matched {
audit.record(AuditEntry {
action: format!("reconciliation.{:?}", result.classification).to_lowercase(),
actor: "system".into(),
resource: format!(
"internal={}, score={:.2}, reason={}",
result.internal.id, result.score, result.reason
),
timestamp: chrono::Utc::now(),
}).await;
}
}
}
Outcome::next(report)
});#8. Full Server Example
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ranvier::http()
.bind("0.0.0.0:8080")
.guard(RequestIdGuard::new())
.guard(AccessLogGuard::new())
.guard(AuthGuard::bearer(vec![
std::env::var("API_TOKEN")?,
]))
.post_typed("/api/reconcile", recon_pipeline())
.post_typed("/api/reconcile/csv", recon_csv_pipeline())
.run(())
.await?;
Ok(())
}
fn recon_pipeline() -> Axon<ReconciliationInput, ReconciliationReport, String, ()> {
Axon::typed::<ReconciliationInput, String>("reconcile")
.then(ReconcileTransition)
}
fn recon_csv_pipeline() -> Axon<ReconciliationInput, String, String, ()> {
Axon::typed::<ReconciliationInput, String>("reconcile-csv")
.then(ReconcileTransition)
.then(GenerateCsvReport)
}#9. Testing Reconciliation
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal_macros::dec;
fn sample_internal() -> Vec<SourceRecord> {
vec![
SourceRecord {
id: "INT-001".into(),
reference: "INV-2026-001".into(),
amount: dec!(100.00),
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
description: "Monthly subscription".into(),
source: "internal".into(),
},
SourceRecord {
id: "INT-002".into(),
reference: "INV-2026-002".into(),
amount: dec!(250.50),
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 5).unwrap(),
description: "Enterprise upgrade".into(),
source: "internal".into(),
},
]
}
fn sample_external() -> Vec<SourceRecord> {
vec![
SourceRecord {
id: "EXT-001".into(),
reference: "INV-2026-001".into(),
amount: dec!(100.00),
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
description: "Monthly subscription payment".into(),
source: "external".into(),
},
// Slightly different reference for fuzzy matching test
SourceRecord {
id: "EXT-002".into(),
reference: "INV-2026-02".into(), // missing trailing 0
amount: dec!(250.50),
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 6).unwrap(), // 1 day off
description: "Enterprise upgrade".into(),
source: "external".into(),
},
]
}
#[test]
fn test_exact_match() {
let internal = &sample_internal()[0];
let external = &sample_external()[0];
assert_eq!(exact_match(internal, external), 1.0);
}
#[test]
fn test_fuzzy_match() {
let internal = &sample_internal()[1];
let external = &sample_external()[1];
let score = fuzzy_reference_match(internal, external);
assert!(score > 0.8); // similar but not identical
assert!(score < 1.0);
}
#[test]
fn test_amount_match_exact() {
let internal = &sample_internal()[0];
let external = &sample_external()[0];
assert_eq!(amount_match(internal, external), 1.0);
}
#[test]
fn test_composite_score() {
let internal = &sample_internal()[0];
let external = &sample_external()[0];
let weights = MatchWeights::default();
let score = composite_score(internal, external, &weights);
assert!(score > 0.9); // near-perfect match
}
#[tokio::test]
async fn test_reconciliation() {
let input = ReconciliationInput {
internal: sample_internal(),
external: sample_external(),
};
let mut bus = Bus::new();
bus.insert(ReconciliationConfig::default());
let transition = ReconcileTransition;
let result = transition.run(input, &(), &mut bus).await;
match result {
Outcome::Next(report) => {
assert_eq!(report.total_internal, 2);
assert_eq!(report.total_external, 2);
assert!(report.matched >= 1); // at least exact match
}
_ => panic!("reconciliation should succeed"),
}
}
#[test]
fn test_date_match() {
let d1 = SourceRecord {
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
..sample_internal()[0].clone()
};
let d2 = SourceRecord {
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 1).unwrap(),
..sample_external()[0].clone()
};
assert_eq!(date_match(&d1, &d2), 1.0);
let d3 = SourceRecord {
date: chrono::NaiveDate::from_ymd_opt(2026, 3, 2).unwrap(),
..sample_external()[0].clone()
};
assert_eq!(date_match(&d1, &d3), 0.9);
}
}#See Also
- Saga Compensation Cookbook — compensating failed reconciliation steps
- Multi-Tenant Isolation Cookbook — per-tenant reconciliation
- Billing & Metering Cookbook — usage reconciliation with billing
- Guard Patterns Cookbook — input validation Guards