# Billing & Metering Cookbook

Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook


# Overview

API metering is the foundation of SaaS monetization. This cookbook shows how to implement per-tenant usage counting, plan-based rate limits, overage handling, and usage reporting — all as Guard and Transition patterns using existing Ranvier primitives with Redis for distributed state.


# 1. Metering Guard — Redis INCR

Count API calls per tenant per month using Redis atomic increments.

use async_trait::async_trait;
use ranvier_core::prelude::*;
use redis::AsyncCommands;

/// Bus type: tenant's plan limits.
///
/// Inserted into the Bus by `PlanResolver` and read by the metering guards.
#[derive(Debug, Clone)]
pub struct PlanLimits {
    /// Maximum API calls per billing period; the metering guards compare the
    /// tenant's per-period call counter against this value.
    pub monthly_api_calls: u64,
    /// Token allowance per billing period. Not enforced by the guards shown
    /// in this cookbook.
    pub monthly_tokens: u64,
    /// Feature names enabled for the plan (e.g. "advanced-analytics").
    pub features: Vec<String>,
}

/// Bus type: current usage snapshot.
///
/// Written to the Bus by the metering guards after the call counter has been
/// incremented for the current request.
#[derive(Debug, Clone)]
pub struct UsageSnapshot {
    /// Value of the tenant's call counter for `period`, including the
    /// current request.
    pub api_calls: u64,
    /// Tokens consumed in `period`; the guards shown here always set this to 0.
    pub tokens: u64,
    /// Billing period, formatted with `%Y-%m`.
    pub period: String,  // "2026-03"
}

/// Transition that counts API calls per tenant in Redis and faults once the
/// tenant's `PlanLimits::monthly_api_calls` is exceeded.
///
/// Generic over the payload type `T`, which it passes through unchanged.
pub struct MeteringGuard<T> {
    /// Redis client; a connection is obtained from it on every `run`.
    redis: redis::Client,
    /// Ties the guard to `T` without storing a value of that type.
    _marker: std::marker::PhantomData<T>,
}

impl<T> MeteringGuard<T> {
    /// Creates a guard that meters against the Redis server at `redis_url`.
    ///
    /// # Panics
    ///
    /// Panics with the message `"redis client"` if `redis::Client::open`
    /// rejects `redis_url`. Use [`MeteringGuard::try_new`] when the URL comes
    /// from configuration and a bad value should be reported, not crash.
    pub fn new(redis_url: &str) -> Self {
        Self::try_new(redis_url).expect("redis client")
    }

    /// Fallible counterpart of [`MeteringGuard::new`].
    ///
    /// # Errors
    ///
    /// Returns the `redis::RedisError` produced by `redis::Client::open` when
    /// `redis_url` is not accepted.
    pub fn try_new(redis_url: &str) -> Result<Self, redis::RedisError> {
        Ok(Self {
            redis: redis::Client::open(redis_url)?,
            _marker: std::marker::PhantomData,
        })
    }
}

#[async_trait]
impl<T: Send + Sync + 'static> Transition<T, T> for MeteringGuard<T> {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        let tenant = bus.require::<super::TenantContext>();
        let limits = bus.require::<PlanLimits>();

        let period = chrono::Utc::now().format("%Y-%m").to_string();
        let key = format!("meter:{}:{}", tenant.tenant_id, period);

        let mut conn = self.redis.get_multiplexed_async_connection().await
            .map_err(|e| e.to_string())?;

        // Atomic increment + TTL (auto-expire at end of billing period)
        let count: u64 = conn.incr(&key, 1u64).await
            .map_err(|e| e.to_string())?;

        // Set TTL on first call of the period (45 days to be safe)
        if count == 1 {
            let _: () = conn.expire(&key, 45 * 24 * 3600).await
                .map_err(|e| e.to_string())?;
        }

        // Store usage snapshot in Bus
        bus.insert(UsageSnapshot {
            api_calls: count,
            tokens: 0,
            period: period.clone(),
        });

        if count > limits.monthly_api_calls {
            Outcome::fault(format!(
                "429 Too Many Requests: {} API calls used, limit is {}. Retry-After: {}",
                count,
                limits.monthly_api_calls,
                seconds_until_next_month(),
            ))
        } else {
            Outcome::next(input)
        }
    }
}

fn seconds_until_next_month() -> u64 {
    let now = chrono::Utc::now();
    let next_month = if now.month() == 12 {
        chrono::NaiveDate::from_ymd_opt(now.year() + 1, 1, 1)
    } else {
        chrono::NaiveDate::from_ymd_opt(now.year(), now.month() + 1, 1)
    };
    next_month
        .map(|d| d.and_hms_opt(0, 0, 0).unwrap())
        .map(|dt| (dt - now.naive_utc()).num_seconds().max(0) as u64)
        .unwrap_or(86400)
}

