# 데이터 대사 Cookbook

버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북


## Overview

데이터 대사 — 두 개 이상의 소스에서 레코드를 매칭하여 불일치를 식별하는 작업 — 는 금융, 결제, 재고 관리의 기본이다. Ranvier의 Outcome 모델 (Matched / Unmatched / Ambiguous)은 대사 결과에 직접 매핑되며, Axon 파이프라인은 매칭-분류-리포트 워크플로우를 자연스럽게 모델링한다.

이 쿡북은 매칭 전략, 가중치 스코어링, Outcome 분류, 리포트 생성, 감사 추적 연동을 다룬다.


## 1. 대사 데이터 타입

소스 레코드와 매칭 결과 타입을 정의한다.

use serde::{Deserialize, Serialize};

/// One record from a source participating in reconciliation.
///
/// The same shape is used for both sides of a comparison; `source` labels
/// which side a record came from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceRecord {
    /// Identifier of the record; the reconcile transition uses it to track
    /// which external records have already been matched.
    pub id: String,
    /// Reference string compared between sources (exactly or by similarity).
    pub reference: String,
    /// Amount as an exact decimal, compared with a small tolerance.
    pub amount: rust_decimal::Decimal,
    /// Calendar date of the record, compared by day distance.
    pub date: chrono::NaiveDate,
    /// Free-text description, compared by string similarity.
    pub description: String,
    pub source: String,  // "internal" or "external"
}

/// The reconciliation verdict for a single internal record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MatchResult {
    /// The internal record this verdict is about.
    pub internal: SourceRecord,
    /// The external record it was paired with; `None` when the record is
    /// classified as `Unmatched`.
    pub external: Option<SourceRecord>,
    pub score: f64,           // 0.0 - 1.0
    /// Category derived from comparing `score` against the configured thresholds.
    pub classification: MatchClassification,
    /// Human-readable explanation of why this classification was chosen.
    pub reason: String,
}

/// The three possible outcomes of reconciling one internal record.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum MatchClassification {
    Matched,     // score reached the match threshold
    Unmatched,   // no candidate reached the ambiguous threshold
    Ambiguous,   // score fell between the ambiguous and match thresholds
}

/// Aggregate result of one reconciliation run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReconciliationReport {
    /// Number of internal records in the input.
    pub total_internal: usize,
    /// Number of external records in the input.
    pub total_external: usize,
    /// Count of results classified as `Matched`.
    pub matched: usize,
    /// Count of results classified as `Unmatched`.
    pub unmatched: usize,
    /// Count of results classified as `Ambiguous`.
    pub ambiguous: usize,
    /// Sum of absolute amount differences over every result that carries an
    /// external record (matched and ambiguous alike).
    pub total_amount_diff: rust_decimal::Decimal,
    /// One entry per internal record, in input order.
    pub results: Vec<MatchResult>,
}

## 2. 퍼지 매칭 전략

### 정확한 참조번호 매칭

/// Returns 1.0 when both records carry the identical reference string,
/// otherwise 0.0.
fn exact_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    // Guard clause: any difference in the reference means no match at all.
    if internal.reference != external.reference {
        return 0.0;
    }
    1.0
}

### 문자열 유사도 (strsim)

use strsim::{jaro_winkler, normalized_levenshtein};

/// Similarity of the two reference strings, computed as the mean of the
/// Jaro-Winkler and normalized Levenshtein metrics.
fn fuzzy_reference_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let (lhs, rhs) = (internal.reference.as_str(), external.reference.as_str());
    let similarity_sum = jaro_winkler(lhs, rhs) + normalized_levenshtein(lhs, rhs);
    // Average of the two metrics.
    similarity_sum / 2.0
}

### 금액 허용 오차

use rust_decimal::Decimal;
use rust_decimal_macros::dec;

fn amount_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let diff = (internal.amount - external.amount).abs();
    let tolerance = dec!(0.01); // $0.01 허용 오차

    if diff <= tolerance {
        1.0
    } else if diff <= dec!(1.00) {
        // 소액 차이에 대한 부분 매칭
        1.0 - (diff / dec!(1.00)).to_f64().unwrap_or(1.0)
    } else {
        0.0
    }
}

### 날짜 근접도

