#데이터 대사 Cookbook
버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북
#Overview
데이터 대사(두 개 이상의 소스에서 레코드를 매칭하여 불일치를 식별하는 작업)는
금융, 결제, 재고 관리의 기본이다. Ranvier의 Outcome 모델
(Matched / Unmatched / Ambiguous)은 대사 결과에 직접 매핑되며,
Axon 파이프라인은 매칭-분류-리포트 워크플로우를 자연스럽게 모델링한다.
이 쿡북은 매칭 전략, 가중치 스코어링, Outcome 분류, 리포트 생성, 감사 추적 연동을 다룬다.
#1. 대사 데이터 타입
소스 레코드와 매칭 결과 타입을 정의한다.
use serde::{Deserialize, Serialize};
/// A single record taken from one side of a reconciliation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceRecord {
/// Record identifier.
pub id: String,
/// Reference string compared between records when matching.
pub reference: String,
/// Amount of the record as a decimal value.
pub amount: rust_decimal::Decimal,
/// Date of the record.
pub date: chrono::NaiveDate,
/// Free-text description.
pub description: String,
/// Which side the record came from.
pub source: String, // "internal" or "external"
}
/// Result of matching one internal record against the external records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MatchResult {
/// The internal record this result describes.
pub internal: SourceRecord,
/// The external record paired with it, if one was selected.
pub external: Option<SourceRecord>,
/// Score assigned to the pairing.
pub score: f64, // 0.0 - 1.0
/// Classification derived from the score.
pub classification: MatchClassification,
/// Human-readable explanation of the classification.
pub reason: String,
}
/// Classification assigned to a reconciliation pairing.
///
/// The enum has no payload, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`; this allows it to be used as a map/set key and satisfies
/// APIs that require total equality.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum MatchClassification {
    /// The records agree.
    Matched,
    /// No acceptable counterpart was found.
    Unmatched,
    /// A candidate exists but is not conclusive.
    Ambiguous,
}
/// Summary of one reconciliation run plus the per-record results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconciliationReport {
/// Number of internal records in the input.
pub total_internal: usize,
/// Number of external records in the input.
pub total_external: usize,
/// Count of results classified as `Matched`.
pub matched: usize,
/// Count of results classified as `Unmatched`.
pub unmatched: usize,
/// Count of results classified as `Ambiguous`.
pub ambiguous: usize,
/// Sum of absolute amount differences over results that carry an external record.
pub total_amount_diff: rust_decimal::Decimal,
/// One entry per internal record.
pub results: Vec<MatchResult>,
}#2. 퍼지 매칭 전략
#정확한 참조번호 매칭
/// Scores two records on strict equality of their `reference` strings.
///
/// Returns `1.0` when the references are identical and `0.0` otherwise.
fn exact_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let same_reference = internal.reference == external.reference;
    // `true` maps to 1.0 and `false` to 0.0.
    f64::from(u8::from(same_reference))
}

#문자열 유사도 (strsim)
use strsim::{jaro_winkler, normalized_levenshtein};
/// Fuzzy similarity of the two records' `reference` strings.
///
/// Returns the arithmetic mean of the Jaro-Winkler similarity and the
/// normalized Levenshtein similarity of the two references.
fn fuzzy_reference_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let (lhs, rhs) = (internal.reference.as_str(), external.reference.as_str());
    let jaro_winkler_score = jaro_winkler(lhs, rhs);
    let levenshtein_score = normalized_levenshtein(lhs, rhs);
    // Average of the two metrics.
    (jaro_winkler_score + levenshtein_score) / 2.0
}

#금액 허용 오차
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
/// Scores how closely the amounts of two records agree.
///
/// * absolute difference `<= 0.01` → `1.0`
/// * absolute difference `<= 1.00` → `1.0 - difference` (partial credit)
/// * anything larger → `0.0`
fn amount_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    // `to_f64` is provided by the `ToPrimitive` trait, which must be in scope
    // for the method call below to resolve.
    use rust_decimal::prelude::ToPrimitive;

    let tolerance = dec!(0.01); // $0.01 tolerance
    let partial_limit = dec!(1.00); // upper bound for partial credit

    let diff = (internal.amount - external.amount).abs();
    if diff <= tolerance {
        1.0
    } else if diff <= partial_limit {
        // Partial match for small differences. If the ratio cannot be
        // converted to f64, treat it as 1.0, which yields a score of 0.0.
        1.0 - (diff / partial_limit).to_f64().unwrap_or(1.0)
    } else {
        0.0
    }
}

