Streaming
Streams provide continuous data flow from Rust to target languages. Unlike async functions that return a single value, streams deliver multiple values over time. This makes them a good fit for real-time updates, sensor data, event buses, and live feeds. BoltFFI generates native stream types that integrate with each platform’s concurrency model and handles the underlying buffering and synchronization.
The ffi_stream Attribute
Mark a method with #[ffi_stream] to expose it as a stream. The attribute requires an item parameter specifying the type of data that flows through the stream, and accepts an optional mode parameter to control how consumers receive events.
#[ffi_stream(item = Reading)] // item type is Reading, async mode (default)
#[ffi_stream(item = Reading, mode = "async")] // explicit async mode
#[ffi_stream(item = Reading, mode = "callback")] // callback mode
#[ffi_stream(item = Reading, mode = "batch")] // batch mode
The item type must be able to cross the FFI boundary: primitives, records marked with #[data], or other supported types. The method must return Arc<EventSubscription<T>>, where T matches the item type.
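To make the signature requirement concrete, here is a minimal sketch of the shape such a method might take. It is written to compile on its own, so EventSubscription is a local stand-in rather than the BoltFFI type, and the attributes appear only as comments. The Reading fields, the Sensor layout, and the choice to hand out a clone of a stored Arc are all assumptions made for illustration.

```rust
use std::marker::PhantomData;
use std::sync::Arc;

// Local stand-in so the sketch compiles without BoltFFI.
// Only `new(capacity)` mirrors a call shown in this document.
pub struct EventSubscription<T> {
    capacity: usize,
    _item: PhantomData<T>,
}

impl<T> EventSubscription<T> {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, _item: PhantomData }
    }

    // Hypothetical accessor, used here only to inspect the stand-in.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}

// In a BoltFFI project this record would be marked with #[data].
// The fields are invented for the example.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Reading {
    pub celsius: f64,
    pub timestamp_ms: u64,
}

pub struct Sensor {
    subscription: Arc<EventSubscription<Reading>>,
}

impl Sensor {
    pub fn new() -> Self {
        Self { subscription: Arc::new(EventSubscription::new(256)) }
    }

    // In a BoltFFI project: #[ffi_stream(item = Reading)]
    // The T in the return type (Reading) matches the attribute's item type.
    pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
        Arc::clone(&self.subscription)
    }
}
```

The point to take from the sketch is the pairing: the item named in the attribute and the T in Arc<EventSubscription<T>> are the same type.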
Stream Modes
BoltFFI supports three consumption modes. Each mode generates different bindings suited to different use cases.
Async Mode
The default mode generates AsyncStream in Swift and Flow in Kotlin. Use this when consumers want to process events using native async iteration. The stream integrates with structured concurrency and cancellation propagates automatically.
Callback Mode
Generates a method that takes a callback and returns a cancellable handle. Use this for simple event handlers where you don’t need async iteration, or when integrating with callback-based APIs.
Batch Mode
Generates a subscription object that lets consumers pull batches of events on their own schedule. Use this for high-frequency data where you want control over when and how many events to process, or when you need to process events on a specific thread.
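The pull-based idea behind batch mode can be modelled with the standard library alone. The sketch below uses a bounded std::sync::mpsc channel as a stand-in for a subscription buffer; pull_batch is a hypothetical helper, not a BoltFFI function, and the generated bindings may expose batching differently.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical consumer-side helper: take at most `max` buffered events
/// without blocking. Returns fewer (possibly zero) if the buffer runs dry.
pub fn pull_batch<T>(rx: &Receiver<T>, max: usize) -> Vec<T> {
    // try_iter() yields only what is already buffered; take() caps the batch.
    rx.try_iter().take(max).collect()
}
```

With five events buffered, successive calls with max = 3 would return three events, then two, then an empty batch. The consumer decides when to call and how much to take, which is the control batch mode is meant to give.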
Creating Streams
Streams are created using EventSubscription or StreamProducer. The choice depends on whether you need single or multiple subscribers.
EventSubscription
Use EventSubscription when each call to the stream method should create an independent subscription. Each subscriber gets its own buffer and receives all events pushed after subscribing.
StreamProducer
Use StreamProducer when you need to broadcast events to multiple subscribers. Each subscriber gets its own buffer, and pushing an event delivers it to all active subscribers.
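The broadcast behaviour described for StreamProducer can be sketched with standard-library channels. Broadcaster below is a hypothetical model, not the BoltFFI type: it keeps one bounded channel per subscriber and uses a Mutex around the subscriber list for simplicity, so it is not lock-free.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Mutex;

/// Hypothetical broadcast model: one bounded buffer per subscriber.
pub struct Broadcaster<T: Clone> {
    capacity: usize, // per-subscriber buffer size; assumed to be > 0
    subscribers: Mutex<Vec<SyncSender<T>>>,
}

impl<T: Clone> Broadcaster<T> {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, subscribers: Mutex::new(Vec::new()) }
    }

    /// Each subscriber gets its own buffer and sees only later events.
    pub fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Deliver a clone of the event to every active subscriber, never blocking.
    pub fn push(&self, event: T) {
        let mut subs = self.subscribers.lock().unwrap();
        subs.retain(|tx| match tx.try_send(event.clone()) {
            // Delivered, or dropped because this subscriber's buffer is full.
            Ok(()) | Err(TrySendError::Full(_)) => true,
            // The subscriber went away; stop tracking it.
            Err(TrySendError::Disconnected(_)) => false,
        });
    }
}
```

In this model a subscriber that joins after the first push receives only the events pushed afterwards, while an earlier subscriber receives both.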
Buffer Capacity
Each subscription has a ring buffer that holds events until the consumer processes them. The default capacity is 256 items. For high-frequency streams, increase the capacity to avoid dropping events when the consumer falls behind.
// Capacity of 256 items (same as the default)
EventSubscription::new(256)
// Larger buffer for high-frequency data
EventSubscription::new(4096)
// StreamProducer with custom capacity
StreamProducer::new(4096)
When the buffer is full, new events are dropped. The producer continues without blocking. If your consumer can’t keep up, either increase the buffer size or ensure the consumer processes events faster.
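The drop-on-full policy can be illustrated with a bounded standard-library channel, whose try_send returns an error instead of blocking when the buffer is full. DroppingProducer and its dropped counter are hypothetical names for this sketch; the document does not say whether BoltFFI exposes a drop count.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical producer that never blocks and counts what it drops.
pub struct DroppingProducer<T> {
    tx: SyncSender<T>,
    dropped: u64,
}

impl<T> DroppingProducer<T> {
    /// `capacity` is assumed to be greater than zero.
    pub fn new(capacity: usize) -> (Self, Receiver<T>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { tx, dropped: 0 }, rx)
    }

    /// Returns true if the event was buffered, false if it was dropped.
    pub fn push(&mut self, event: T) -> bool {
        match self.tx.try_send(event) {
            Ok(()) => true,
            // Buffer full or consumer gone: drop the new event and move on.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                self.dropped += 1;
                false
            }
        }
    }

    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}
```

With a capacity of 2 and four pushes before the consumer reads anything, the first two events are kept and the last two are dropped. A counter like this is one way to decide whether the buffer needs to be larger.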
Stopping Streams
Streams can be stopped from either side. The producer can complete the stream, or the consumer can cancel their subscription.
Producer-side completion
Call unsubscribe() on the subscription to signal that no more events will be sent. Active consumers receive a completion signal.
impl Sensor {
    pub fn stop(&self) {
        self.subscription.unsubscribe();
    }
}
Consumer-side cancellation
In async mode, cancelling the task or breaking out of the loop cancels the subscription. In callback mode, call cancel() on the returned handle.
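Both stopping directions can be modelled with a standard-library channel, where dropping one end plays the role of completing or cancelling. This is only an analogy: drop(tx) stands in for unsubscribe() and drop(rx) for cancelling a subscription, and the function names are invented for the sketch.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Producer-side completion: the consumer's loop ends once the producer is gone.
pub fn producer_completes() -> Vec<u32> {
    let (tx, rx) = sync_channel(4);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    drop(tx); // stand-in for unsubscribe()
    // iter() yields the buffered events, then stops because the sender is gone.
    rx.iter().collect()
}

/// Consumer-side cancellation: the producer can observe that nobody is listening.
pub fn consumer_cancels() -> bool {
    let (tx, rx) = sync_channel::<u32>(4);
    drop(rx); // stand-in for cancelling the subscription
    matches!(tx.try_send(1), Err(TrySendError::Disconnected(_)))
}
```

In this model, events already buffered are still delivered before the consumer sees the end of the stream. Whether BoltFFI drains pending events in the same way on completion is not stated here.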
How It Works
Streams use a continuation-based polling mechanism similar to async functions. Each subscription has a lock-free ring buffer for events and a scheduler that coordinates between the producer and consumer. When events are pushed, the scheduler wakes any parked continuation. The consumer polls the subscription, and when events are available, they’re delivered in batches for efficiency. The entire hot path is lock-free, using atomic operations for state management.
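As a rough illustration of the lock-free buffer idea, here is a small single-producer, single-consumer ring built only on atomics. It is a sketch under assumptions, not BoltFFI's implementation: it stores u64 values so it can stay in safe Rust, requires a power-of-two capacity, and leaves out the scheduler that wakes a parked continuation.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Illustrative SPSC ring: one thread calls push, one thread calls pop_batch.
pub struct SpscRing {
    slots: Vec<AtomicU64>,
    head: AtomicUsize, // next position to read, advanced by the consumer
    tail: AtomicUsize, // next position to write, advanced by the producer
}

impl SpscRing {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity.is_power_of_two(), "capacity must be a power of two");
        Self {
            slots: (0..capacity).map(|_| AtomicU64::new(0)).collect(),
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }

    /// Producer side. Returns false (event dropped) when the ring is full.
    pub fn push(&self, value: u64) -> bool {
        let tail = self.tail.load(Ordering::Relaxed);
        let head = self.head.load(Ordering::Acquire);
        if tail.wrapping_sub(head) == self.slots.len() {
            return false;
        }
        self.slots[tail & (self.slots.len() - 1)].store(value, Ordering::Relaxed);
        // Release publishes the slot write to the consumer.
        self.tail.store(tail.wrapping_add(1), Ordering::Release);
        true
    }

    /// Consumer side. Takes up to `max` events in one call.
    pub fn pop_batch(&self, max: usize) -> Vec<u64> {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);
        let count = tail.wrapping_sub(head).min(max);
        let mask = self.slots.len() - 1;
        let batch = (0..count)
            .map(|i| self.slots[head.wrapping_add(i) & mask].load(Ordering::Relaxed))
            .collect();
        // Release hands the freed slots back to the producer.
        self.head.store(head.wrapping_add(count), Ordering::Release);
        batch
    }
}
```

Neither side ever takes a lock: the producer only writes tail, the consumer only writes head, and each reads the other's counter with acquire ordering. A full design would add a wake-up step after a successful push so that a waiting consumer is resumed.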