# 2. Plan Management via Bus

Inject plan limits from a database or config into the Bus.

use ranvier_core::prelude::*;
use sqlx::PgPool;

/// Bus type: newtype wrapper around the Postgres connection pool, so the pool
/// can be looked up in the Bus by its own type.
#[derive(Debug, Clone)]
pub struct DbPool(pub PgPool);

/// Transition that looks up the current tenant's plan in Postgres and inserts
/// the resulting `PlanLimits` into the Bus.
pub struct PlanResolver;

#[async_trait]
impl Transition<(), ()> for PlanResolver {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: (),
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<(), Self::Error> {
        let tenant = bus.require::<super::TenantContext>();
        let pool = &bus.require::<DbPool>().0;

        let row = sqlx::query_as::<_, (i64, i64, String)>(
            "SELECT monthly_api_calls, monthly_tokens, features FROM plans \
             JOIN tenants ON tenants.plan_id = plans.id \
             WHERE tenants.tenant_id = $1"
        )
        .bind(&tenant.tenant_id)
        .fetch_optional(pool)
        .await
        .map_err(|e| e.to_string())?;

        let limits = match row {
            Some((calls, tokens, features)) => PlanLimits {
                monthly_api_calls: calls as u64,
                monthly_tokens: tokens as u64,
                features: serde_json::from_str(&features).unwrap_or_default(),
            },
            None => PlanLimits {
                monthly_api_calls: 1_000,  // free tier default
                monthly_tokens: 100_000,
                features: vec![],
            },
        };

        bus.insert(limits);
        Outcome::next(input)
    }
}

# 3. Overage Handling

Instead of hard-blocking, allow overage with penalty pricing.

/// Bus type: how a tenant is treated once it passes its plan limit.
///
/// Read by `OverageMeteringGuard`; when absent from the Bus the guard falls
/// back to "no overage, hard cap equal to the plan limit".
#[derive(Debug, Clone)]
pub struct OveragePolicy {
    /// When false, calls beyond `PlanLimits::monthly_api_calls` are rejected.
    pub allow_overage: bool,
    /// Price of one overage call in cents; the usage report multiplies it by
    /// the number of overage calls.
    pub overage_rate_per_call_cents: u64,  // e.g., 1 cent per extra call
    /// Factor applied to the plan limit to obtain the absolute cap above
    /// which calls are always rejected.
    pub hard_cap_multiplier: f64,          // e.g., 2.0 = 200% of limit
}

pub struct OverageMeteringGuard<T> {
    redis: redis::Client,
    _marker: std::marker::PhantomData<T>,
}

#[async_trait]
impl<T: Send + Sync + 'static> Transition<T, T> for OverageMeteringGuard<T> {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        let tenant = bus.require::<super::TenantContext>();
        let limits = bus.require::<PlanLimits>();
        let overage = bus.get_cloned::<OveragePolicy>()
            .unwrap_or(OveragePolicy {
                allow_overage: false,
                overage_rate_per_call_cents: 0,
                hard_cap_multiplier: 1.0,
            });

        let period = chrono::Utc::now().format("%Y-%m").to_string();
        let key = format!("meter:{}:{}", tenant.tenant_id, period);

        let mut conn = self.redis.get_multiplexed_async_connection().await
            .map_err(|e| e.to_string())?;

        let count: u64 = conn.incr(&key, 1u64).await
            .map_err(|e| e.to_string())?;

        let hard_cap = (limits.monthly_api_calls as f64 * overage.hard_cap_multiplier) as u64;

        if count > hard_cap {
            Outcome::fault(format!(
                "429 Hard cap exceeded: {} calls (cap: {})",
                count, hard_cap
            ))
        } else if count > limits.monthly_api_calls && !overage.allow_overage {
            Outcome::fault(format!(
                "429 Plan limit exceeded: {} calls (limit: {})",
                count, limits.monthly_api_calls
            ))
        } else {
            // Track overage for billing
            if count > limits.monthly_api_calls {
                let overage_count = count - limits.monthly_api_calls;
                let overage_key = format!("overage:{}:{}", tenant.tenant_id, period);
                let _: () = conn.set(&overage_key, overage_count).await
                    .map_err(|e| e.to_string())?;

                tracing::info!(
                    tenant_id = %tenant.tenant_id,
                    overage_count = overage_count,
                    "tenant in overage zone"
                );
            }

            bus.insert(UsageSnapshot {
                api_calls: count,
                tokens: 0,
                period,
            });

            Outcome::next(input)
        }
    }
}

# 4. Usage Report Generation

A Transition that generates a monthly usage report per tenant.

use ranvier_core::prelude::*;