#날짜 근접도
/// Scores how close the dates of two records are, in whole days.
///
/// Same day → `1.0`, one day apart → `0.9`, two days → `0.7`,
/// three to seven days → `0.3`, anything further → `0.0`.
fn date_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let days_apart = (internal.date - external.date).num_days().abs();
    if days_apart == 0 {
        1.0
    } else if days_apart == 1 {
        0.9
    } else if days_apart == 2 {
        0.7
    } else if days_apart <= 7 {
        0.3
    } else {
        0.0
    }
}

#3. 가중치 복합 스코어링
설정 가능한 가중치로 여러 매칭 기준을 결합한다.
/// Relative weight given to each matching criterion when scores are combined.
#[derive(Debug, Clone)]
pub struct MatchWeights {
    /// Weight of the reference similarity (0.0 - 1.0).
    pub reference: f64,
    /// Weight of the amount score.
    pub amount: f64,
    /// Weight of the date score.
    pub date: f64,
    /// Weight of the description similarity.
    pub description: f64,
}

impl Default for MatchWeights {
    /// Default weighting: reference 0.40, amount 0.30, date 0.20, description 0.10.
    fn default() -> Self {
        MatchWeights {
            reference: 0.40,
            amount: 0.30,
            date: 0.20,
            description: 0.10,
        }
    }
}
/// Combines the individual criterion scores into one weighted score.
///
/// Each of the reference, amount, date and description scores is multiplied
/// by its weight from `weights`, and the four products are added together.
fn composite_score(
    internal: &SourceRecord,
    external: &SourceRecord,
    weights: &MatchWeights,
) -> f64 {
    let weighted_reference = fuzzy_reference_match(internal, external) * weights.reference;
    let weighted_amount = amount_match(internal, external) * weights.amount;
    let weighted_date = date_match(internal, external) * weights.date;
    let weighted_description =
        strsim::jaro_winkler(&internal.description, &external.description) * weights.description;

    weighted_reference + weighted_amount + weighted_date + weighted_description
}

#4. 대사 Transition
핵심 매칭 Transition: 각 내부 레코드에 대해 최적의 외부 매칭을 찾는다.
use async_trait::async_trait;
use ranvier_core::prelude::*;
/// Input of a reconciliation run: the two record sets to compare.
#[derive(Debug, Clone)]
pub struct ReconciliationInput {
/// Records from the internal side; one result is produced per entry.
pub internal: Vec<SourceRecord>,
/// Records from the external side, used as match candidates.
pub external: Vec<SourceRecord>,
}
/// Bus 타입: 매칭 설정.
#[derive(Debug, Clone)]
pub struct ReconciliationConfig {
pub weights: MatchWeights,
pub match_threshold: f64, // 이 값 이상 = Matched (기본: 0.85)
pub ambiguous_threshold: f64, // 이 값 이상이지만 match 미만 = Ambiguous (기본: 0.60)
}
impl Default for ReconciliationConfig {
fn default() -> Self {
Self {
weights: MatchWeights::default(),
match_threshold: 0.85,
ambiguous_threshold: 0.60,
}
}
}
/// Transition that pairs each internal record with its best-scoring external record.
pub struct ReconcileTransition;

#[async_trait]
impl Transition<ReconciliationInput, ReconciliationReport> for ReconcileTransition {
    type Error = String;
    type Resources = ();