/// Scores date proximity: same day 1.0, one day apart 0.9, two days 0.7,
/// three to seven days 0.3, anything further 0.0.
fn date_match(internal: &SourceRecord, external: &SourceRecord) -> f64 {
    let gap_days = (internal.date - external.date).num_days().abs();

    if gap_days == 0 {
        1.0
    } else if gap_days == 1 {
        0.9
    } else if gap_days == 2 {
        0.7
    } else if (3..=7).contains(&gap_days) {
        0.3
    } else {
        0.0
    }
}

## 3. 가중치 복합 스코어링

설정 가능한 가중치로 여러 매칭 기준을 결합한다.

/// Relative importance of each matching criterion in the composite score.
#[derive(Debug, Clone)]
pub struct MatchWeights {
    pub reference: f64,   // 0.0 - 1.0
    pub amount: f64,
    pub date: f64,
    pub description: f64,
}

impl Default for MatchWeights {
    /// Default weighting: reference 0.40, amount 0.30, date 0.20,
    /// description 0.10.
    fn default() -> Self {
        let (reference, amount, date, description) = (0.40, 0.30, 0.20, 0.10);
        MatchWeights {
            reference,
            amount,
            date,
            description,
        }
    }
}

/// Combines the reference, amount, date and description scores into one
/// value by multiplying each with its weight and adding the products.
fn composite_score(
    internal: &SourceRecord,
    external: &SourceRecord,
    weights: &MatchWeights,
) -> f64 {
    let weighted_reference = fuzzy_reference_match(internal, external) * weights.reference;
    let weighted_amount = amount_match(internal, external) * weights.amount;
    let weighted_date = date_match(internal, external) * weights.date;
    let weighted_description =
        strsim::jaro_winkler(&internal.description, &external.description) * weights.description;

    // Added in the same left-to-right order: reference, amount, date, description.
    weighted_reference + weighted_amount + weighted_date + weighted_description
}

## 4. 대사 Transition

핵심 매칭 Transition: 각 내부 레코드에 대해 최적의 외부 매칭을 찾는다.

use async_trait::async_trait;
use ranvier_core::prelude::*;

/// Input to the reconcile transition: the two record sets to compare.
#[derive(Debug, Clone)]
pub struct ReconciliationInput {
    /// Records that drive the reconciliation; each one yields exactly one result.
    pub internal: Vec<SourceRecord>,
    /// Candidate records that internal records are scored against.
    pub external: Vec<SourceRecord>,
}

/// Bus type: matching configuration.
///
/// The reconcile transition reads this from the bus and falls back to
/// `Default` when it is not present.
#[derive(Debug, Clone)]
pub struct ReconciliationConfig {
    /// Per-criterion weights used when computing the composite score.
    pub weights: MatchWeights,
    pub match_threshold: f64,      // at or above this = Matched (default: 0.85)
    pub ambiguous_threshold: f64,  // at or above this but below match = Ambiguous (default: 0.60)
}

impl Default for ReconciliationConfig {
    fn default() -> Self {
        Self {
            weights: MatchWeights::default(),
            match_threshold: 0.85,
            ambiguous_threshold: 0.60,
        }
    }
}

/// Transition that pairs every internal record with its best-scoring
/// external candidate and summarizes the verdicts in a report.
pub struct ReconcileTransition;

#[async_trait]
impl Transition<ReconciliationInput, ReconciliationReport> for ReconcileTransition {
    type Error = String;
    type Resources = ();

