We are pleased to release SeaStreamer 0.5.x!

Here is a summary of the new features and enhancements:

## sea-streamer-types

- Added `From<Url>` and `FromIterator<Url>` for `StreamerUri` #28
- Impl `Default` for `Payload`
- Impl serde `Serialize` & `Deserialize` for `StreamKey` (enabled by the `serde` feature flag), so this is now possible:

  ```rust
  #[derive(Serialize, Deserialize)]
  struct MyStruct {
      stream_key: StreamKey,
  }
  ```
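
A struct like the one above can then round-trip through any serde format. Below is a minimal sketch using `serde_json` (chosen just for illustration, it is not a SeaStreamer dependency) with the `serde` feature enabled; the `"my-stream"` key name is made up:

```rust
use sea_streamer::StreamKey;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyStruct {
    stream_key: StreamKey,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let original = MyStruct {
        stream_key: StreamKey::new("my-stream")?,
    };
    // Serialize the stream key as part of a larger value ...
    let json = serde_json::to_string(&original)?;
    // ... and parse it back out again.
    let parsed: MyStruct = serde_json::from_str(&json)?;
    assert_eq!(serde_json::to_string(&parsed)?, json);
    Ok(())
}
```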

## sea-streamer-socket

- The Socket library can now be compiled without the `stdio` backend #35

## sea-streamer-redis

- Support nanosecond timestamps in Redis (under the feature flag `nanosecond-timestamp`). Redis's default Stream ID resolution is millisecond; it can be changed to nanosecond with `RedisConnectOptions::set_timestamp_format`:

  ```rust
  let mut options = RedisConnectOptions::default();
  options.set_timestamp_format(TimestampFormat::UnixTimestampNanos);
  ```

- Added `RedisConnectOptions::set_message_field` to set a custom message field (the default used to be `msg`):

  ```rust
  let mut options = RedisConnectOptions::default();
  options.set_message_field("event");
  ```

- Added `RedisProducer::send_with_ts` to specify a custom timestamp:

  ```rust
  producer.send_with_ts(&stream_key, timestamp, message)?;
  ```

- Added `RedisProducer::flush_immut`. This method is the same as `RedisProducer::flush`, but takes `&self` instead of `&mut self` (see the sketch below).
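
  A minimal sketch of how this might be called, assuming a `producer: RedisProducer` is already in scope and that `flush_immut`, like `flush`, is awaited:

  ```rust
  // Flush buffered messages without requiring exclusive (&mut) access
  // to the producer, e.g. when it cannot be borrowed mutably.
  producer.flush_immut().await?;
  ```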

- Added `RedisProducer::trim` to perform `XTRIM MAXLEN`:

  ```rust
  producer.trim(&stream_key, maxlen).await?;
  ```

- Fixed a `capacity overflow` error in some cases

## sea-streamer-file

- Added a special `SEA_STREAMER_WILDCARD` stream key to subscribe to all streams in a file:

  ```rust
  let consumer: SeaConsumer = streamer
      .create_consumer(&[StreamKey::new(SEA_STREAMER_WILDCARD)?], options)
      .await?;
  ```
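
  Messages from every stream in the file then arrive through this one consumer, and each message still carries the stream key it was written under, so you can dispatch on it. A rough sketch (the `orders` stream name is made up for illustration):

  ```rust
  let orders = StreamKey::new("orders")?;
  loop {
      let message = consumer.next().await?;
      if message.stream_key() == orders {
          // handle only the "orders" stream here
      }
  }
  ```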

## sea-streamer-fuse

We've shipped the first component library for stream processing! It currently has only one component, `StreamJoin`.

It is designed to be used in stream replay. In live streaming, if you have multiple streams from different sources and you want to multiplex them together, you can use the awesome `futures_concurrency` crate's `Merge`, and it just works!

```rust
use futures_concurrency::{stream::Merge, vec::Merge as Merged};

let mut consumers: Vec<SeaConsumer> = vec![stream_a, stream_b];
let streams: Vec<SeaMessageStream<'a>> = consumers.iter_mut().map(|ss| ss.stream()).collect();
let merged: Merged<SeaMessageStream<'a>> = streams.merge();
```

`stream_a` and `stream_b` can be heterogeneous, meaning they can be Kafka, Redis or even File.
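
Polling the merged stream then yields messages from all the consumers in arrival order. A rough sketch, assuming `futures_util`'s `StreamExt` is in scope for `.next()` and that each item is a `Result`, like the items of the underlying `SeaMessageStream`:

```rust
use futures_util::StreamExt;

let mut merged = merged; // the stream must be mutable to poll it
while let Some(message) = merged.next().await {
    let message = message?; // surface any backend error
    println!("received a message at {}", message.timestamp());
}
```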

How about in replay? In replay, different streams can flow at different paces, so if we naively merge them, the messages come out of order.

To solve this problem, you can use `StreamJoin::muxed`:

```rust
type LiveStream<'a> = Merged<SeaMessageStream<'a>>;

let joined: StreamJoin<LiveStream<'a>, SeaMessage<'a>, StreamErr<BackendErr>> = StreamJoin::muxed(merged);
```

`StreamJoin::align` must be called manually to specify which streams should be aligned. Otherwise, messages will be out of order until the first message of each key arrives. Imagine a severely delayed stream sending its first message one day later; it would invalidate everything that came before it. However, the issue lies with the delayed stream itself, not the others.

In the example below, messages from the fast stream will be buffered until a message from the slow stream arrives.

```
fast | (1) (2)     (3) (4) (5)
slow |         (2)             (6)
```

Messages `1`, `2` from fast will be buffered until `2` from the slow stream arrives. Likewise, messages `3`, `4`, `5` will be buffered until `6` arrives.

The `StreamJoin` component is generic and can actually be used outside of SeaStreamer; the only requirement is that the item we want to align implements `sea_streamer::Message`:

```rust
impl Message for MyMessage {
    fn stream_key(&self) -> StreamKey { /* implement this */ }
    fn timestamp(&self) -> Timestamp { /* implement this */ }
    fn shard_id(&self) -> ShardId { /* doesn't matter */ }
    fn sequence(&self) -> SeqNo { /* doesn't matter */ }
    fn message(&self) -> Payload { /* doesn't matter */ }
}
```
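
For illustration, here is a minimal sketch of such a type. The struct and its fields are made up; only `stream_key` and `timestamp` need meaningful values, since that is what `StreamJoin` aligns on, and the placeholder values in the remaining methods are assumptions about reasonable defaults:

```rust
use sea_streamer::{Message, Payload, SeqNo, ShardId, StreamKey, Timestamp};

// A hypothetical event type we want to align by time.
struct MyMessage {
    stream_key: StreamKey,
    timestamp: Timestamp,
}

impl Message for MyMessage {
    fn stream_key(&self) -> StreamKey {
        self.stream_key.clone()
    }
    fn timestamp(&self) -> Timestamp {
        self.timestamp
    }
    // The methods below are not used for alignment, so placeholders are fine.
    fn shard_id(&self) -> ShardId {
        ShardId::new(0)
    }
    fn sequence(&self) -> SeqNo {
        0
    }
    fn message(&self) -> Payload {
        Payload::default()
    }
}
```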

## Anecdote

Over the past year, we've been using SeaStreamer heavily in production, and it has served us well!

SeaStreamer File is really handy, because it supports live streaming and also doubles as an archive, which can be rotated and uploaded to the data lake every day. It has replaced our use of Redis in some same-host MPMC streaming scenarios.

Redis Streams is also super nice (fast and reliable), and especially easy to use with SeaStreamer. IMO it's underrated; it has become our default choice for cross-host streaming.

By the way, SeaStreamer File is used as the tracing file format in FireDBG.

## Community

SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star and share our repositories. If you feel generous, a small donation via GitHub Sponsors will be greatly appreciated, and goes a long way towards sustaining the organization.

SeaStreamer is a community-driven project. We welcome you to participate, contribute, and together build for Rust's future.