🌊 SeaStreamer is a stream processing toolkit to help you build stream processors in Rust
Async
SeaStreamer provides an async API, and it supports both 'tokio' and 'async-std'. In tandem with other async Rust libraries, you can build highly concurrent stream processors.
Generic
We provide integration for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.
Testable
SeaStreamer also provides a set of tools to work with streams via unix pipes, so it is testable without setting up a cluster, and extremely handy when working locally.
Micro-service Oriented
Let's build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
A quick taste of SeaStreamer
- Consumer
- Producer
- Processor
- Running with Kafka
- Running with Redis
- Running with File
- Running with Stdio
Here is a basic stream consumer, full example:
#[tokio::main]async fn main() -> Result<()> {env_logger::init();let Args { stream } = Args::from_args();let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);options.set_auto_stream_reset(SeaStreamReset::Earliest);let consumer: SeaConsumer = streamer.create_consumer(stream.stream_keys(), options).await?;loop {let mess: SeaMessage = consumer.next().await?;println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);}}
Here is a basic stream producer, full example:
#[tokio::main]async fn main() -> Result<()> {env_logger::init();let Args { stream } = Args::from_args();let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;let producer: SeaProducer = streamer.create_producer(stream.stream_key()?, Default::default()).await?;for tick in 0..100 {let message = format!(r#""tick {tick}""#);eprintln!("{message}");producer.send(message)?;tokio::time::sleep(Duration::from_secs(1)).await;}producer.end().await?; // flushOk(())}
Here is a basic stream processor, full example:
#[tokio::main]async fn main() -> Result<()> {env_logger::init();let Args { input, output } = Args::from_args();let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;let options = SeaConsumerOptions::new(ConsumerMode::RealTime);let consumer: SeaConsumer = streamer.create_consumer(input.stream_keys(), options).await?;let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;let producer: SeaProducer = streamer.create_producer(output.stream_key()?, Default::default()).await?;loop {let message: SeaMessage = consumer.next().await?;let message = process(message).await?;eprintln!("{message}");producer.send(message)?; // send is non-blocking}}
# Produce some inputcargo run --bin producer -- --stream kafka://localhost:9092/hello1 &# Start the processor, producing some outputcargo run --bin processor -- --input kafka://localhost:9092/hello1 --output kafka://localhost:9092/hello2 &# Replay the outputcargo run --bin consumer -- --stream kafka://localhost:9092/hello2# Remember to stop the processeskill %1 %2
# Produce some inputcargo run --bin producer -- --stream redis://localhost:6379/hello1 &# Start the processor, producing some outputcargo run --bin processor -- --input redis://localhost:6379/hello1 --output redis://localhost:6379/hello2 &# Replay the outputcargo run --bin consumer -- --stream redis://localhost:6379/hello2# Remember to stop the processeskill %1 %2
# Create the filefile=/tmp/sea-streamer-$(date +%s)touch $file && echo "File created at $file"# Produce some inputcargo run --bin producer -- --stream file://$file/hello &# Replay the inputcargo run --bin consumer -- --stream file://$file/hello# Start the processor, producing some outputcargo run --bin processor -- --input file://$file/hello --output stdio:///hello
# Pipe the producer to the processorcargo run --bin producer -- --stream stdio:///hello1 | cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
Meet Terres, our official mascot
A friend of Ferris, Terres the hermit crab is a member of the Rustacean family.