    /// Reconciles `input.internal` against `input.external`.
    ///
    /// Configuration is taken from the bus (`ReconciliationConfig`), falling
    /// back to its default. Internal records are processed in input order;
    /// an external record that has been `Matched` is not offered to later
    /// internal records, whereas an `Ambiguous` pairing leaves it available.
    /// Always continues with `Outcome::next`.
    async fn run(
        &self,
        input: ReconciliationInput,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ReconciliationReport, Self::Error> {
        let config = bus.get_cloned::<ReconciliationConfig>()
            .unwrap_or_default();

        // Ids of external records already consumed by a confirmed match.
        let mut claimed: std::collections::HashSet<String> = std::collections::HashSet::new();
        let mut results = Vec::new();

        for record in &input.internal {
            // Best candidate so far. The first candidate must score above
            // zero; later ones must strictly beat the current best, so ties
            // keep the earlier candidate.
            let mut top: Option<(&SourceRecord, f64)> = None;
            for candidate in input.external.iter().filter(|ext| !claimed.contains(&ext.id)) {
                let score = composite_score(record, candidate, &config.weights);
                let improves = match top {
                    Some((_, current)) => score > current,
                    None => score > 0.0,
                };
                if improves {
                    top = Some((candidate, score));
                }
            }

            let verdict = match top {
                Some((ext, score)) if score >= config.match_threshold => {
                    // Only confirmed matches take the external record out of play.
                    claimed.insert(ext.id.clone());
                    MatchResult {
                        internal: record.clone(),
                        external: Some(ext.clone()),
                        score,
                        classification: MatchClassification::Matched,
                        reason: format!("스코어 {:.2} >= 임계값 {:.2}", score, config.match_threshold),
                    }
                }
                Some((ext, score)) if score >= config.ambiguous_threshold => MatchResult {
                    internal: record.clone(),
                    external: Some(ext.clone()),
                    score,
                    classification: MatchClassification::Ambiguous,
                    reason: format!(
                        "스코어 {:.2}: {:.2} ~ {:.2} 사이 (모호)",
                        score, config.ambiguous_threshold, config.match_threshold
                    ),
                },
                // No candidate, or the best one is below the ambiguous
                // threshold: report no pairing and a score of zero.
                _ => MatchResult {
                    internal: record.clone(),
                    external: None,
                    score: 0.0,
                    classification: MatchClassification::Unmatched,
                    reason: "모호 임계값 이상의 매칭 없음".into(),
                },
            };
            results.push(verdict);
        }

        // Tally the three classifications in a single pass.
        let (mut matched, mut unmatched, mut ambiguous) = (0usize, 0usize, 0usize);
        for entry in &results {
            match entry.classification {
                MatchClassification::Matched => matched += 1,
                MatchClassification::Unmatched => unmatched += 1,
                MatchClassification::Ambiguous => ambiguous += 1,
            }
        }

        // Absolute amount gap of every result that has an external side.
        let total_amount_diff: rust_decimal::Decimal = results
            .iter()
            .filter_map(|entry| {
                let ext = entry.external.as_ref()?;
                Some((entry.internal.amount - ext.amount).abs())
            })
            .sum();

        let total_internal = input.internal.len();
        let total_external = input.external.len();

        Outcome::next(ReconciliationReport {
            total_internal,
            total_external,
            matched,
            unmatched,
            ambiguous,
            total_amount_diff,
            results,
        })
    }
}

## 5. Outcome 분류 파이프라인

대사 결과를 Ranvier Outcome으로 매핑하여 하위 분기에 활용한다.

use ranvier_runtime::Axon;

// Pipeline that runs the reconciliation and then turns the report into a
// branching decision. The second type argument of `Axon::typed` is `String`,
// matching the `recon_pipeline()` / `recon_csv_pipeline()` builders and the
// `Error` type of `ReconcileTransition`; the output type follows from `.then`.
let recon_pipeline = Axon::typed::<ReconciliationInput, String>("reconcile")
    .then(ReconcileTransition)
    .then_fn("classify-outcome", |report, _res, _bus| async move {
        if report.unmatched > 0 {
            // Unmatched records exist — investigation required.
            Outcome::fault(format!(
                "대사 미완료: {}건 불일치, {}건 모호",
                report.unmatched, report.ambiguous
            ))
        } else {
            // Either everything matched, or only ambiguous records remain
            // (manual review needed); both cases pass the report downstream.
            Outcome::next(report)
        }
    });

## 6. 리포트 생성

대사 결과에서 JSON 및 CSV 리포트를 생성한다.

/// Transition that renders a reconciliation report as pretty-printed JSON.
pub struct GenerateJsonReport;

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateJsonReport {
    type Error = String;
    type Resources = ();

    /// Serializes `report` with `serde_json::to_string_pretty`.
    ///
    /// Continues with the JSON text on success; a serialization error is
    /// reported as a fault carrying the error's message.
    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        // `?` cannot be used here: this function returns `Outcome`, not
        // `Result`, so the error is mapped to a fault explicitly.
        match serde_json::to_string_pretty(&report) {
            Ok(json) => Outcome::next(json),
            Err(e) => Outcome::fault(e.to_string()),
        }
    }
}

/// Transition that renders a reconciliation report as CSV text.
pub struct GenerateCsvReport;

#[async_trait]
impl Transition<ReconciliationReport, String> for GenerateCsvReport {
    type Error = String;
    type Resources = ();

