This tutorial shows you how to use Rust to build a system that:
- Subscribes to a real-time websocket data feed
- Streams the data to Kafka / Redis
- Saves the data into a SQL database
Here, we'll employ a micro-services architecture and split the functionality into two apps:
```
┌─────────────────────┐      ┌───────────────┐      ┌───────────────┐
│ Websocket Data Feed │ ---> │ Redis / Kafka │ ---> │ SQL Data Sink │
└─────────────────────┘      └───────────────┘      └───────────────┘
```
In stream processing, we often use the terms "source" / "sink", but a data sink is simply a stream consumer that persists the data into a store.
On the source side, we'll use SeaStreamer; on the sink side, we'll use SeaORM. Below are the supported technologies. For the rest of this article, we'll use Redis and SQLite because they're easy to set up.
| SeaStreamer | SeaORM |
|---|---|
| Kafka, Redis | MySQL, Postgres, SQLite, SQL Server¹ |
To get started, you can quickly spin up a Redis instance via Docker:
```shell
docker run -d --rm --name redis -p 6379:6379 redis
```
1. Websocket subscription
Let's write a websocket subscriber in Rust. Here we'll use the awesome async-tungstenite library.
We'll subscribe to the GBP/USD price feed from Kraken; see Kraken's WebSocket API documentation for details. NB: this isn't real FX data, but it should be good enough for a demo.
Step 1, create a websocket connection:
```rust
let (mut ws, _) = async_tungstenite::tokio::connect_async("wss://ws.kraken.com/").await?;
```
Step 2, send a subscription request:
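```rust
use async_tungstenite::tungstenite::Message;
use futures::SinkExt; // brings `send` into scope for the websocket stream
ws.send(Message::Text(
    r#"{ "event": "subscribe", "pair": ["GBP/USD"], "subscription": { "name": "spread" } }"#.to_owned(),
)).await?;
```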
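Subscribing to more pairs (one of the suggestions at the end of this article) only changes the request payload. Assuming Kraken's `pair` field accepts several pairs at once, as its array form suggests, a std-only helper could build the request. `subscribe_request` is a hypothetical name, and it does no JSON escaping (fine for names like `GBP/USD` and `spread`), so in a real app `serde_json` would be the safer choice:

```rust
/// Builds a Kraken-style subscription request for several pairs on one channel.
/// Assumption: pair and channel names need no JSON escaping.
fn subscribe_request(pairs: &[&str], channel: &str) -> String {
    // Quote each pair name, e.g. GBP/USD -> "GBP/USD"
    let pairs: Vec<String> = pairs.iter().map(|p| format!("\"{p}\"")).collect();
    format!(
        r#"{{ "event": "subscribe", "pair": [{}], "subscription": {{ "name": "{channel}" }} }}"#,
        pairs.join(", ")
    )
}
```

With a single pair, `subscribe_request(&["GBP/USD"], "spread")` yields exactly the payload sent above.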
Step 3, stream the messages:
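```rust
use anyhow::bail;
use futures::StreamExt; // brings `next` into scope for the websocket stream

loop {
    match ws.next().await {
        Some(Ok(Message::Text(data))) => {
            // Kraken sends periodic heartbeats; skip them
            if data == r#"{"event":"heartbeat"}"# {
                continue;
            }
            println!("{data}");
        }
        Some(Err(e)) => bail!("Socket error: {e}"),
        None => bail!("Stream ended"),
        e => bail!("Unexpected message {e:?}"),
    }
}
```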
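The loop above only filters out heartbeats. As far as we can tell, Kraken's v1 WebSocket API also sends other control events as JSON objects (for example `systemStatus` and `subscriptionStatus`), while market data arrives as JSON arrays; treat that as an assumption to verify against Kraken's documentation. A minimal, std-only way to route frames before parsing them might look like this (the `Frame` enum and `classify` function are hypothetical):

```rust
/// How a raw text frame from the feed should be handled.
#[derive(Debug, PartialEq)]
enum Frame {
    /// `{"event":"heartbeat"}`: keep-alive, ignore.
    Heartbeat,
    /// Any other JSON object, e.g. a status or subscription event.
    Event,
    /// A JSON array carrying market data, e.g. a spread update.
    Data,
    /// Anything else.
    Unknown,
}

/// Classifies a frame by its outermost JSON shape, without a full parse.
fn classify(frame: &str) -> Frame {
    let trimmed = frame.trim();
    if trimmed == r#"{"event":"heartbeat"}"# {
        Frame::Heartbeat
    } else if trimmed.starts_with('{') {
        Frame::Event
    } else if trimmed.starts_with('[') {
        Frame::Data
    } else {
        Frame::Unknown
    }
}
```

Only `Frame::Data` would then be handed to `serde_json`; the other kinds can be logged or skipped instead of aborting the loop.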
2. Redis / Kafka Stream Producer
Step 1, create a SeaStreamer instance connecting to Redis / Kafka:
```rust
let streamer = SeaStreamer::connect(
    "redis://localhost", SeaConnectOptions::default()
).await?;
```
Redis and Kafka each have their own set of connect options; you can refer to SeaStreamer's documentation.
Step 2, create a producer:
```rust
let producer: SeaProducer = streamer
    .create_producer(
        "GBP_USD".parse()?, // Stream Key
        Default::default(), // Producer Options
    )
    .await?;
```
We don't need any specific producer options here, so we use the defaults.
Step 3, decode the messages:
```rust
let spread: SpreadMessage = serde_json::from_str(&data)?;
let message = serde_json::to_string(&spread)?;
```
Here, we use the awesome serde library to perform message parsing and conversion:
```rust
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
// `Timestamp` and the `timestamp_serde` module come from the full example (not shown here)

// The raw message looks like: [80478222,["1.25475","1.25489","1714946803.030088","949.74917071","223.36195920"],"spread","GBP/USD"]
// serde's derived `Deserialize` maps JSON array elements to struct fields in declaration order
#[derive(Debug, Serialize, Deserialize)]
struct SpreadMessage {
    #[allow(dead_code)]
    #[serde(skip_serializing)]
    channel_id: u32, // placeholder; not needed
    spread: Spread, // nested object
    channel_name: String,
    pair: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct Spread {
    bid: Decimal,
    ask: Decimal,
    #[serde(with = "timestamp_serde")] // custom serde
    timestamp: Timestamp,
    bid_vol: Decimal,
    ask_vol: Decimal,
}
```
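The `timestamp_serde` module isn't shown here. Kraken sends the timestamp as a string of Unix seconds with a fractional part, while the program output later in this article shows a date-time like `2024-05-05T16:31:00.961214`, so presumably a conversion along these lines happens. This is a std-only sketch based on Howard Hinnant's days-to-civil-date algorithm, not the example's actual code; `unix_to_iso` and `civil_from_days` are hypothetical names, and it assumes a non-negative UTC timestamp and keeps microsecond precision (matching the six fractional digits in that output):

```rust
/// Converts days since 1970-01-01 into a (year, month, day) civil date.
/// Based on Howard Hinnant's `civil_from_days` algorithm.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468;
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    let doe = z - era * 146_097; // day of era, 0..=146_096
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era, 0..=399
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, counted from March 1
    let mp = (5 * doy + 2) / 153; // month index, 0 = March ... 11 = February
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    let year = yoe + era * 400 + (if month <= 2 { 1 } else { 0 });
    (year, month, day)
}

/// Turns a Kraken-style timestamp such as "1714946803.030088" (Unix seconds, UTC)
/// into "YYYY-MM-DDTHH:MM:SS.ffffff". Returns `None` on malformed input.
fn unix_to_iso(ts: &str) -> Option<String> {
    let (secs, frac) = ts.split_once('.').unwrap_or((ts, ""));
    let secs: u64 = secs.parse().ok()?;
    if !frac.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Keep at most 6 fractional digits (microseconds), right-padded with zeros
    let micros: u32 = format!("{:0<6}", &frac[..frac.len().min(6)]).parse().ok()?;
    let (year, month, day) = civil_from_days((secs / 86_400) as i64);
    let sod = secs % 86_400; // seconds since midnight (UTC)
    Some(format!(
        "{year:04}-{month:02}-{day:02}T{:02}:{:02}:{:02}.{micros:06}",
        sod / 3_600,
        sod / 60 % 60,
        sod % 60
    ))
}
```

For the raw message in the comment above, `unix_to_iso("1714946803.030088")` gives `Some("2024-05-05T22:06:43.030088")`.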
Step 4, send the messages:
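```rust
loop {
    match ws.next().await {
        Some(Ok(Message::Text(data))) => {
            // Heartbeats don't match the SpreadMessage layout; skip them
            if data == r#"{"event":"heartbeat"}"# {
                continue;
            }
            let spread: SpreadMessage = serde_json::from_str(&data)?;
            let message = serde_json::to_string(&spread)?;
            producer.send(message)?; // <-- not awaited
        }
        Some(Err(e)) => bail!("Socket error: {e}"),
        None => bail!("Stream ended"),
        e => bail!("Unexpected message {e:?}"),
    }
}
```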
Note that the `producer.send` call is not `async`/`await`, and this is a crucial detail! It removes a bottleneck from stream processing: behind the scenes, messages are buffered and handled on a different thread, so your input stream can run as close to real-time as possible.
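The idea behind a non-blocking `send` can be illustrated with the standard library alone: `send` just enqueues the message on a channel and returns, while a background thread performs the slow publishing. This is a conceptual sketch with hypothetical names (`BufferedProducer`, `publish`, `demo`), not SeaStreamer's implementation:

```rust
use std::sync::mpsc::{self, SendError, Sender};
use std::thread::{self, JoinHandle};

/// Toy producer: `send` only enqueues; a worker thread does the slow I/O.
struct BufferedProducer {
    tx: Sender<String>,
}

impl BufferedProducer {
    /// Spawns the worker. `publish` stands in for the network write to Redis / Kafka.
    fn spawn<F>(mut publish: F) -> (Self, JoinHandle<()>)
    where
        F: FnMut(String) + Send + 'static,
    {
        let (tx, rx) = mpsc::channel::<String>();
        let worker = thread::spawn(move || {
            // Drains the buffer in order until every `Sender` has been dropped
            for msg in rx {
                publish(msg);
            }
        });
        (Self { tx }, worker)
    }

    /// Returns as soon as the message is buffered; it never waits for `publish`.
    fn send(&self, msg: String) -> Result<(), SendError<String>> {
        self.tx.send(msg)
    }
}

/// Sends `msgs` through a `BufferedProducer` and returns what was "published".
fn demo(msgs: &[&str]) -> Vec<String> {
    use std::sync::{Arc, Mutex};
    let published = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&published);
    let (producer, worker) = BufferedProducer::spawn(move |m| sink.lock().unwrap().push(m));
    for m in msgs {
        producer.send(m.to_string()).expect("worker stopped");
    }
    drop(producer); // closes the channel so the worker can finish
    worker.join().expect("worker panicked");
    let out = published.lock().unwrap().clone();
    out
}
```

An unbounded channel never blocks the caller, which is what keeps the input loop close to real time; the trade-off is that memory grows if publishing falls behind, so a production system would bound or monitor the buffer.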
Here is the complete `price-feed` app, which you can check out from the SeaStreamer repository:
```shell
$ cd examples/price-feed
$ cargo run
Connecting ..
Connected.
Subscribed.
{"spread":{"bid":"1.25495","ask":"1.25513","timestamp":"2024-05-05T16:31:00.961214","bid_vol":"61.50588918","ask_vol":"787.90883861"},"channel_name":"spread","pair":"GBP/USD"}
..
```
3. SQL Data Sink
Step 1, create a stream consumer:
```rust
let streamer = SeaStreamer::connect(streamer_uri, Default::default()).await?;
let consumer = streamer
    .create_consumer(&[stream_key], SeaConsumerOptions::default())
    .await?;
```
Redis and Kafka each have their own set of consumer options; you can refer to SeaStreamer's examples. Here we use the default, which is a real-time, stateless stream consumer.
Step 2, create a database:
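```rust
use sea_orm::{ConnectOptions, Database};

// `mode=rwc` opens the file read-write and creates it if missing
let mut opt = ConnectOptions::new("sqlite://my_db.sqlite?mode=rwc");
opt.max_connections(1).sqlx_logging(false);
let db = Database::connect(opt).await?;
```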
We set `max_connections` to 1 because our data sink does not perform concurrent inserts anyway.
Here is the Entity:
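```rust
use sea_orm::entity::prelude::*;
use serde::Deserialize;

#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Deserialize)]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    #[serde(default)] // `id` is absent from the JSON, so it defaults to 0
    pub id: i32,
    pub timestamp: String,
    pub bid: String,
    pub ask: String,
    pub bid_vol: String,
    pub ask_vol: String,
}

// `DeriveEntityModel` also expects a `Relation` enum and an `ActiveModelBehavior` impl
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
```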
The table will be named `event`, and we derive `Deserialize` on the `Model` so that it can be decoded directly from the `spread` object in each JSON message.
We will use the following helper method to create the database table, where the schema is derived from the Entity:
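```rust
use sea_orm::{ConnectionTrait, DbConn, DbErr, Schema};

// `Entity` is generated by `DeriveEntityModel` from the `Model` above
async fn create_tables(db: &DbConn) -> Result<(), DbErr> {
    let builder = db.get_database_backend();
    let schema = Schema::new(builder);
    let stmt = builder.build(
        schema.create_table_from_entity(Entity).if_not_exists(),
    );
    log::info!("{stmt}");
    db.execute(stmt).await?;
    Ok(())
}
```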
This is especially handy for SQLite, where the app owns the database schema. For other databases, you'd probably use the SeaORM migration system.
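To see what "schema derived from the Entity" amounts to, here is a toy, std-only generator that maps a column list to the same `CREATE TABLE` statement the sink logs on startup: the `i32` primary key becomes `integer ... PRIMARY KEY AUTOINCREMENT` and each `String` becomes `varchar`. The `Column` struct, `create_table_sql` and `event_columns` are hypothetical; SeaORM builds the real statement through SeaQuery rather than string formatting:

```rust
/// A toy column description; SeaORM derives the real one from the Entity's fields.
struct Column {
    name: &'static str,
    sql_type: &'static str, // e.g. `i32` -> "integer", `String` -> "varchar"
    primary_key: bool,
}

/// Renders a SQLite `CREATE TABLE IF NOT EXISTS` statement for `columns`.
fn create_table_sql(table: &str, columns: &[Column]) -> String {
    let defs: Vec<String> = columns
        .iter()
        .map(|c| {
            let pk = if c.primary_key { " PRIMARY KEY AUTOINCREMENT" } else { "" };
            format!("\"{}\" {} NOT NULL{}", c.name, c.sql_type, pk)
        })
        .collect();
    format!("CREATE TABLE IF NOT EXISTS \"{}\" ( {} )", table, defs.join(", "))
}

/// The columns of the `event` table, mirroring the Entity's fields.
fn event_columns() -> Vec<Column> {
    vec![
        Column { name: "id", sql_type: "integer", primary_key: true },
        Column { name: "timestamp", sql_type: "varchar", primary_key: false },
        Column { name: "bid", sql_type: "varchar", primary_key: false },
        Column { name: "ask", sql_type: "varchar", primary_key: false },
        Column { name: "bid_vol", sql_type: "varchar", primary_key: false },
        Column { name: "ask_vol", sql_type: "varchar", primary_key: false },
    ]
}
```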
Step 3, insert the data into the database:
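```rust
use sea_orm::{ActiveModelTrait, ActiveValue::NotSet, IntoActiveModel};
use serde::Deserialize;

// Only the `spread` field is needed; serde ignores unknown fields by default
#[derive(Debug, Deserialize)]
struct Item {
    spread: Model,
}

loop {
    let message = consumer.next().await?;
    let payload = message.message();
    let json = payload.as_str()?;
    let item: Item = serde_json::from_str(json)?;
    let mut spread = item.spread.into_active_model();
    spread.id = NotSet; // let the db assign primary key
    spread.save(&db).await?;
}
```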
In a few lines of code, we:
- receive the message from Redis
- decode the message as JSON
- convert the message into a SeaORM Model
- insert the Model into the database
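The `spread.id = NotSet` line deserves a note. SeaORM's `save` inserts when the primary key is `NotSet` and updates otherwise, and the decoded `Model` carries `id: 0` thanks to `#[serde(default)]`; without resetting it, the row would be treated as an update rather than a new insert. The std-only toy below mirrors that rule, with `None` standing in for `NotSet`; `Row` and `Table` are hypothetical and do not model SeaORM's exact update behaviour:

```rust
/// Toy row: `id: None` plays the role of SeaORM's `NotSet`.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: Option<i32>,
    bid: String,
}

/// Toy table following the "insert if the key is unset, else update" rule.
#[derive(Default)]
struct Table {
    rows: Vec<Row>,
    last_id: i32,
}

impl Table {
    /// Inserts when `row.id` is `None` (assigning the next key, like AUTOINCREMENT),
    /// otherwise updates the row with that key. Errors if there is nothing to update.
    fn save(&mut self, mut row: Row) -> Result<i32, String> {
        let key = row.id;
        match key {
            None => {
                self.last_id += 1;
                row.id = Some(self.last_id);
                self.rows.push(row);
                Ok(self.last_id)
            }
            Some(id) => {
                let existing = self
                    .rows
                    .iter_mut()
                    .find(|r| r.id == Some(id))
                    .ok_or_else(|| format!("no row with id {id} to update"))?;
                *existing = row;
                Ok(id)
            }
        }
    }
}
```

Saving two rows with `id: None` yields keys 1 and 2, whereas a row still carrying the defaulted `id: Some(0)` finds nothing to update.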
Run the `sea-orm-sink` app in another terminal:
```shell
$ cd examples/sea-orm-sink
$ RUST_LOG=info cargo run
[INFO sea_streamer_sea_orm_sink] CREATE TABLE IF NOT EXISTS "event" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "timestamp" varchar NOT NULL, "bid" varchar NOT NULL, "ask" varchar NOT NULL, "bid_vol" varchar NOT NULL, "ask_vol" varchar NOT NULL )
[INFO sea_streamer_sea_orm_sink] {"spread":{"bid":"1.25495","ask":"1.25513","timestamp":"2024-05-05T16:31:00.961214","bid_vol":"61.50588918","ask_vol":"787.90883861"},"channel_name":"spread","pair":"GBP/USD"}
```
That's it! Now you can inspect the data with your favourite database GUI and write some SQL queries.
Conclusion
In this article, we covered:
- Micro-services architecture in stream processing
- Async real-time programming in Rust
- The awesomeness of the SeaQL and Rust ecosystem²
Here are a few suggestions for how you can take it from here:
- Stream the data to a "big database" like MySQL or Postgres
- Subscribe to more streams and sink to more tables
- Buffer the events and insert the data in batches to achieve higher throughput
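For the batching suggestion, the buffering logic itself is independent of any database. A std-only sketch (the `Batcher` type is hypothetical) that collects events and hands them off in fixed-size batches, with a final flush for the remainder, could look like this; each flushed batch would then go into a single multi-row insert, for example via SeaORM's `Entity::insert_many`:

```rust
/// Collects items and releases them in batches of `capacity`.
struct Batcher<T> {
    capacity: usize,
    buffer: Vec<T>,
}

impl<T> Batcher<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "batch capacity must be positive");
        Self { capacity, buffer: Vec::with_capacity(capacity) }
    }

    /// Buffers `item`; returns a full batch once `capacity` items are collected.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.buffer.push(item);
        if self.buffer.len() >= self.capacity {
            Some(std::mem::replace(&mut self.buffer, Vec::with_capacity(self.capacity)))
        } else {
            None
        }
    }

    /// Returns whatever is left (e.g. on shutdown or a timer tick), if anything.
    fn flush(&mut self) -> Option<Vec<T>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}
```

A time-based flush matters too: in a quiet market a half-full batch should not wait indefinitely, so `flush` would typically also be called on a timer.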
Rustacean Sticker Pack 🦀
The Rustacean Sticker Pack is the perfect way to express your passion for Rust. Our stickers are made with a premium water-resistant vinyl with a unique matte finish. Stick them on your laptop, notebook, or any gadget to show off your love for Rust!
Moreover, all proceeds contribute directly to the ongoing development of SeaQL projects.
Sticker Pack Contents:
- Logos of SeaQL projects: SeaQL, SeaORM, SeaQuery, Seaography, FireDBG
- Mascot of SeaQL: Terres the Hermit Crab
- Mascot of Rust: Ferris the Crab
- The Rustacean word
Support SeaQL and get a Sticker Pack!