    /// Reconciles `input.internal` against `input.external`.
    ///
    /// Internal records are processed in order. For each one, the highest
    /// scoring external record that has not yet been claimed by a `Matched`
    /// result is selected (ties keep the earlier candidate; a first candidate
    /// must score above 0.0). The pairing is then classified against the
    /// thresholds of the `ReconciliationConfig` found on the bus, falling back
    /// to the default configuration when none is present.
    async fn run(
        &self,
        input: ReconciliationInput,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ReconciliationReport, Self::Error> {
        let config = bus.get_cloned::<ReconciliationConfig>().unwrap_or_default();

        let mut results = Vec::new();
        // Ids of external records already consumed by a `Matched` result.
        let mut claimed = std::collections::HashSet::<String>::new();

        for internal in &input.internal {
            let best = input
                .external
                .iter()
                .filter(|candidate| !claimed.contains(&candidate.id))
                .map(|candidate| {
                    (candidate, composite_score(internal, candidate, &config.weights))
                })
                .fold(
                    None::<(&SourceRecord, f64)>,
                    |best, (candidate, score)| match best {
                        // Only a strictly better score displaces the current best.
                        Some((_, top)) if score > top => Some((candidate, score)),
                        Some(_) => best,
                        // The first accepted candidate must have a positive score.
                        None if score > 0.0 => Some((candidate, score)),
                        None => None,
                    },
                );

            let result = if let Some((candidate, score)) =
                best.filter(|&(_, score)| score >= config.match_threshold)
            {
                // Only confirmed matches remove the external record from the pool.
                claimed.insert(candidate.id.clone());
                MatchResult {
                    internal: internal.clone(),
                    external: Some(candidate.clone()),
                    score,
                    classification: MatchClassification::Matched,
                    reason: format!(
                        "스코어 {:.2} >= 임계값 {:.2}",
                        score, config.match_threshold
                    ),
                }
            } else if let Some((candidate, score)) =
                best.filter(|&(_, score)| score >= config.ambiguous_threshold)
            {
                MatchResult {
                    internal: internal.clone(),
                    external: Some(candidate.clone()),
                    score,
                    classification: MatchClassification::Ambiguous,
                    reason: format!(
                        "스코어 {:.2}: {:.2} ~ {:.2} 사이 (모호)",
                        score, config.ambiguous_threshold, config.match_threshold
                    ),
                }
            } else {
                MatchResult {
                    internal: internal.clone(),
                    external: None,
                    score: 0.0,
                    classification: MatchClassification::Unmatched,
                    reason: "모호 임계값 이상의 매칭 없음".into(),
                }
            };
            results.push(result);
        }

        // Tally the classifications in a single pass.
        let (mut matched, mut unmatched, mut ambiguous) = (0usize, 0usize, 0usize);
        for result in &results {
            match result.classification {
                MatchClassification::Matched => matched += 1,
                MatchClassification::Unmatched => unmatched += 1,
                MatchClassification::Ambiguous => ambiguous += 1,
            }
        }

        // Absolute amount difference summed over every result that has an external record.
        let total_amount_diff: rust_decimal::Decimal = results
            .iter()
            .filter_map(|result| {
                let external = result.external.as_ref()?;
                Some((result.internal.amount - external.amount).abs())
            })
            .sum();

        Outcome::next(ReconciliationReport {
            total_internal: input.internal.len(),
            total_external: input.external.len(),
            matched,
            unmatched,
            ambiguous,
            total_amount_diff,
            results,
        })
    }
}

#5. Outcome 분류 파이프라인
대사 결과를 Ranvier Outcome으로 매핑하여 하위 분기에 활용한다.
use ranvier_runtime::Axon;
// Pipeline that reconciles and then maps the report onto an Outcome.
// The second type parameter of `Axon::typed` is the error type; it must be
// `String` to agree with `ReconcileTransition::Error` and with the `String`
// fault produced below.
let recon_pipeline = Axon::typed::<ReconciliationInput, String>("reconcile")
    .then(ReconcileTransition)
    .then_fn("classify-outcome", |report, _res, _bus| async move {
        if report.unmatched == 0 && report.ambiguous == 0 {
            // Everything matched: success.
            Outcome::next(report)
        } else if report.unmatched > 0 {
            // Unmatched records exist: investigation required.
            Outcome::fault(format!(
                "대사 미완료: {}건 불일치, {}건 모호",
                report.unmatched, report.ambiguous
            ))
        } else {
            // Only ambiguous records remain: manual review required.
            Outcome::next(report)
        }
    });

#6. 리포트 생성
대사 결과에서 JSON 및 CSV 리포트를 생성한다.
/// Transition that renders a reconciliation report as pretty-printed JSON.
pub struct GenerateJsonReport;

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateJsonReport {
    type Error = String;
    type Resources = ();

    /// Serializes `report` with `serde_json::to_string_pretty`.
    ///
    /// Returns the JSON text on success, or a fault carrying the serializer's
    /// error message on failure.
    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        // The return type is `Outcome`, not `Result`, so `?` cannot be used to
        // propagate the serialization error; convert it explicitly instead.
        match serde_json::to_string_pretty(&report) {
            Ok(json) => Outcome::next(json),
            Err(err) => Outcome::fault(err.to_string()),
        }
    }
}
/// Transition that renders a reconciliation report as CSV text.
pub struct GenerateCsvReport;