    /// Emits a header line followed by one row per match result.
    ///
    /// Results without an external record leave the three external columns
    /// empty. Free-text columns (ids, references, reason) are quoted when
    /// they contain a comma, double quote or line break, so such values
    /// cannot shift columns or split rows. Commas in `reason` are still
    /// replaced by `;` as before.
    async fn run(
        &self,
        report: ReconciliationReport,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<String, Self::Error> {
        let mut csv = String::from(
            "internal_id,internal_ref,internal_amount,external_id,external_ref,external_amount,score,classification,reason\n"
        );

        for result in &report.results {
            let (ext_id, ext_ref, ext_amount) = match &result.external {
                Some(ext) => (ext.id.as_str(), ext.reference.as_str(), ext.amount.to_string()),
                None => ("", "", String::new()),
            };
            let reason = result.reason.replace(',', ";");

            csv.push_str(&format!(
                "{},{},{},{},{},{},{:.2},{:?},{}\n",
                csv_field(&result.internal.id),
                csv_field(&result.internal.reference),
                result.internal.amount,
                csv_field(ext_id),
                csv_field(ext_ref),
                ext_amount,
                result.score,
                result.classification,
                csv_field(&reason),
            ));
        }

        Outcome::next(csv)
    }
}

/// Returns `value` ready to be written as one CSV field: unchanged when it
/// contains no comma, double quote, CR or LF; otherwise wrapped in double
/// quotes with embedded quotes doubled.
fn csv_field(value: &str) -> std::borrow::Cow<'_, str> {
    if value.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        std::borrow::Cow::Owned(format!("\"{}\"", value.replace('"', "\"\"")))
    } else {
        std::borrow::Cow::Borrowed(value)
    }
}

## 7. 감사 추적 연동

규제 준수를 위해 매칭 판정을 감사 해시 체인에 기록한다.

use ranvier_audit::prelude::*;

// Reconciliation pipeline that also writes its verdicts to the audit trail
// when one is available on the bus. The second type argument of
// `Axon::typed` is `String`, matching the `recon_pipeline()` /
// `recon_csv_pipeline()` builders and the `Error` type of `ReconcileTransition`.
let audited_recon = Axon::typed::<ReconciliationInput, String>("audited-recon")
    .then(ReconcileTransition)
    .then_fn("audit-results", |report, _res, bus| async move {
        // Auditing is best-effort: without an `AuditTrail` on the bus the
        // report passes through untouched.
        if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
            // Summary entry for the whole run.
            audit.record(AuditEntry {
                action: "reconciliation.completed".into(),
                // Attribute the run to the tenant when one is on the bus,
                // otherwise to "system".
                actor: bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "system".into()),
                resource: format!(
                    "matched={}, unmatched={}, ambiguous={}, amount_diff={}",
                    report.matched, report.unmatched, report.ambiguous, report.total_amount_diff
                ),
                timestamp: chrono::Utc::now(),
            }).await;

            // One entry per unmatched/ambiguous record (for investigation tracking).
            for result in &report.results {
                if result.classification != MatchClassification::Matched {
                    audit.record(AuditEntry {
                        // Yields "reconciliation.unmatched" / "reconciliation.ambiguous".
                        action: format!("reconciliation.{:?}", result.classification).to_lowercase(),
                        actor: "system".into(),
                        resource: format!(
                            "internal={}, score={:.2}, reason={}",
                            result.internal.id, result.score, result.reason
                        ),
                        timestamp: chrono::Utc::now(),
                    }).await;
                }
            }
        }

        Outcome::next(report)
    });

## 8. 전체 서버 예제

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

/// Starts the HTTP server exposing the two reconciliation endpoints.
///
/// Returns an error if the `API_TOKEN` environment variable cannot be read
/// or if running the server fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ranvier::http()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        // Bearer authentication with a single token taken from the environment.
        .guard(AuthGuard::bearer(vec![
            std::env::var("API_TOKEN")?,
        ]))
        // JSON report endpoint and CSV report endpoint.
        .post_typed("/api/reconcile", recon_pipeline())
        .post_typed("/api/reconcile/csv", recon_csv_pipeline())
        // No shared resources are needed, hence the unit value.
        .run(())
        .await?;

    Ok(())
}

/// Builds the pipeline behind `/api/reconcile`: typed input followed by the
/// reconcile transition, yielding the report.
fn recon_pipeline() -> Axon<ReconciliationInput, ReconciliationReport, String, ()> {
    let entry = Axon::typed::<ReconciliationInput, String>("reconcile");
    entry.then(ReconcileTransition)
}

