Overview

The Request Stream pattern continuously submits proof requests to the Boundless Market based on blockchain events. Use this pattern when your application proves properties about each block or block range: monitoring block hashes, tracking state transitions, or verifying computations across multiple blocks.
The Request Stream example source code can be found at: boundless/examples/request-stream

How It Works

The pattern monitors the blockchain for new blocks. Every N blocks (configurable; default is 2), it collects block hashes and constructs a proof request with them as input. The request is submitted to the Boundless Market, and the pattern waits for a prover to fulfill it before repeating for the next block range.
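The range bookkeeping described above can be sketched as plain arithmetic, independent of any RPC client. `BlockRange`, `next_range`, and `is_ready` are hypothetical names used only for illustration; they are not part of the example's source or the Boundless SDK.

```rust
/// Inclusive block range covered by one proof request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BlockRange {
    pub start: u64,
    pub end: u64,
}

/// Computes the next range to prove, given the last block already covered.
/// Returns `None` if `blocks_per_request` is zero or the block number would overflow.
pub fn next_range(last_processed: u64, blocks_per_request: u64) -> Option<BlockRange> {
    if blocks_per_request == 0 {
        return None;
    }
    let start = last_processed.checked_add(1)?;
    let end = last_processed.checked_add(blocks_per_request)?;
    Some(BlockRange { start, end })
}

/// True once the chain head has reached the end of the range, meaning every
/// block in the range exists and its hash can be fetched.
pub fn is_ready(range: BlockRange, current_block: u64) -> bool {
    current_block >= range.end
}
```

With 2 blocks per request and a last processed block of 100, `next_range` gives blocks 101 through 102, and the following call (starting from 102) gives 103 through 104.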

Setting Up

Environment Variables

You’ll need the same environment variables as a standard request:
export RPC_URL="https://..."
export PRIVATE_KEY="abcdef..."
export PINATA_JWT="abcdef..."  # or configure S3 storage
For more details on storage providers, see Storage Providers.

CLI Arguments

The request stream example accepts the following arguments:
struct Args {
    /// URL of the Ethereum RPC endpoint
    rpc_url: Url,
    /// Private key used to interact with the Boundless Market
    private_key: PrivateKeySigner,
    /// Number of blocks to include in each request
    blocks_per_request: u64,  // default: 2
    /// Storage provider configuration
    storage_config: StorageProviderConfig,
    /// Boundless Market deployment (optional)
    deployment: Option<Deployment>,
}

Creating a Block Range Stream

The core of the request stream pattern is creating an async stream that monitors the blockchain and emits events when new block ranges are ready.

Stream Pattern Benefits

Streams process events as they arrive, give consumers control over processing rate (backpressure), and compose with other stream operations like filter and map.

Implementation

async fn create_block_range_stream<P: Provider + Clone + 'static>(
    provider: P,
    blocks_per_request: u64,
) -> Result<Pin<Box<dyn Stream<Item = Result<BlockRangeEvent>> + Send>>> {
    let initial_block = provider.get_block_number().await?;
    let provider = std::sync::Arc::new(provider);
    let provider_clone = provider.clone();

    // `try_stream!` (rather than `stream!`) lets `?` forward errors as `Err` items
    // and end the stream; yielded values are wrapped in `Ok` automatically.
    Ok(Box::pin(async_stream::try_stream! {
        let mut last_processed_block = initial_block;

        loop {
            let target_block = last_processed_block + blocks_per_request;

            // Poll until we reach the target block
            loop {
                let current_block = provider_clone.get_block_number().await?;
                if current_block >= target_block {
                    break;
                }
                tokio::time::sleep(Duration::from_secs(2)).await;
            }

            // Collect block hashes in the range
            let start_block = last_processed_block + 1;
            let end_block = target_block;
            let mut block_hashes = Vec::new();

            for block_num in start_block..=end_block {
                // The provider returns `None` for a block it does not know about.
                // Assumes `Result` here is `anyhow::Result`.
                let block = provider_clone
                    .get_block_by_number(BlockNumberOrTag::Number(block_num))
                    .await?
                    .ok_or_else(|| anyhow::anyhow!("block {block_num} not found"))?;
                block_hashes.push(block.header.hash);
            }

            yield BlockRangeEvent {
                start_block,
                end_block,
                block_hashes,
            };

            last_processed_block = end_block;
        }
    }))
}

Processing Events and Submitting Requests

Once you have a stream of block range events, you can process them and submit proof requests:

Main Processing Loop

// Create the client
let client = Client::builder()
    .with_rpc_url(args.rpc_url)
    .with_deployment(args.deployment)
    .with_storage_provider_config(&args.storage_config)?
    .with_private_key(args.private_key.clone())
    .build()
    .await?;

// Upload the program once
let program_url = client
    .storage_provider
    .as_ref()
    .unwrap()
    .upload_program(ECHO_ELF)
    .await?;

// Create the event stream
let provider = client.boundless_market.instance().provider().clone();
let mut stream = create_block_range_stream(provider, args.blocks_per_request).await?;

// Process events
while let Some(event_result) = stream.next().await {
    let event = event_result?;

    // Prepare input data
    let input = input_function(&event.block_hashes);
    let request_id = request_id_function(args.private_key.address(), event.start_block);

    // Build and submit the request
    let request = client
        .new_request()
        .with_program_url(program_url.clone())?
        .with_request_input(input)
        .with_request_id(request_id);

    let (submitted_request_id, expires_at) = client.submit(request).await?;

    // Wait for fulfillment
    let _fulfillment = client
        .wait_for_request_fulfillment(
            submitted_request_id,
            Duration::from_secs(5),
            expires_at,
        )
        .await?;
}

Input Construction

The input data must be prepared in a format that your guest program can understand. In this example, we concatenate block hashes:
fn input_function(block_hashes: &[B256]) -> RequestInput {
    let mut input = Vec::new();
    // Concatenate all block hashes into a single byte vector
    // Each hash is 32 bytes (B256)
    for hash in block_hashes {
        input.extend_from_slice(hash.as_slice());
    }
    RequestInput::builder().write_slice(&input).build_inline().unwrap()
}
In production, consider serializing the input in a structured format (for example, with serde and bincode) and including additional metadata. Whatever format you choose, the guest program must be able to deserialize it.
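On the guest side, the concatenated input has to be split back into individual hashes. The following is a minimal sketch of that decoding step using only the standard library. `decode_block_hashes` is a hypothetical helper, and the sketch assumes the guest receives exactly the concatenated 32-byte hashes; how the guest reads the raw bytes depends on your zkVM setup and is not shown.

```rust
/// Splits concatenated input bytes back into 32-byte block hashes.
/// Returns `None` if the input length is not a multiple of 32.
pub fn decode_block_hashes(input: &[u8]) -> Option<Vec<[u8; 32]>> {
    if input.len() % 32 != 0 {
        return None;
    }
    Some(
        input
            .chunks_exact(32)
            .map(|chunk| {
                // Each chunk is exactly 32 bytes, so this copy cannot panic.
                let mut hash = [0u8; 32];
                hash.copy_from_slice(chunk);
                hash
            })
            .collect(),
    )
}
```

Rejecting inputs whose length is not a multiple of 32 keeps a malformed request from being silently interpreted as fewer hashes.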

Request IDs

A Request ID is a 256-bit value containing your address and a 32-bit index. Bits 0-31 hold the index (u32), bits 32-191 hold the requestor address (160 bits), and bits 192+ hold flags such as the smart contract signature flag.
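To make the bit layout concrete, here is a sketch that packs the three fields into a 32-byte big-endian value and unpacks them again. It illustrates only the layout described above and is not the SDK's `RequestId` implementation. `pack_request_id` and `unpack_request_id` are hypothetical names, and treating bits 192 through 255 as a single `u64` flags field is an assumption made for simplicity.

```rust
/// Packs (flags, address, index) into a 256-bit big-endian value.
/// In big-endian order, byte 31 holds bits 0..=7 and byte 0 holds bits 248..=255.
pub fn pack_request_id(flags: u64, address: [u8; 20], index: u32) -> [u8; 32] {
    let mut id = [0u8; 32];
    id[0..8].copy_from_slice(&flags.to_be_bytes()); // bits 192..=255: flags
    id[8..28].copy_from_slice(&address); // bits 32..=191: requestor address
    id[28..32].copy_from_slice(&index.to_be_bytes()); // bits 0..=31: index
    id
}

/// Recovers (flags, address, index) from a packed 256-bit value.
pub fn unpack_request_id(id: [u8; 32]) -> (u64, [u8; 20], u32) {
    let mut flags = [0u8; 8];
    flags.copy_from_slice(&id[0..8]);
    let mut address = [0u8; 20];
    address.copy_from_slice(&id[8..28]);
    let mut index = [0u8; 4];
    index.copy_from_slice(&id[28..32]);
    (u64::from_be_bytes(flags), address, u32::from_be_bytes(index))
}
```

In application code, prefer the SDK's `RequestId::new` over hand-rolled packing; the sketch is only meant to show where each field sits.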

Choosing an Index

In this example, we use the start block number as the index:
fn request_id_function(address: Address, start_block: u64) -> RequestId {
    // Truncating cast: only the low 32 bits of the block number are kept
    let request_index = start_block as u32;
    RequestId::new(address, request_index)
}
Each block range receives a unique, deterministic request ID, making it straightforward to identify which block range a request corresponds to.
The index must be unique per requestor address. If you submit multiple requests with the same index, only one will be accepted.
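Because the index is only 32 bits wide, one option is a checked conversion, so that a block number outside the `u32` range surfaces as an error instead of wrapping around to an index that may already be in use. A minimal sketch follows; `index_from_start_block` is a hypothetical helper, not part of the Boundless SDK.

```rust
use std::num::TryFromIntError;

/// Converts a start block number into a 32-bit request index.
/// Fails instead of wrapping when the block number does not fit in a `u32`.
pub fn index_from_start_block(start_block: u64) -> Result<u32, TryFromIntError> {
    u32::try_from(start_block)
}
```

For comparison, a plain `as u32` cast on `u32::MAX as u64 + 1` yields 0, which would collide with the index used for block 0.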

Full Example

async fn run(args: Args) -> Result<()> {
    // Step 1: Create the Boundless client
    let client = Client::builder()
        .with_rpc_url(args.rpc_url)
        .with_deployment(args.deployment)
        .with_storage_provider_config(&args.storage_config)?
        .with_private_key(args.private_key.clone())
        .build()
        .await?;

    // Step 2: Upload the program
    let program_url = client
        .storage_provider
        .as_ref()
        .unwrap()
        .upload_program(ECHO_ELF)
        .await?;

    // Step 3: Create the event stream
    let provider = client.boundless_market.instance().provider().clone();
    let mut stream = create_block_range_stream(provider, args.blocks_per_request).await?;

    // Step 4: Process events and submit requests
    while let Some(event_result) = stream.next().await {
        let event = event_result?;

        // Prepare request data
        let input = input_function(&event.block_hashes);
        let request_id = request_id_function(args.private_key.address(), event.start_block);

        // Build the request
        let request = client
            .new_request()
            .with_program_url(program_url.clone())?
            .with_request_input(input)
            .with_request_id(request_id);

        // Submit the request
        let (request_id, expires_at) = client.submit(request).await?;

        // Wait for fulfillment
        let fulfillment = client
            .wait_for_request_fulfillment(
                request_id,
                Duration::from_secs(5),
                expires_at,
            )
            .await?;

        tracing::info!("Request fulfilled: {:?}", fulfillment);
    }

    Ok(())
}

Use Cases

This pattern suits applications that prove properties about each block or block range, verify state transitions continuously, process data in batches as new blocks arrive, or generate proofs for events as they occur onchain.

Next Steps

For fine-tuning requests, see request configuration; for integrating proofs into your application, see using proofs; for automatic proof delivery, see callbacks.