/// Serializable per-tenant usage summary for one billing period, produced by
/// `GenerateReport`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct UsageReport {
    /// Tenant the report belongs to.
    pub tenant_id: String,
    /// Billing period, formatted with `%Y-%m` (e.g. "2026-03").
    pub period: String,
    /// Value of the tenant's call counter for the period.
    pub api_calls: u64,
    /// Tokens consumed in the period; `GenerateReport` always sets this to 0.
    pub tokens_used: u64,
    /// The plan's `monthly_api_calls` limit at report time.
    pub plan_limit: u64,
    /// Value of the tenant's overage counter for the period.
    pub overage_calls: u64,
    /// `overage_calls` multiplied by the policy's per-call rate, in cents.
    pub overage_cost_cents: u64,
}

pub struct GenerateReport {
    redis: redis::Client,
}

#[async_trait]
impl Transition<String, UsageReport> for GenerateReport {
    type Error = String;
    type Resources = ();

    /// Builds the usage report for `tenant_id` for the current UTC month.
    ///
    /// Reads `meter:{tenant_id}:{YYYY-MM}` and `overage:{tenant_id}:{YYYY-MM}`
    /// from Redis; a missing key counts as 0. `PlanLimits` and `OveragePolicy`
    /// are taken from the Bus when present, otherwise free-tier limits and a
    /// zero overage rate are used.
    ///
    /// Redis failures are surfaced as the error string of the underlying
    /// `redis::RedisError` instead of being reported as zero usage.
    async fn run(
        &self,
        tenant_id: String,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<UsageReport, Self::Error> {
        let period = chrono::Utc::now().format("%Y-%m").to_string();
        let meter_key = format!("meter:{}:{}", tenant_id, period);
        let overage_key = format!("overage:{}:{}", tenant_id, period);

        let mut conn = self.redis.get_multiplexed_async_connection().await
            .map_err(|e| e.to_string())?;

        // Reading into `Option<u64>` separates "key does not exist" (None,
        // legitimately zero usage) from a Redis failure. Collapsing both to 0
        // would emit a billing report claiming no usage during an outage.
        let api_calls: Option<u64> = conn.get(&meter_key).await
            .map_err(|e| e.to_string())?;
        let api_calls = api_calls.unwrap_or(0);

        let overage_calls: Option<u64> = conn.get(&overage_key).await
            .map_err(|e| e.to_string())?;
        let overage_calls = overage_calls.unwrap_or(0);

        let limits = bus.get_cloned::<PlanLimits>()
            .unwrap_or(PlanLimits {
                monthly_api_calls: 1_000,
                monthly_tokens: 100_000,
                features: vec![],
            });

        let overage_rate = bus.get_cloned::<OveragePolicy>()
            .map(|o| o.overage_rate_per_call_cents)
            .unwrap_or(0);

        Outcome::next(UsageReport {
            tenant_id,
            period,
            api_calls,
            tokens_used: 0,
            plan_limit: limits.monthly_api_calls,
            overage_calls,
            // Saturate instead of overflowing (panic in debug builds,
            // wrap-around in release) on extreme counts or rates.
            overage_cost_cents: overage_calls.saturating_mul(overage_rate),
        })
    }
}

# 5. Inspector Metrics Separation

Keep business metrics (API usage, revenue) separate from operational metrics (latency, error rates) that Inspector already tracks.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

/// Bus type: business metrics for the current request.
///
/// Derived from `PlanLimits` and `UsageSnapshot` in the "meter" pipeline step.
#[derive(Debug, Clone)]
pub struct BusinessMetrics {
    /// Plan limit minus calls used so far, saturating at 0.
    pub api_calls_remaining: u64,
    /// Name of the tenant's plan; the pipeline example hard-codes "pro".
    pub plan_name: String,
    /// True when the calls used exceed the plan limit.
    pub is_overage: bool,
}

// Three-step pipeline: resolve the tenant's plan, derive business metrics
// from the metering snapshot, then run the business logic.
let metered_pipeline = Axon::typed::<ApiRequest, ApiResponse>("metered-api")
    .then_fn("resolve-plan", |req, _res, bus| async move {
        // PlanResolver inserts PlanLimits into Bus
        PlanResolver.run((), &(), bus).await.map_err(|e| e.to_string())?;
        Outcome::next(req)
    })
    .then_fn("meter", |req, _res, bus| async move {
        // NOTE(review): this step does not invoke a metering guard itself; it
        // only reads the UsageSnapshot that a guard is expected to have left
        // in the Bus earlier. If none is present, no BusinessMetrics are
        // inserted.
        // After metering, compute remaining calls
        let limits = bus.require::<PlanLimits>();
        let usage = bus.get_cloned::<UsageSnapshot>();

        if let Ok(usage) = usage {
            bus.insert(BusinessMetrics {
                api_calls_remaining: limits.monthly_api_calls.saturating_sub(usage.api_calls),
                // Placeholder: the plan name is hard-coded in this example.
                plan_name: "pro".into(),
                is_overage: usage.api_calls > limits.monthly_api_calls,
            });
        }

        Outcome::next(req)
    })
    .then_fn("process", |req, _res, bus| async move {
        // Business logic
        let metrics = bus.get_cloned::<BusinessMetrics>();
        // Logs 0 remaining calls when no BusinessMetrics were recorded.
        tracing::info!(
            remaining = metrics.map(|m| m.api_calls_remaining).unwrap_or(0),
            "processing request"
        );

        Outcome::next(ApiResponse { data: "ok".into() })
    });

Add usage headers to the HTTP response:

// In a response transformer or Guard
// (fragment: append this step to an existing pipeline builder chain)
.then_fn("add-usage-headers", |response, _res, bus| async move {
    // Headers are only added when a metering guard left a UsageSnapshot.
    if let Ok(usage) = bus.get_cloned::<UsageSnapshot>() {
        let limits = bus.require::<PlanLimits>();
        // These headers help clients track their usage, e.g.
        // X-RateLimit-Limit: 10000
        // X-RateLimit-Remaining: 8500
        // NOTE(review): a reset header such as
        // X-RateLimit-Reset: 2026-04-01T00:00:00Z
        // is NOT emitted by this snippet; only Limit and Remaining are set.
        bus.insert(ResponseHeaders(vec![
            ("X-RateLimit-Limit".into(), limits.monthly_api_calls.to_string()),
            ("X-RateLimit-Remaining".into(),
                limits.monthly_api_calls.saturating_sub(usage.api_calls).to_string()),
        ]));
    }
    Outcome::next(response)
})

# 6. Full Server Example

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
    let redis_url = std::env::var("REDIS_URL").unwrap_or("redis://127.0.0.1/".into());