/// Builds the pipeline behind `/api/reconcile/csv`: reconcile first, then
/// render the report as CSV text.
fn recon_csv_pipeline() -> Axon<ReconciliationInput, String, String, ()> {
    let entry = Axon::typed::<ReconciliationInput, String>("reconcile-csv");
    let reconciled = entry.then(ReconcileTransition);
    reconciled.then(GenerateCsvReport)
}

## 9. 대사 테스트

#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    /// Shorthand for a day in March 2026, the month every fixture uses.
    fn march(day: u32) -> chrono::NaiveDate {
        chrono::NaiveDate::from_ymd_opt(2026, 3, day).unwrap()
    }

    /// Assembles a fixture record from its parts.
    fn record(
        id: &str,
        reference: &str,
        amount: rust_decimal::Decimal,
        date: chrono::NaiveDate,
        description: &str,
        source: &str,
    ) -> SourceRecord {
        SourceRecord {
            id: id.into(),
            reference: reference.into(),
            amount,
            date,
            description: description.into(),
            source: source.into(),
        }
    }

    /// Two internal records: a subscription and an upgrade.
    fn sample_internal() -> Vec<SourceRecord> {
        vec![
            record("INT-001", "INV-2026-001", dec!(100.00), march(1), "월간 구독", "internal"),
            record(
                "INT-002",
                "INV-2026-002",
                dec!(250.50),
                march(5),
                "Enterprise 업그레이드",
                "internal",
            ),
        ]
    }

    /// Two external records mirroring the internal ones.
    fn sample_external() -> Vec<SourceRecord> {
        vec![
            record("EXT-001", "INV-2026-001", dec!(100.00), march(1), "월간 구독 결제", "external"),
            // Slightly different reference (one zero missing) and a date one
            // day later, to exercise fuzzy matching.
            record(
                "EXT-002",
                "INV-2026-02",
                dec!(250.50),
                march(6),
                "Enterprise 업그레이드",
                "external",
            ),
        ]
    }

    #[test]
    fn test_exact_match() {
        let (ours, theirs) = (sample_internal(), sample_external());
        assert_eq!(exact_match(&ours[0], &theirs[0]), 1.0);
    }

    #[test]
    fn test_fuzzy_match() {
        let (ours, theirs) = (sample_internal(), sample_external());
        let similarity = fuzzy_reference_match(&ours[1], &theirs[1]);
        // Similar, but not identical.
        assert!(similarity > 0.8);
        assert!(similarity < 1.0);
    }

    #[test]
    fn test_amount_match_exact() {
        let (ours, theirs) = (sample_internal(), sample_external());
        assert_eq!(amount_match(&ours[0], &theirs[0]), 1.0);
    }

    #[test]
    fn test_composite_score() {
        let (ours, theirs) = (sample_internal(), sample_external());
        let total = composite_score(&ours[0], &theirs[0], &MatchWeights::default());
        // Near-perfect match.
        assert!(total > 0.9);
    }

    #[tokio::test]
    async fn test_reconciliation() {
        let mut bus = Bus::new();
        bus.insert(ReconciliationConfig::default());

        let input = ReconciliationInput {
            internal: sample_internal(),
            external: sample_external(),
        };

        let outcome = ReconcileTransition.run(input, &(), &mut bus).await;

        if let Outcome::Next(report) = outcome {
            assert_eq!(report.total_internal, 2);
            assert_eq!(report.total_external, 2);
            // At least the exact pair must be matched.
            assert!(report.matched >= 1);
        } else {
            panic!("대사가 성공해야 함");
        }
    }

    #[test]
    fn test_date_match() {
        let mut ours = sample_internal()[0].clone();
        ours.date = march(1);

        let mut same_day = sample_external()[0].clone();
        same_day.date = march(1);
        assert_eq!(date_match(&ours, &same_day), 1.0);

        let mut next_day = sample_external()[0].clone();
        next_day.date = march(2);
        assert_eq!(date_match(&ours, &next_day), 0.9);
    }
}

## See Also

  • Saga Compensation Cookbook — 대사 실패 보상 패턴
  • Multi-Tenant Isolation Cookbook — 테넌트별 대사
  • Billing & Metering Cookbook — 빌링과 사용량 대사
  • Guard Patterns Cookbook — 입력 검증 Guard