Streaming

Streams provide continuous data flow from Rust to target languages. Unlike async functions that return a single value, streams deliver multiple values over time. This enables real-time updates, sensor data, event buses, and live feeds. BoltFFI generates native stream types that integrate with each platform’s concurrency model, and handles the underlying buffering and synchronization.

The ffi_stream Attribute

Mark a method with #[ffi_stream] to expose it as a stream. The attribute requires an item parameter specifying the type of data that flows through the stream, and accepts an optional mode parameter to control how consumers receive events.

#[ffi_stream(item = Reading)]                    // item type is Reading, async mode (default)
#[ffi_stream(item = Reading, mode = "async")]    // explicit async mode
#[ffi_stream(item = Reading, mode = "callback")] // callback mode
#[ffi_stream(item = Reading, mode = "batch")]    // batch mode

The item type must be a type that can cross the FFI boundary - primitives, records marked with #[data], or other supported types. The method must return Arc<EventSubscription<T>> where T matches the item type.

Stream Modes

BoltFFI supports three consumption modes. Each mode generates different bindings suited to different use cases.

Async Mode

The default mode generates AsyncStream in Swift and Flow in Kotlin. Use this when consumers want to process events using native async iteration. The stream integrates with structured concurrency and cancellation propagates automatically.

Async stream
RustSource
#[ffi_stream(item = Reading)]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}
// Generated
public func readings() -> AsyncStream<Reading>

// Usage
for await reading in sensor.readings() {
  print("Value: \(reading.value)")
}

Callback Mode

Generates a method that takes a callback and returns a cancellable handle. Use this for simple event handlers where you don’t need async iteration, or when integrating with callback-based APIs.

Callback stream
RustSource
#[ffi_stream(item = Reading, mode = "callback")]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}
// Generated
public func readings(
  callback: @escaping (Reading) -> Void
) -> SensorReadingsCancellable

// Usage
let handle = sensor.readings { reading in
  print("Value: \(reading.value)")
}
// Later...
handle.cancel()

Batch Mode

Generates a subscription object that lets consumers pull batches of events on their own schedule. Use this for high-frequency data where you want control over when and how many events to process, or when you need to process events on a specific thread.

Batch stream
RustSource
#[ffi_stream(item = Reading, mode = "batch")]
pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
  Arc::clone(&self.subscription)
}
// Generated
public func readings() -> SensorReadingsSubscription

// Usage
let sub = sensor.readings()
while let batch = sub.popBatch(maxCount: 100) {
  for reading in batch {
      process(reading)
  }
}

Creating Streams

Streams are created using EventSubscription or StreamProducer. The choice depends on whether you need single or multiple subscribers.

EventSubscription

Use EventSubscription when each call to the stream method should create an independent subscription. Each subscriber gets its own buffer and receives all events pushed after subscribing.

EventSubscription
RustSource
use boltffi::EventSubscription;
use std::sync::Arc;

pub struct Sensor {
  subscription: Arc<EventSubscription<Reading>>,
}

#[export]
impl Sensor {
  pub fn new() -> Self {
      Sensor {
          subscription: Arc::new(EventSubscription::new(256)),
      }
  }
  
  #[ffi_stream(item = Reading)]
  pub fn readings(&self) -> Arc<EventSubscription<Reading>> {
      Arc::clone(&self.subscription)
  }
  
  pub fn emit(&self, value: f64) {
      self.subscription.push_event(Reading {
          value,
          timestamp: current_time_ms(),
      });
  }
}
// Generated
public class Sensor {
  public init()
  public func readings() -> AsyncStream<Reading>
}

// Usage
let sensor = Sensor()
for await reading in sensor.readings() {
  print(reading.value)
}

StreamProducer

Use StreamProducer when you need to broadcast events to multiple subscribers. Each subscriber gets its own buffer, and pushing an event delivers it to all active subscribers.

StreamProducer for multiple subscribers
RustSource
use boltffi::StreamProducer;

pub struct EventBus {
  producer: StreamProducer<Event>,
}

#[export]
impl EventBus {
  pub fn new() -> Self {
      EventBus {
          producer: StreamProducer::new(256),
      }
  }
  
  #[ffi_stream(item = Event)]
  pub fn events(&self) -> Arc<EventSubscription<Event>> {
      self.producer.subscribe()
  }
  
  pub fn emit(&self, event: Event) {
      self.producer.push(event);
  }
}
let bus = EventBus()

// Multiple subscribers
Task {
  for await event in bus.events() {
      print("Sub 1: \(event)")
  }
}

Task {
  for await event in bus.events() {
      print("Sub 2: \(event)")
  }
}

Buffer Capacity

Each subscription has a ring buffer that holds events until the consumer processes them. The default capacity is 256 items. For high-frequency streams, increase the capacity to avoid dropping events when the consumer falls behind.

// Default capacity (256)
EventSubscription::new(256)

// Larger buffer for high-frequency data
EventSubscription::new(4096)

// StreamProducer with custom capacity
StreamProducer::new(4096)

When the buffer is full, new events are dropped. The producer continues without blocking. If your consumer can’t keep up, either increase the buffer size or ensure the consumer processes events faster.

Stopping Streams

Streams can be stopped from either side. The producer can complete the stream, or the consumer can cancel their subscription.

Producer-side completion

Call unsubscribe() on the subscription to signal that no more events will be sent. Active consumers receive a completion signal.

impl Sensor {
    pub fn stop(&self) {
        self.subscription.unsubscribe();
    }
}

Consumer-side cancellation

In async mode, cancelling the task or breaking out of the loop cancels the subscription. In callback mode, call cancel() on the returned handle.

Consumer cancellation
RustSource
// Async mode - cancel the task
let task = Task {
  for await reading in sensor.readings() {
      if reading.value > threshold {
          break // cancels subscription
      }
  }
}
task.cancel()

// Callback mode - cancel the handle
let handle = sensor.readings { reading in
  process(reading)
}
handle.cancel()

How It Works

Streams use a continuation-based polling mechanism similar to async functions. Each subscription has a lock-free ring buffer for events and a scheduler that coordinates between the producer and consumer. When events are pushed, the scheduler wakes any parked continuation. The consumer polls the subscription, and when events are available, they’re delivered in batches for efficiency. The entire hot path is lock-free, using atomic operations for state management.