/// Encodes one text value as a CSV field.
///
/// Values containing a comma, double quote, or line break are wrapped in
/// double quotes with embedded quotes doubled (RFC 4180); all other values
/// are returned unchanged without allocating.
fn csv_field(raw: &str) -> std::borrow::Cow<'_, str> {
    if raw.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        std::borrow::Cow::Owned(format!("\"{}\"", raw.replace('"', "\"\"")))
    } else {
        std::borrow::Cow::Borrowed(raw)
    }
}

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateCsvReport {
    type Error = String;
    type Resources = ();

    /// Produces a header line followed by one line per entry in `report.results`.
    ///
    /// External columns are left empty when a result has no external record.
    /// Every free-text column (ids, references, reason) is quoted when needed,
    /// so delimiters inside those values cannot shift columns. Reasons
    /// containing commas are therefore emitted quoted and unmodified.
    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        let mut csv = String::from(
            "internal_id,internal_ref,internal_amount,external_id,external_ref,external_amount,score,classification,reason\n"
        );
        for result in &report.results {
            let (ext_id, ext_ref, ext_amount) = match &result.external {
                Some(ext) => (ext.id.as_str(), ext.reference.as_str(), ext.amount.to_string()),
                None => ("", "", String::new()),
            };
            csv.push_str(&format!(
                "{},{},{},{},{},{},{:.2},{:?},{}\n",
                csv_field(&result.internal.id),
                csv_field(&result.internal.reference),
                result.internal.amount,
                csv_field(ext_id),
                csv_field(ext_ref),
                ext_amount,
                result.score,
                result.classification,
                csv_field(&result.reason),
            ));
        }
        Outcome::next(csv)
    }
}

#7. 감사 추적 연동
규제 준수를 위해 매칭 판정을 감사 해시 체인에 기록한다.
use ranvier_audit::prelude::*;
// Pipeline that reconciles and then records the outcome in the audit trail.
// The second type parameter of `Axon::typed` is the error type; it must be
// `String` to agree with `ReconcileTransition::Error`.
let audited_recon = Axon::typed::<ReconciliationInput, String>("audited-recon")
    .then(ReconcileTransition)
    .then_fn("audit-results", |report, _res, bus| async move {
        // Auditing is skipped when no `AuditTrail` is available on the bus.
        if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
            // Summary entry.
            audit.record(AuditEntry {
                action: "reconciliation.completed".into(),
                // Falls back to "system" when no tenant context is on the bus.
                actor: bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "system".into()),
                resource: format!(
                    "matched={}, unmatched={}, ambiguous={}, amount_diff={}",
                    report.matched, report.unmatched, report.ambiguous, report.total_amount_diff
                ),
                timestamp: chrono::Utc::now(),
            }).await;

            // One entry per unmatched/ambiguous result (for investigation tracking).
            for result in &report.results {
                if result.classification != MatchClassification::Matched {
                    audit.record(AuditEntry {
                        action: format!("reconciliation.{:?}", result.classification).to_lowercase(),
                        actor: "system".into(),
                        resource: format!(
                            "internal={}, score={:.2}, reason={}",
                            result.internal.id, result.score, result.reason
                        ),
                        timestamp: chrono::Utc::now(),
                    }).await;
                }
            }
        }
        Outcome::next(report)
    });

#8. 전체 서버 예제
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
// Entry point: configures the HTTP server with its guards and the two
// reconciliation routes, then runs it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ranvier::http()
.bind("0.0.0.0:8080")
.guard(RequestIdGuard::new())
.guard(AccessLogGuard::new())
// The bearer token is read from the API_TOKEN environment variable;
// a missing or invalid variable makes `main` return the error.
.guard(AuthGuard::bearer(vec![
std::env::var("API_TOKEN")?,
]))
// POST route backed by the reconciliation pipeline.
.post_typed("/api/reconcile", recon_pipeline())
// POST route backed by the reconciliation-to-CSV pipeline.
.post_typed("/api/reconcile/csv", recon_csv_pipeline())
.run(())
.await?;
Ok(())
}
/// Builds the "reconcile" pipeline: reconciliation input in, report out.
fn recon_pipeline() -> Axon<ReconciliationInput, ReconciliationReport, String, ()> {
    let entry = Axon::typed::<ReconciliationInput, String>("reconcile");
    entry.then(ReconcileTransition)
}
/// Builds the "reconcile-csv" pipeline: reconciliation followed by CSV rendering.
fn recon_csv_pipeline() -> Axon<ReconciliationInput, String, String, ()> {
    let entry = Axon::typed::<ReconciliationInput, String>("reconcile-csv");
    let reconciled = entry.then(ReconcileTransition);
    reconciled.then(GenerateCsvReport)
}