    Ranvier::http()
        .bind("0.0.0.0:8080")
        .bus_injector({
            let pool = pool.clone();
            move |parts: &http::request::Parts, bus: &mut Bus| {
                bus.insert(DbPool(pool.clone()));
                let headers: Vec<(String, String)> = parts.headers.iter()
                    .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
                    .collect();
                bus.insert(headers);
            }
        })
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(CorsGuard::<()>::permissive())
        .guard(TenantGuard::new())
        .guard(MeteringGuard::new(&redis_url))
        // Routes
        .post_typed("/api/data", data_pipeline())
        .get("/api/usage", usage_pipeline())
        .run(())
        .await?;

    Ok(())
}

# 7. Testing Metering

#[cfg(test)]
mod tests {
    use super::*;

    /// The reset delay is positive and never longer than a 31-day month.
    #[test]
    fn test_seconds_until_next_month() {
        const LONGEST_MONTH_SECS: u64 = 31 * 24 * 3600;

        let remaining = seconds_until_next_month();
        assert!(remaining > 0);
        assert!(remaining <= LONGEST_MONTH_SECS);
    }

    /// Plan limits placed in the Bus can be read back unchanged.
    #[tokio::test]
    async fn test_plan_limits_injection() {
        let plan = PlanLimits {
            monthly_api_calls: 10_000,
            monthly_tokens: 1_000_000,
            features: vec!["advanced-analytics".into()],
        };

        let mut bus = Bus::new();
        bus.insert(plan);

        let stored = bus.require::<PlanLimits>();
        assert_eq!(stored.monthly_api_calls, 10_000);
        assert!(stored.features.iter().any(|name| name == "advanced-analytics"));
    }

    /// A report serializes to JSON that carries its tenant id and call count.
    #[test]
    fn test_usage_report_serialization() {
        let encoded = serde_json::to_string(&UsageReport {
            tenant_id: "tenant-123".into(),
            period: "2026-03".into(),
            api_calls: 5_000,
            tokens_used: 250_000,
            plan_limit: 10_000,
            overage_calls: 0,
            overage_cost_cents: 0,
        })
        .unwrap();

        for needle in ["tenant-123", "5000"] {
            assert!(encoded.contains(needle));
        }
    }

    /// Overage is usage above the limit, billed at a flat per-call rate.
    #[test]
    fn test_overage_calculation() {
        let (used, allowed, cents_per_call) = (12_000u64, 10_000u64, 1u64);

        let extra_calls = used.saturating_sub(allowed);
        assert_eq!(extra_calls, 2_000);
        assert_eq!(extra_calls * cents_per_call, 2_000); // $20.00
    }
}

# See Also

  • Multi-Tenant Isolation Cookbook — tenant Guard + Bus propagation
  • Guard Patterns Cookbook — Guard composition patterns
  • LLM Gateway Cookbook — token-based cost metering for LLM calls
  • Egress Patterns Cookbook — webhook delivery metering