What's new in SeaStreamer 0.5

· 4 min read
Chris Tsang

🎉 We are pleased to release SeaStreamer 0.5.x!

Here is the summary of new features and enhancements:

sea-streamer-types

  • Added From<Url> and FromIterator<Url> for StreamerUri #28 (see the sketch after this list)
  • Impl Default for Payload
  • Impl serde Serialize & Deserialize for StreamKey (enabled by the feature flag serde), so this is now possible:
    #[derive(Serialize, Deserialize)]
    struct MyStruct {
        stream_key: StreamKey,
    }
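For illustration, here is a minimal sketch of the new conversions (assuming Url is the url crate's Url type; the addresses are placeholders):

use sea_streamer::StreamerUri;
use url::Url;

// From<Url>: build a StreamerUri from a single node
let single: StreamerUri = Url::parse("redis://localhost:6379")?.into();

// FromIterator<Url>: collect several nodes into one StreamerUri
let nodes = vec![
    Url::parse("redis://node-1:6379")?,
    Url::parse("redis://node-2:6379")?,
];
let cluster: StreamerUri = nodes.into_iter().collect();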

sea-streamer-socket

  • The Socket library can now be compiled without the stdio backend #35
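For example, a Cargo.toml sketch that opts out of the stdio backend (the feature names are assumed to mirror the sea-streamer facade crate's backend-* features; check the crate docs for the exact list):

[dependencies]
# assumed feature names; enable only the backends you need
sea-streamer-socket = { version = "0.5", default-features = false, features = ["backend-redis", "backend-file"] }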

sea-streamer-redis

  • Support nanosecond timestamp in Redis (under feature flag nanosecond-timestamp). Redis's default Stream ID resolution is millisecond, and it can be changed to nanosecond with RedisConnectOptions::set_timestamp_format:
    let mut options = RedisConnectOptions::default();
    options.set_timestamp_format(TimestampFormat::UnixTimestampNanos);
  • Added RedisConnectOptions::set_message_field to set a custom message field; the default is msg (a combined connection sketch follows this list):
    let mut options = RedisConnectOptions::default();
    options.set_message_field("event");
  • Added RedisProducer::send_with_ts to specify custom timestamp:
    producer.send_with_ts(&stream_key, timestamp, message)?;
  • Added RedisProducer::flush_immut. This method is the same as RedisProducer::flush, but does not require &mut self
  • Added RedisProducer::trim to perform XTRIM MAXLEN:
    producer.trim(&stream_key, maxlen).await?;
  • Fixed a capacity overflow error in some cases
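Putting the connect options together, here is a minimal sketch of connecting with them applied (import paths are assumed; the URI and field name are placeholders):

use sea_streamer::Streamer;
use sea_streamer_redis::{RedisConnectOptions, RedisStreamer, TimestampFormat};

let mut options = RedisConnectOptions::default();
options.set_timestamp_format(TimestampFormat::UnixTimestampNanos);
options.set_message_field("event");

// connect() comes from the Streamer trait
let streamer = RedisStreamer::connect("redis://localhost:6379".parse()?, options).await?;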

sea-streamer-file

  • Added a special SEA_STREAMER_WILDCARD stream key to subscribe to all streams in a file:
    let consumer: SeaConsumer = streamer
        .create_consumer(&[StreamKey::new(SEA_STREAMER_WILDCARD)?], options)
        .await?;

sea-streamer-fuse

We've shipped the first component library for stream processing! It currently has only one component, StreamJoin.

It is designed to be used in stream replay. In live streaming, if you have multiple streams from different sources and you want to multiplex them together, you can use the awesome futures_concurrency crate's Merge, and it just works!

use futures_concurrency::{stream::Merge, vec::Merge as Merged};

let mut consumers: Vec<SeaConsumer> = vec![stream_a, stream_b];
let streams: Vec<SeaMessageStream<'a>> = consumers.iter_mut().map(|ss| ss.stream()).collect();
let merged: Merged<SeaMessageStream<'a>> = streams.merge();

stream_a and stream_b can be heterogeneous, meaning they can be Kafka, Redis or even File.
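For example, here is a rough sketch of how stream_a and stream_b could come from different backends (URIs, stream keys and option values are placeholders):

use sea_streamer::{
    ConsumerMode, SeaConnectOptions, SeaConsumer, SeaConsumerOptions, SeaStreamer,
    StreamKey, Streamer,
};

// Two streamers over different backends: one Kafka, one Redis
let kafka = SeaStreamer::connect("kafka://localhost:9092".parse()?, SeaConnectOptions::default()).await?;
let redis = SeaStreamer::connect("redis://localhost:6379".parse()?, SeaConnectOptions::default()).await?;

// Both consumers end up as the same SeaConsumer type
let stream_a: SeaConsumer = kafka
    .create_consumer(&[StreamKey::new("events")?], SeaConsumerOptions::new(ConsumerMode::RealTime))
    .await?;
let stream_b: SeaConsumer = redis
    .create_consumer(&[StreamKey::new("events")?], SeaConsumerOptions::new(ConsumerMode::RealTime))
    .await?;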

How about replay? In replay, different streams can flow at different paces, so if we naively merge them, the messages will come out of order.

To solve this problem, you can use StreamJoin::muxed:

type LiveStream<'a> = Merged<SeaMessageStream<'a>>;
let joined: StreamJoin<LiveStream<'a>, SeaMessage<'a>, StreamErr<BackendErr>> = StreamJoin::muxed(merged);

StreamJoin::align must be called manually to specify which streams should be aligned. Otherwise, messages will be out of order until the first message of each key arrives. Imagine a severely delayed stream sending its first message one day later; it would invalidate everything that came before it. However, the issue lies with the delayed stream itself, not the others.
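As a minimal sketch, assuming align takes the StreamKey of each stream to wait for (check the sea-streamer-fuse docs for the exact signature):

// joined must be declared mut for this; the keys below are hypothetical.
// align registers a stream key that the join should wait for before
// releasing buffered messages from the other streams.
joined.align(StreamKey::new("fast")?);
joined.align(StreamKey::new("slow")?);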

In the example below, messages from the fast stream will be buffered until a message from the slow stream arrives.

fast | (1) (2) (3) (4) (5)
slow | (2) (6)

Messages 1, 2 from fast will be buffered until 2 from the slow stream arrives. Likewise, messages 3, 4, 5 will be buffered until 6 arrives.

The StreamJoin component is generic and can actually be used outside of SeaStreamer; the only requirement is that the type we want to align implements sea_streamer::Message:

impl Message for MyMessage {
    fn stream_key(&self) -> StreamKey { /* implement this */ }

    fn timestamp(&self) -> Timestamp { /* implement this */ }

    fn shard_id(&self) -> ShardId { /* doesn't matter */ }

    fn sequence(&self) -> SeqNo { /* doesn't matter */ }

    fn message(&self) -> Payload { /* doesn't matter */ }
}
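For instance, a sketch of feeding a custom message type into StreamJoin (MyMessage, MyError and the data source are hypothetical; any Stream yielding Result<MyMessage, MyError> should do):

use futures::stream;
use sea_streamer_fuse::StreamJoin;

// my_messages: an iterator of MyMessage values, each carrying its own
// stream key and timestamp (hypothetical data source)
let source = stream::iter(my_messages.into_iter().map(Ok::<MyMessage, MyError>));
let joined = StreamJoin::muxed(source);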

Anecdote

Over the past year, we've been using SeaStreamer heavily in production and it served us well!

SeaStreamer File is really handy, because it supports live streaming and also doubles as an archive, which can be rotated and uploaded to the data lake every day. It has replaced our use of Redis in some same-host mpmc streaming scenarios.

Redis Streams is also super nice (fast and reliable) and especially easy to use with SeaStreamer. IMO it's underrated; it has become our default choice for cross-host streaming.

By the way, SeaStreamer File is used as the tracing file format in FireDBG.

Community

SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star ⭐ and share our repositories. If you feel generous, a small donation via GitHub Sponsors will be greatly appreciated, and goes a long way towards sustaining the organization 🚒.

SeaStreamer is a community-driven project. We welcome you to participate, contribute and together build for Rust's future 🦀.