#9. 대사 테스트
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    /// Shorthand for building a date; panics if the date is invalid.
    fn ymd(year: i32, month: u32, day: u32) -> chrono::NaiveDate {
        chrono::NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    /// Two internal records used as the left-hand side of every test.
    fn sample_internal() -> Vec<SourceRecord> {
        let subscription = SourceRecord {
            id: "INT-001".into(),
            reference: "INV-2026-001".into(),
            amount: dec!(100.00),
            date: ymd(2026, 3, 1),
            description: "월간 구독".into(),
            source: "internal".into(),
        };
        let upgrade = SourceRecord {
            id: "INT-002".into(),
            reference: "INV-2026-002".into(),
            amount: dec!(250.50),
            date: ymd(2026, 3, 5),
            description: "Enterprise 업그레이드".into(),
            source: "internal".into(),
        };
        vec![subscription, upgrade]
    }

    /// Two external records: an exact counterpart and a near miss.
    fn sample_external() -> Vec<SourceRecord> {
        let subscription = SourceRecord {
            id: "EXT-001".into(),
            reference: "INV-2026-001".into(),
            amount: dec!(100.00),
            date: ymd(2026, 3, 1),
            description: "월간 구독 결제".into(),
            source: "external".into(),
        };
        // Slightly different reference to exercise fuzzy matching.
        let upgrade = SourceRecord {
            id: "EXT-002".into(),
            reference: "INV-2026-02".into(), // a zero is missing
            amount: dec!(250.50),
            date: ymd(2026, 3, 6), // one day later
            description: "Enterprise 업그레이드".into(),
            source: "external".into(),
        };
        vec![subscription, upgrade]
    }

    #[test]
    fn test_exact_match() {
        let (internal, external) = (sample_internal(), sample_external());
        assert_eq!(exact_match(&internal[0], &external[0]), 1.0);
    }

    #[test]
    fn test_fuzzy_match() {
        let (internal, external) = (sample_internal(), sample_external());
        let score = fuzzy_reference_match(&internal[1], &external[1]);
        // Similar, but not identical.
        assert!(score > 0.8);
        assert!(score < 1.0);
    }

    #[test]
    fn test_amount_match_exact() {
        let (internal, external) = (sample_internal(), sample_external());
        assert_eq!(amount_match(&internal[0], &external[0]), 1.0);
    }

    #[test]
    fn test_composite_score() {
        let (internal, external) = (sample_internal(), sample_external());
        let score = composite_score(&internal[0], &external[0], &MatchWeights::default());
        // Nearly a perfect match.
        assert!(score > 0.9);
    }

    #[tokio::test]
    async fn test_reconciliation() {
        let input = ReconciliationInput {
            internal: sample_internal(),
            external: sample_external(),
        };
        let mut bus = Bus::new();
        bus.insert(ReconciliationConfig::default());

        let outcome = ReconcileTransition.run(input, &(), &mut bus).await;
        let Outcome::Next(report) = outcome else {
            panic!("대사가 성공해야 함");
        };

        assert_eq!(report.total_internal, 2);
        assert_eq!(report.total_external, 2);
        // At least the exact pair must be matched.
        assert!(report.matched >= 1);
    }

    #[test]
    fn test_date_match() {
        let base_internal = sample_internal()[0].clone();
        let base_external = sample_external()[0].clone();

        let internal_day1 = SourceRecord { date: ymd(2026, 3, 1), ..base_internal };
        let external_day1 = SourceRecord { date: ymd(2026, 3, 1), ..base_external.clone() };
        assert_eq!(date_match(&internal_day1, &external_day1), 1.0);

        let external_day2 = SourceRecord { date: ymd(2026, 3, 2), ..base_external };
        assert_eq!(date_match(&internal_day1, &external_day2), 0.9);
    }
}

#See Also
- Saga Compensation Cookbook — 대사 실패 보상 패턴
- Multi-Tenant Isolation Cookbook — 테넌트별 대사
- Billing & Metering Cookbook — 빌링과 사용량 대사
- Guard Patterns Cookbook — 입력 검증 Guard