StreamingManager (Streaming Cache)

StreamingManager is a file-based streaming cache manager that does not buffer response bodies in memory. This implementation stores response metadata and body content separately, enabling memory-efficient handling of large responses.

Getting Started

The StreamingManager is built into the core http-cache crate and is available when the streaming feature is enabled.

[dependencies]
http-cache = { version = "1.0", features = ["streaming", "streaming-tokio"] }

Or for smol runtime:

[dependencies]  
http-cache = { version = "1.0", features = ["streaming", "streaming-smol"] }

Basic Usage

use http_cache::{StreamingManager, StreamingBody, HttpStreamingCache};
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a file-based streaming cache manager
    let cache_dir = PathBuf::from("./streaming-cache");
    let manager = StreamingManager::new(cache_dir);
    
    // Use with streaming cache
    let cache = HttpStreamingCache::new(manager);
    
    Ok(())
}

Usage with Tower

The streaming cache manager works with Tower's HttpCacheStreamingLayer:

use http_cache::{StreamingManager, HttpCacheStreamingLayer};
use tower::{Service, ServiceExt};
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create streaming cache manager
    let cache_dir = PathBuf::from("./cache");
    let manager = StreamingManager::new(cache_dir);
    
    // Create streaming cache layer
    let cache_layer = HttpCacheStreamingLayer::new(manager);
    
    // Your base service
    let service = tower::service_fn(|_req: Request<Full<Bytes>>| async {
        Ok::<_, std::convert::Infallible>(
            Response::builder()
                .status(StatusCode::OK)
                .header("cache-control", "max-age=3600")
                .body(Full::new(Bytes::from("Large response data...")))?
        )
    });
    
    // Wrap with caching
    let cached_service = cache_layer.layer(service);
    
    // Make requests
    let request = Request::builder()
        .uri("https://example.com/large-file")
        .body(Full::new(Bytes::new()))?;
        
    let response = cached_service.oneshot(request).await?;
    println!("Response status: {}", response.status());
    
    Ok(())
}

Working with the manager directly

Creating a manager

#![allow(unused)]
fn main() {
use http_cache::StreamingManager;
use std::path::PathBuf;

// Create with custom cache directory
let cache_dir = PathBuf::from("./my-streaming-cache");
let manager = StreamingManager::new(cache_dir);
}

Streaming Cache Operations

Caching a streaming response

#![allow(unused)]
fn main() {
use http_cache::StreamingManager;
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use http_cache_semantics::CachePolicy;
use url::Url;

let manager = StreamingManager::new(PathBuf::from("./cache"));

// Create a large response to cache
let large_data = vec![b'X'; 10_000_000]; // 10MB response
let response = Response::builder()
    .status(StatusCode::OK)
    .header("cache-control", "max-age=3600, public")
    .header("content-type", "application/octet-stream")
    .body(Full::new(Bytes::from(large_data)))?;

// Create cache policy
let request = Request::builder()
    .method("GET")
    .uri("https://example.com/large-file")
    .body(())?;
let policy = CachePolicy::new(&request, &Response::builder()
    .status(200)
    .header("cache-control", "max-age=3600, public")
    .body(vec![])?);

// Cache the response (content stored to disk, metadata separate)
let url = Url::parse("https://example.com/large-file")?;
let cached_response = manager.put(
    "GET:https://example.com/large-file".to_string(),
    response,
    policy,
    url,
).await?;

println!("Cached response without loading into memory!");
}

Retrieving a streaming response

#![allow(unused)]
fn main() {
// Retrieve from cache - returns a streaming body
let cached = manager.get("GET:https://example.com/large-file").await?;

if let Some((response, policy)) = cached {
    println!("Cache hit! Status: {}", response.status());
    
    // The response body streams directly from disk
    let body = response.into_body();
    
    // Process the streaming body without loading it all into memory
    let mut body_stream = std::pin::pin!(body);
    while let Some(frame_result) = body_stream.frame().await {
        let frame = frame_result?;
        if let Some(chunk) = frame.data_ref() {
            // Process chunk without accumulating in memory
            println!("Processing chunk of {} bytes", chunk.len());
        }
    }
} else {
    println!("Cache miss");
}
}

Deleting cached entries

#![allow(unused)]
fn main() {
// Remove from cache (deletes both metadata and content files)
manager.delete("GET:https://example.com/large-file").await?;
}

Storage Structure

The StreamingManager organizes cache files as follows:

cache-directory/
├── cache-v2/
│   ├── metadata/
│   │   ├── 1a2b3c4d....json  # Response metadata (headers, status, policy)
│   │   └── 5e6f7g8h....json
│   └── content/
│       ├── blake3_hash1      # Raw response body content
│       └── blake3_hash2
  • Metadata files: JSON files containing response status, headers, cache policy, and content digest
  • Content files: Raw binary content files identified by Blake3 hash for deduplication
  • Content-addressable: Identical content is stored only once regardless of URL

Configuration

The StreamingManager supports basic configuration through StreamingCacheConfig:

#![allow(unused)]
fn main() {
use http_cache::{StreamingManager, StreamingCacheConfig};
use std::path::PathBuf;

// Create with default configuration
let manager = StreamingManager::new(PathBuf::from("./cache"));

// Or create with custom configuration
let config = StreamingCacheConfig {
    max_cache_size: Some(1024 * 1024 * 1024), // 1GB limit
    max_entries: Some(10000), // Maximum 10k cached entries
    streaming_buffer_size: 16384, // 16KB streaming buffer
};
let manager = StreamingManager::new_with_config(PathBuf::from("./cache"), config);

// For existing cache directories, use this to rebuild reference counts
let manager = StreamingManager::new_with_existing_cache_and_config(
    PathBuf::from("./cache"),
    config
).await?;
}

Configuration Options

  • max_cache_size: Optional maximum cache size in bytes. When exceeded, least recently used entries are evicted.
  • max_entries: Optional maximum number of cached entries. When exceeded, LRU eviction occurs.
  • streaming_buffer_size: Buffer size in bytes for streaming operations (default: 8192).