We are pleased to introduce SeaStreamer to the Rust community today. SeaStreamer is a stream processing toolkit to help you build stream processors in Rust.
At SeaQL we want to make Rust the best programming platform for data engineering. Where SeaORM is the essential tool for working with SQL databases, SeaStreamer aims to be your essential toolkit for working with streams.
Currently SeaStreamer provides integration with Kafka and Redis.
Let's have a quick tour of SeaStreamer.
High level async API
- High level async API that supports both async-std and tokio
- Mutex-free implementation¹: concurrency achieved by message passing
- A comprehensive type system that guides/restricts you with the API
Below is a basic Kafka consumer:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "kafka://streamer.sea-ql.org:9092/my_stream".parse()?;
    let streamer = KafkaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = KafkaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_offset_reset(AutoOffsetReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}
Consumer::stream() returns an object that implements the Stream trait, which allows you to do neat things:
let items = consumer
    .stream()
    .take(num)
    .map(process_message)
    .collect::<Vec<_>>()
    .await;
Trait-based abstract interface
All SeaStreamer backends implement a common abstract interface, offering you a familiar API. Below is a basic Redis consumer, which is nearly the same as the previous example:
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "redis://localhost:6379/my_stream".parse()?;
    let streamer = RedisStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(AutoStreamReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}
Redis Streams Support
SeaStreamer Redis provides Kafka-like stream semantics:
- Non-group streaming with AutoStreamReset option
- Consumer-group-based streaming with auto-ack and/or auto-commit
- Load balancing among consumers with automatic failover
- Seek/rewind to point in time
You don't have to call XADD, XREAD, XACK, etc. anymore!
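For example, rewinding a consumer to a point in time might look like the sketch below. The seek / rewind calls and the Timestamp / SeqPos types are assumptions inferred from the feature list above, not verbatim API from this post:

// Sketch only: `seek`, `rewind`, `Timestamp` and `SeqPos` are assumed here,
// based on the "Seek/rewind to point in time" feature listed above.
async fn replay_last_hour(consumer: &mut RedisConsumer) -> Result<()> {
    // Jump back to messages produced roughly an hour ago
    let one_hour_ago = Timestamp::now_utc() - std::time::Duration::from_secs(3600);
    consumer.seek(one_hour_ago).await?;

    // Or start over from the very beginning of the stream
    consumer.rewind(SeqPos::Beginning).await?;
    Ok(())
}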
Enum-based generic interface
The trait-based API requires you to designate the concrete Streamer type for monomorphization; otherwise the code cannot compile.
Akin to how SeaORM implements runtime polymorphism, SeaStreamer provides an enum-based generic streamer, in which the backend is selected at runtime.
Here is an illustration (full example):
// sea-streamer-socket
pub struct SeaConsumer {
    backend: SeaConsumerBackend,
}

enum SeaConsumerBackend {
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaConsumer),
    #[cfg(feature = "backend-redis")]
    Redis(RedisConsumer),
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioConsumer),
}
// Your code
let uri: StreamerUri = "kafka://localhost:9092".parse()?; // or
let uri: StreamerUri = "redis://localhost:6379".parse()?; // or
let uri: StreamerUri = "stdio://".parse()?;
// SeaStreamer will be backed by Kafka, Redis or Stdio depending on the URI
let streamer = SeaStreamer::connect(uri, Default::default()).await?;
// Set backend-specific options
let mut options = SeaConsumerOptions::new(ConsumerMode::Resumable);
options.set_kafka_consumer_options(|options: &mut KafkaConsumerOptions| { .. });
options.set_redis_consumer_options(|options: &mut RedisConsumerOptions| { .. });
let mut consumer: SeaConsumer = streamer.create_consumer(stream_keys, options).await?;
// You can still retrieve the concrete type
let kafka: Option<&mut KafkaConsumer> = consumer.get_kafka();
let redis: Option<&mut RedisConsumer> = consumer.get_redis();
So you can "write once, stream anywhere"!
Good old unix pipe
In SeaStreamer, stdin & stdout can be used as stream source and sink.
Say you are developing some processors to transform a stream in several stages:
./processor_1 --input kafka://localhost:9092/input --output kafka://localhost:9092/stage_1 &
./processor_2 --input kafka://localhost:9092/stage_1 --output kafka://localhost:9092/stage_2 &
./processor_3 --input kafka://localhost:9092/stage_2 --output kafka://localhost:9092/output &
It would be great if we could simply pipe the processors together, right?
With SeaStreamer, you can do the following:
./processor_1 --input kafka://localhost:9092/input --output stdio:///stream |
./processor_2 --input stdio:///stream --output stdio:///stream |
./processor_3 --input stdio:///stream --output kafka://localhost:9092/output
All without recompiling the stream processors! Now, you can develop locally with the comfort of using |, >, < and your favourite unix programs in the shell.
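To make this work, a processor only needs to treat its input and output as generic stream URLs. Below is a minimal sketch built on the sea-streamer-socket types shown above; the argument handling, the process function, and the stream_key() accessor are assumptions for illustration, and imports are omitted as in the other snippets:

// Sketch of a processor whose --input/--output can be kafka://, redis:// or stdio://.
// Argument parsing and `process` are placeholders, not the actual example code.
#[tokio::main]
async fn main() -> Result<()> {
    // e.g. ./processor --input stdio:///stream --output kafka://localhost:9092/output
    let args: Vec<String> = std::env::args().collect();
    let input: StreamUrl = args[2].parse()?;
    let output: StreamUrl = args[4].parse()?;

    let source = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let consumer: SeaConsumer = source
        .create_consumer(input.stream_keys(), Default::default())
        .await?;

    let sink = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = sink
        .create_producer(output.stream_key()?, Default::default())
        .await?;

    loop {
        let message = consumer.next().await?;
        let out = process(message.message().as_str()?);
        producer.send(out)?;
    }
}

fn process(input: &str) -> String {
    // placeholder transformation
    input.to_uppercase()
}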
Testable
SeaStreamer encourages you to write tests at all levels:
- You can execute tests involving several stream processors in the same OS process
- You can execute tests involving several OS processes by connecting them with pipes
- You can execute tests involving several stream processors with Redis / Kafka
All against the same piece of code! Let SeaStreamer take the boilerplate and mocking facilities out of your codebase.
Below is an example of intra-process testing, which can be run with cargo test without any dependencies or side effects:
let stream = StreamKey::new("test")?;
let mut options = StdioConnectOptions::default();
options.set_loopback(true); // messages produced will be fed back to consumers
let streamer = StdioStreamer::connect(StreamerUri::zero(), options).await?;
let producer = streamer.create_producer(stream.clone(), Default::default()).await?;
let mut consumer = streamer.create_consumer(&[stream.clone()], Default::default()).await?;
for i in 0..5 {
let mess = format!("{}", i);
producer.send(mess)?;
}
let seq = collect(&mut consumer, 5).await;
assert_eq!(seq, [0, 1, 2, 3, 4]);
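The collect helper is not spelled out in this post; a possible implementation (an illustrative assumption, not the exact helper from the repository) could look like this:

// A possible `collect` helper: read `num` messages and parse each payload as an i32.
async fn collect(consumer: &mut StdioConsumer, num: usize) -> Vec<i32> {
    let mut items = Vec::new();
    for _ in 0..num {
        let mess = consumer.next().await.unwrap();
        items.push(mess.message().as_str().unwrap().parse::<i32>().unwrap());
    }
    items
}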
Getting started
If you are eager to get started with SeaStreamer, you can check out our set of examples:
- consumer: A basic consumer
- producer: A basic producer
- processor: A basic stream processor
- resumable: A resumable stream processor that continues from where it left off
- buffered: An advanced stream processor with internal buffering and batch processing
- blocking: An advanced stream processor for handling blocking / CPU-bound tasks
Read the official documentation to learn more.
Roadmap
A few major components we plan to develop:
- File Backend
- Redis Cluster
We welcome you to join our Discussions if you have thoughts or ideas!
People
SeaStreamer is designed and developed by the same mind who brought you SeaORM:
Community
SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star ⭐ and share our repositories. If you feel generous, a small donation via GitHub Sponsor will be greatly appreciated, and goes a long way towards sustaining the organization.
SeaStreamer is a community-driven project. We welcome you to participate, contribute and together build for Rust's future 🦀.
¹ except sea-streamer-stdio, which only contends on consumer add/drop