StreamingManager (Streaming Cache)

StreamingManager is a file-based streaming cache manager that does not buffer response bodies in memory. It stores response metadata and body content separately on disk, enabling memory-efficient handling of large responses.

Getting Started

The StreamingManager is built into the core http-cache crate and is available when the streaming feature is enabled together with a runtime feature (streaming-tokio or streaming-smol).

[dependencies]
http-cache = { version = "1.0", features = ["streaming", "streaming-tokio"] }

Or for smol runtime:

[dependencies]  
http-cache = { version = "1.0", features = ["streaming", "streaming-smol"] }

Basic Usage

use http_cache::{StreamingManager, StreamingBody, HttpStreamingCache};
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a file-based streaming cache manager
    let cache_dir = PathBuf::from("./streaming-cache");
    let manager = StreamingManager::new(cache_dir);
    
    // Use with streaming cache
    let cache = HttpStreamingCache::new(manager);
    
    Ok(())
}

Usage with Tower

The streaming cache manager works with Tower's HttpCacheStreamingLayer:

use http_cache::{StreamingManager, HttpCacheStreamingLayer};
use tower::{Layer, Service, ServiceExt};
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create streaming cache manager
    let cache_dir = PathBuf::from("./cache");
    let manager = StreamingManager::new(cache_dir);
    
    // Create streaming cache layer
    let cache_layer = HttpCacheStreamingLayer::new(manager);
    
    // Your base service
    let service = tower::service_fn(|_req: Request<Full<Bytes>>| async {
        Ok::<_, std::convert::Infallible>(
            Response::builder()
                .status(StatusCode::OK)
                .header("cache-control", "max-age=3600")
                .body(Full::new(Bytes::from("Large response data...")))
                .expect("valid response")
        )
    });
    
    // Wrap with caching
    let cached_service = cache_layer.layer(service);
    
    // Make requests
    let request = Request::builder()
        .uri("https://example.com/large-file")
        .body(Full::new(Bytes::new()))?;
        
    let response = cached_service.oneshot(request).await?;
    println!("Response status: {}", response.status());
    
    Ok(())
}

Working with the manager directly

Creating a manager

#![allow(unused)]
fn main() {
use http_cache::StreamingManager;
use std::path::PathBuf;

// Create with custom cache directory
let cache_dir = PathBuf::from("./my-streaming-cache");
let manager = StreamingManager::new(cache_dir);
}

Streaming Cache Operations

Caching a streaming response

#![allow(unused)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use http_cache::StreamingManager;
use http::{Request, Response, StatusCode};
use http_body_util::Full;
use bytes::Bytes;
use http_cache_semantics::CachePolicy;
use std::path::PathBuf;
use url::Url;

let manager = StreamingManager::new(PathBuf::from("./cache"));

// Create a large response to cache
let large_data = vec![b'X'; 10_000_000]; // 10MB response
let response = Response::builder()
    .status(StatusCode::OK)
    .header("cache-control", "max-age=3600, public")
    .header("content-type", "application/octet-stream")
    .body(Full::new(Bytes::from(large_data)))?;

// Create cache policy
let request = Request::builder()
    .method("GET")
    .uri("https://example.com/large-file")
    .body(())?;
let policy = CachePolicy::new(&request, &Response::builder()
    .status(200)
    .header("cache-control", "max-age=3600, public")
    .body(vec![])?);

// Cache the response (content stored to disk, metadata separate)
let url = Url::parse("https://example.com/large-file")?;
let cached_response = manager.put(
    "GET:https://example.com/large-file".to_string(),
    response,
    policy,
    url,
).await?;

println!("Cached response without loading into memory!");
Ok(())
}

Retrieving a streaming response

#![allow(unused)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use http_body_util::BodyExt;
use http_cache::StreamingManager;
use std::path::PathBuf;

// Reuse the manager from the previous example
let manager = StreamingManager::new(PathBuf::from("./cache"));

// Retrieve from cache - returns a streaming body
let cached = manager.get("GET:https://example.com/large-file").await?;

if let Some((response, policy)) = cached {
    println!("Cache hit! Status: {}", response.status());
    
    // The response body streams directly from disk
    let body = response.into_body();
    
    // Process the streaming body without loading it all into memory
    let mut body_stream = std::pin::pin!(body);
    while let Some(frame_result) = body_stream.frame().await {
        let frame = frame_result?;
        if let Some(chunk) = frame.data_ref() {
            // Process chunk without accumulating in memory
            println!("Processing chunk of {} bytes", chunk.len());
        }
    }
} else {
    println!("Cache miss");
}

Ok(())
}

Deleting cached entries

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use http_cache::StreamingManager;
use std::path::PathBuf;
let manager = StreamingManager::new(PathBuf::from("./cache"));

// Remove from cache (deletes both metadata and content files)
manager.delete("GET:https://example.com/large-file").await?;
Ok(())
}

Storage Structure

The StreamingManager organizes cache files as follows:

cache-directory/
├── cache-v2/
│   ├── metadata/
│   │   ├── 1a2b3c4d....json  # Response metadata (headers, status, policy)
│   │   └── 5e6f7g8h....json
│   └── content/
│       ├── sha256_hash1      # Raw response body content
│       └── sha256_hash2

  • Metadata files: JSON files containing response status, headers, cache policy, and content digest
  • Content files: Raw binary content files identified by SHA256 hash for deduplication
  • Content-addressable: Identical content is stored only once regardless of URL (see the sketch below)
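
To make the content-addressable idea concrete, the sketch below derives a content path the way a deduplicating store can: hash the body bytes with SHA256 and use the hex digest as the file name, so identical bodies land in the same file. The sha2 dependency, the helper name, and the exact directory layout are illustrative assumptions; StreamingManager's internal hashing and naming may differ in detail.

use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};

// Hypothetical helper: map response body bytes to a content-addressed path.
// Identical bodies produce identical digests, so they share one content file.
fn content_path(cache_dir: &Path, body: &[u8]) -> PathBuf {
    let digest = Sha256::digest(body);
    let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
    cache_dir.join("cache-v2").join("content").join(hex)
}

fn main() {
    let cache_dir = PathBuf::from("./streaming-cache");
    // Two different URLs returning identical bytes map to the same content file.
    let a = content_path(&cache_dir, b"same payload");
    let b = content_path(&cache_dir, b"same payload");
    assert_eq!(a, b);
    println!("content stored at {}", a.display());
}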

Performance Characteristics

Memory Usage

  • Constant memory usage regardless of response size
  • Only metadata loaded into memory (~few KB per response)
  • Response bodies stream directly from disk files (see the sketch below)
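
As a rough illustration of the constant-memory property, the sketch below copies a cached body to an output file one frame at a time, so only a single chunk is resident at once. It reuses the manager and cache key from the earlier examples and writes with tokio::fs; the output path is an arbitrary choice for the example.

use http_body_util::BodyExt;
use http_cache::StreamingManager;
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = StreamingManager::new(PathBuf::from("./cache"));

    if let Some((response, _policy)) = manager.get("GET:https://example.com/large-file").await? {
        let mut body = std::pin::pin!(response.into_body());
        let mut out = tokio::fs::File::create("./large-file.bin").await?;

        // Copy one frame at a time; memory use stays bounded by the chunk size.
        while let Some(frame) = body.frame().await {
            let frame = frame?;
            if let Some(chunk) = frame.data_ref() {
                out.write_all(chunk).await?;
            }
        }
        out.flush().await?;
    }
    Ok(())
}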

Disk Usage

  • Content deduplication via SHA256 hashing
  • Efficient storage with separate metadata and content
  • Persistent cache survives application restarts (see the example below)
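
Because entries live on disk, a manager constructed in a new process over the same directory can serve entries written before a restart. A minimal check, assuming the cache key used in the caching example above:

use http_cache::StreamingManager;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point a fresh manager at the directory used by a previous run.
    let manager = StreamingManager::new(PathBuf::from("./cache"));
    let survived = manager.get("GET:https://example.com/large-file").await?.is_some();
    println!("entry survived restart: {survived}");
    Ok(())
}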

Use Cases

  • Large file responses (images, videos, archives)
  • Memory-constrained environments
  • High-throughput applications with large responses
  • Long-running services that need persistent caching

Comparison with Other Managers

Manager            Memory Usage        Storage   Streaming   Best For
StreamingManager   Constant            Disk      Yes         Large responses, memory efficiency
CACacheManager     Buffers responses   Disk      No          General purpose, moderate sizes
MokaManager        Buffers responses   Memory    No          Fast access, small responses
QuickManager       Buffers responses   Memory    No          Low overhead, small responses

Configuration

The StreamingManager uses sensible defaults; the cache directory passed to StreamingManager::new is its main configuration point:

#![allow(unused)]
fn main() {
use http_cache::StreamingManager;
use std::path::PathBuf;

// Cache directory structure is automatically created
let manager = StreamingManager::new(PathBuf::from("./cache"));

// The manager handles:
// - Directory creation
// - Content deduplication  
// - Metadata organization
// - File cleanup on delete
}

For advanced configuration, you can implement custom cleanup policies or directory management by wrapping the manager or operating on the cache directory directly.
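
For example, an age-based cleanup could run alongside the manager by walking the metadata and content directories and removing files that have not been touched recently. The directory names below follow the layout shown earlier, but the cleanup_older_than helper, the retention window, and the reliance on file modification times are illustrative assumptions rather than part of the StreamingManager API.

use std::path::Path;
use std::time::{Duration, SystemTime};

// Hypothetical helper: delete cache files not modified within `max_age`.
// A real policy would also keep metadata and content entries consistent.
fn cleanup_older_than(cache_dir: &Path, max_age: Duration) -> std::io::Result<()> {
    let now = SystemTime::now();
    for sub in ["metadata", "content"] {
        let dir = cache_dir.join("cache-v2").join(sub);
        if !dir.exists() {
            continue;
        }
        for entry in std::fs::read_dir(&dir)? {
            let path = entry?.path();
            let modified = std::fs::metadata(&path)?.modified()?;
            if now.duration_since(modified).unwrap_or_default() > max_age {
                std::fs::remove_file(&path)?;
            }
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Drop anything older than seven days from the streaming cache directory.
    cleanup_older_than(Path::new("./streaming-cache"), Duration::from_secs(7 * 24 * 60 * 60))
}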