
Introducing SeaStreamer 🌊

5 min read
Chris Tsang

We are pleased to introduce SeaStreamer to the Rust community today. SeaStreamer is a stream processing toolkit to help you build stream processors in Rust.

At SeaQL we want to make Rust the best programming platform for data engineering. Where SeaORM is the essential tool for working with SQL databases, SeaStreamer aims to be your essential toolkit for working with streams.

Currently SeaStreamer provides integration with Kafka and Redis.

Let's have a quick tour of SeaStreamer.

High level async API

  • High level async API that supports both async-std and tokio
  • Mutex-free implementation¹: concurrency achieved by message passing
  • A comprehensive type system that guides/restricts you with the API

Below is a basic Kafka consumer:

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "kafka://streamer.sea-ql.org:9092/my_stream".parse()?;
    let streamer = KafkaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = KafkaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_offset_reset(AutoOffsetReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}

Consumer::stream() returns an object that implements the Stream trait, which allows you to do neat things:

let items = consumer
    .stream()
    .take(num)
    .map(process_message)
    .collect::<Vec<_>>()
    .await;
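The take, map and collect combinators come from the Stream extension trait rather than from SeaStreamer itself, so the snippet above assumes an import along these lines (the exact re-export path may differ):

// Assumed import for the stream combinators used above.
use futures::StreamExt;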

Trait-based abstract interface

All SeaStreamer backends implement a common abstract interface, offering you a familiar API. Below is a basic Redis consumer, which is nearly the same as the previous example:

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "redis://localhost:6379/my_stream".parse()?;
    let streamer = RedisStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(AutoStreamReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}

Redis Streams Support

SeaStreamer Redis provides Kafka-like stream semantics:

  • Non-group streaming with AutoStreamReset option
  • Consumer-group-based streaming with auto-ack and/or auto-commit
  • Load balancing among consumers with automatic failover
  • Seek/rewind to point in time

You don't have to call XADD, XREAD, XACK, etc... anymore!
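To illustrate the last point in the list above, rewinding a consumer might look roughly like the sketch below. The seek method taking a timestamp, and the timestamp conversion, are assumptions about the consumer interface rather than verbatim API from the documentation:

use std::time::{Duration, SystemTime};

// Hypothetical sketch: replay a Redis stream starting from one hour ago.
// `seek` accepting a timestamp is an assumption about the consumer API.
async fn replay_last_hour(consumer: &mut RedisConsumer) -> anyhow::Result<()> {
    let one_hour_ago = SystemTime::now() - Duration::from_secs(3600);
    consumer.seek(one_hour_ago.into()).await?; // resume consuming from that point in time
    Ok(())
}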

Enum-based generic interface

The trait-based API requires you to designate the concrete Streamer type for monomorphization; otherwise the code cannot compile.

Akin to how SeaORM implements runtime polymorphism, SeaStreamer provides an enum-based generic streamer, in which the backend is selected at runtime.

Here is an illustration (full example):

// sea-streamer-socket
pub struct SeaConsumer {
    backend: SeaConsumerBackend,
}

enum SeaConsumerBackend {
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaConsumer),
    #[cfg(feature = "backend-redis")]
    Redis(RedisConsumer),
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioConsumer),
}

// Your code
let uri: StreamerUri = "kafka://localhost:9092".parse()?; // or
let uri: StreamerUri = "redis://localhost:6379".parse()?; // or
let uri: StreamerUri = "stdio://".parse()?;

// SeaStreamer will be backed by Kafka, Redis or Stdio depending on the URI
let streamer = SeaStreamer::connect(uri, Default::default()).await?;

// Set backend-specific options
let mut options = SeaConsumerOptions::new(ConsumerMode::Resumable);
options.set_kafka_consumer_options(|options: &mut KafkaConsumerOptions| { .. });
options.set_redis_consumer_options(|options: &mut RedisConsumerOptions| { .. });
let mut consumer: SeaConsumer = streamer.create_consumer(stream_keys, options).await?;

// You can still retrieve the concrete type
let kafka: Option<&mut KafkaConsumer> = consumer.get_kafka();
let redis: Option<&mut RedisConsumer> = consumer.get_redis();

So you can "write once, stream anywhere"!

Good old unix pipe

In SeaStreamer, stdin & stdout can be used as stream source and sink.

Say you are developing some processors to transform a stream in several stages:

./processor_1 --input kafka://localhost:9092/input --output kafka://localhost:9092/stage_1 &
./processor_2 --input kafka://localhost:9092/stage_1 --output kafka://localhost:9092/stage_2 &
./processor_3 --input kafka://localhost:9092/stage_2 --output kafka://localhost:9092/output &

It would be great if we could simply pipe the processors together, right?

With SeaStreamer, you can do the following:

./processor_1 --input kafka://localhost:9092/input --output stdio:///stream |
./processor_2 --input stdio:///stream --output stdio:///stream |
./processor_3 --input stdio:///stream --output kafka://localhost:9092/output

All without recompiling the stream processors! Now, you can develop locally with the comfort of using |, >, < and your favourite unix program in the shell.
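To make this concrete, here is a rough sketch of what such a processor could look like on top of the enum-based SeaStreamer interface shown earlier. It is only a sketch: the argument handling is deliberately simplified (plain positional arguments instead of the --input/--output flags above), the glob import stands in for the individual items used, and process is a placeholder for your own transformation:

use anyhow::Result;
use sea_streamer::*; // glob import for brevity; real code would list the items used

#[tokio::main]
async fn main() -> Result<()> {
    // Simplified argument handling: `processor <input-stream-url> <output-streamer-uri> <output-stream-key>`
    let mut args = std::env::args().skip(1);
    let input: StreamUrl = args.next().expect("missing input url").parse()?;
    let output: StreamerUri = args.next().expect("missing output uri").parse()?;
    let output_key = StreamKey::new(args.next().expect("missing output stream key"))?;

    // The input and output backends are chosen independently from the URIs at runtime,
    // so either side can be kafka://, redis:// or stdio:// without recompiling.
    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;

    let streamer = SeaStreamer::connect(output, Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output_key, Default::default())
        .await?;

    loop {
        let mess = consumer.next().await?;
        let transformed = process(mess.message().as_str()?); // your transformation here
        producer.send(transformed)?;
    }
}

fn process(input: &str) -> String {
    // Stand-in transformation; replace with real processing logic.
    input.to_uppercase()
}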

Testable

SeaStreamer encourages you to write tests at all levels:

  • You can execute tests involving several stream processors in the same OS process
  • You can execute tests involving several OS processes by connecting them with pipes
  • You can execute tests involving several stream processors with Redis / Kafka

All against the same piece of code! Let SeaStreamer take the boilerplate and mocking facilities out of your codebase.

Below is an example of intra-process testing, which can be run with cargo test without any dependencies or side effects:

let stream = StreamKey::new("test")?;
let mut options = StdioConnectOptions::default();
options.set_loopback(true); // messages produced will be fed back to consumers
let streamer = StdioStreamer::connect(StreamerUri::zero(), options).await?;
let producer = streamer.create_producer(stream.clone(), Default::default()).await?;
let mut consumer = streamer.create_consumer(&[stream.clone()], Default::default()).await?;

for i in 0..5 {
    let mess = format!("{}", i);
    producer.send(mess)?;
}

let seq = collect(&mut consumer, 5).await;
assert_eq!(seq, [0, 1, 2, 3, 4]);
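The collect helper above is not part of SeaStreamer; it is a small test utility that drains a fixed number of messages. A sketch of what it could look like, assuming the payloads are the numbers produced in the loop:

// Hypothetical test helper: read `n` messages and parse each payload as a number.
async fn collect(consumer: &mut StdioConsumer, n: usize) -> Vec<u32> {
    let mut result = Vec::new();
    for _ in 0..n {
        let mess = consumer.next().await.expect("consumer error");
        let num = mess
            .message()
            .as_str()
            .expect("utf-8 payload")
            .parse()
            .expect("numeric payload");
        result.push(num);
    }
    result
}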

Getting started

If you are eager to get started with SeaStreamer, you can check out our set of examples:

  • consumer: A basic consumer
  • producer: A basic producer
  • processor: A basic stream processor
  • resumable: A resumable stream processor that continues from where it left off
  • buffered: An advanced stream processor with internal buffering and batch processing
  • blocking: An advanced stream processor for handling blocking / CPU-bound tasks

Read the official documentation to learn more.

Roadmap

A few major components we plan to develop:

  • File Backend
  • Redis Cluster

We welcome you to join our Discussions if you have thoughts or ideas!

People

SeaStreamer is designed and developed by the same mind who brought you SeaORM:

Chris Tsang

Community

SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star ⭐ and share our repositories. If you feel generous, a small donation via GitHub Sponsor will be greatly appreciated, and goes a long way towards sustaining the organization 🚒.

SeaStreamer is a community driven project. We welcome you to participate, contribute and together build for Rust's future 🦀.


  1. except sea-streamer-stdio, but only contends on consumer add/drop