StreamingManager (Streaming Cache)

StreamingManager is a streaming cache manager that combines cacache for disk storage with moka for metadata tracking and TinyLFU eviction.

Key Features

  • True streaming on read: Cached responses are streamed from disk in 64KB chunks, not loaded fully into memory
  • TinyLFU eviction: Better hit rates than simple LRU by filtering out one-hit wonders
  • Content deduplication: Automatic via cacache's content-addressed storage
  • Integrity verification: Cached data is verified on read
  • Body size limits: Configurable maximum body size to prevent memory exhaustion
  • Backpressure handling: Eviction cleanup uses bounded channels to prevent task accumulation

Important: Write-Path Buffering

While cached responses are streamed on read (GET), the write path (PUT) requires buffering the entire response body in memory. This is necessary to:

  • Compute the content hash for cacache's content-addressed storage
  • Enable content deduplication

For very large responses, configure the max_body_size limit to prevent OOM. Memory usage during PUT is O(response_size), not O(buffer_size). The default limit is 100MB.

Getting Started

The StreamingManager is built into the core http-cache crate and is available when the streaming feature is enabled.

[dependencies]
http-cache = { version = "1.0", features = ["streaming"] }

Basic Usage

use http_cache::StreamingManager;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a streaming cache manager with disk storage
    let manager = StreamingManager::new(
        PathBuf::from("./cache"),  // Cache directory
        10_000,                     // Max entries
    ).await?;

    // Or use with_temp_dir() for testing (uses temp directory)
    let test_manager = StreamingManager::with_temp_dir(1000).await?;

    // For custom body size limits (e.g., 50MB max)
    let custom_manager = StreamingManager::with_max_body_size(
        PathBuf::from("./cache"),
        10_000,
        50 * 1024 * 1024,
    ).await?;

    Ok(())
}

Architecture

┌─────────────────────────────────────────────────────────────────┐
│  moka::Cache<String, CacheMetadata>  (in-memory)               │
│  - Tracks: key → {content_hash, policy, headers, size}         │
│  - TinyLFU eviction (better hit rates than LRU)                │
│  - Eviction listener sends to bounded cleanup channel          │
└─────────────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│  cacache (disk)                                                 │
│  - Content-addressed storage (automatic deduplication)         │
│  - Streaming reads via AsyncRead (64KB chunks)                 │
│  - Integrity verification built-in                              │
└─────────────────────────────────────────────────────────────────┘

Memory Efficiency

On cache hit (GET): Only ~64KB is held in memory at a time (the streaming buffer), regardless of response size:

Response SizePeak Memory (Buffered)Peak Memory (Streaming GET)
100KB100KB~64KB
1MB1MB~64KB
10MB10MB~64KB
100MB100MB~64KB

On cache write (PUT): The entire response body is buffered in memory to compute the content hash. This is limited by max_body_size (default: 100MB) to prevent memory exhaustion.

Usage with Tower

use http_cache::StreamingManager;
use http_cache_tower::HttpCacheStreamingLayer;
use tower::{Service, ServiceExt};
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create streaming cache manager
    let manager = StreamingManager::new(
        PathBuf::from("./cache"),
        10_000,
    ).await?;

    // Create streaming cache layer
    let cache_layer = HttpCacheStreamingLayer::new(manager);

    // Your base service
    let service = tower::service_fn(|_req: Request<Full<Bytes>>| async {
        Ok::<_, std::convert::Infallible>(
            Response::builder()
                .status(StatusCode::OK)
                .header("cache-control", "max-age=3600")
                .body(Full::new(Bytes::from("Response data...")))?
        )
    });

    // Wrap with caching
    let cached_service = cache_layer.layer(service);

    // Make requests
    let request = Request::builder()
        .uri("https://example.com/api")
        .body(Full::new(Bytes::new()))?;

    let response = cached_service.oneshot(request).await?;
    println!("Response status: {}", response.status());

    Ok(())
}

Usage with Reqwest

use http_cache::StreamingManager;
use http_cache_reqwest::{StreamingCache, CacheMode};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = StreamingManager::new(
        PathBuf::from("./cache"),
        10_000,
    ).await?;

    let client = ClientBuilder::new(Client::new())
        .with(StreamingCache::new(manager, CacheMode::Default))
        .build();

    let response = client
        .get("https://httpbin.org/get")
        .send()
        .await?;

    println!("Status: {}", response.status());
    Ok(())
}

Working with the manager directly

Caching a response

#![allow(unused)]
fn main() {
use http_cache::{StreamingManager, StreamingCacheManager};
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use http_cache_semantics::CachePolicy;
use url::Url;
use std::path::PathBuf;

let manager = StreamingManager::new(PathBuf::from("./cache"), 10_000).await?;

// Create a response to cache
let response = Response::builder()
    .status(StatusCode::OK)
    .header("cache-control", "max-age=3600, public")
    .header("content-type", "application/json")
    .body(Full::new(Bytes::from(r#"{"data": "example"}"#)))?;

// Create cache policy
let request = Request::builder()
    .method("GET")
    .uri("https://example.com/api")
    .body(())?;
let policy = CachePolicy::new(&request, &Response::builder()
    .status(200)
    .header("cache-control", "max-age=3600, public")
    .body(vec![])?);

// Cache the response
let url = Url::parse("https://example.com/api")?;
let cached_response = manager.put(
    "GET:https://example.com/api".to_string(),
    response,
    policy,
    url,
    None, // optional metadata
).await?;
}

Retrieving a cached response

#![allow(unused)]
fn main() {
// Retrieve from cache - body is streamed from disk!
let cached = manager.get("GET:https://example.com/api").await?;

if let Some((response, policy)) = cached {
    println!("Cache hit! Status: {}", response.status());

    // The response body streams from disk in 8KB chunks
    // Memory usage stays constant regardless of body size
    use http_body_util::BodyExt;
    let body = response.into_body();
    let bytes = body.collect().await?.to_bytes();
    println!("Body: {} bytes", bytes.len());
} else {
    println!("Cache miss");
}
}

Deleting cached entries

#![allow(unused)]
fn main() {
// Remove from cache
manager.delete("GET:https://example.com/api").await?;
}

Cache management

#![allow(unused)]
fn main() {
// Get the number of entries
let count = manager.entry_count();

// Clear all entries
manager.clear().await?;

// Run pending maintenance tasks
manager.run_pending_tasks().await;
}

Content Deduplication

The cacache backend automatically deduplicates content. If two different URLs return the same response body, it's only stored once on disk:

Request 1: GET /api/users → 1MB response (hash: abc123)
  └─ metadata/key1.json → points to content/abc123

Request 2: GET /api/users?v=2 → Same 1MB response (hash: abc123)
  └─ metadata/key2.json → points to content/abc123 (same file!)

Storage: 1MB (not 2MB)

Comparison with Buffered Caching

AspectCACacheManager (Buffered)StreamingManager
Memory on GETFull body in memory~64KB streaming buffer
Memory on PUTFull body in memoryFull body in memory (limited by max_body_size)
EvictionManual/NoneTinyLFU (automatic)
Content DedupYes (cacache)Yes (cacache)
Large responsesMay OOMConfigurable limit, streaming on read
Body size limitNoneConfigurable (default 100MB)
Use caseSmall responsesLarge responses