This story stems from the saying "What Color is Your Function?", a criticism of the async implementation in common programming languages. Rust also falls into the category of "colored functions", so in this blog post, let's see how we can design systems that effectively combine sync and async code.
Rainbow bridge is a reference to the bridge in Thor that teleports you between different realms - a perfect analogy!
Background
Sync code can be blocking IO or expensive computation; async code is usually network IO where you'd wait for results.
In both cases, we want to maximize concurrency, such that the program can make full use of the CPU instead of sitting there idle. A common approach is message passing, where we package tasks and send them to different workers for execution.
Sync -> Sync
Let's start with the classic example, pure sync code. There exists std::sync::mpsc in the standard library, so let's take a look.
```rust
use std::sync::mpsc::channel;

// create an unbounded channel
let (sender, receiver) = channel();
// never blocks
sender.send("Hello".to_string()).unwrap();

let handle = std::thread::spawn(move || {
    // wait until there is a message
    let message = receiver.recv().unwrap();
    println!("{message}");
});
handle.join().unwrap();
println!("Bye");
```
Prints (Playground):
```
Hello
Bye
```
Now, we'll make a more elaborate example: a program that spawns a number of worker threads to perform some 'expensive' computation. The main thread would dispatch the tasks to those threads and in turn collect the results via another channel.
```
┌─────────────┐ tasks  ┌─────────────────┐ result
│             ├───────►│ worker thread 1 ├──────┐   ┌─────────────┐
│ main thread │        ├─────────────────┤      ├──►│ main thread │
│             ├───────►│ worker thread 2 ├──────┘   └─────────────┘
└─────────────┘        └─────────────────┘
```
First, set up the channels.
```rust
let (result, collector) = channel(); // result
let mut senders = Vec::new();
for _ in 0..THREADS {
    let (sender, receiver) = channel(); // tasks
    senders.push(sender);
    let result = result.clone();
    std::thread::spawn(move || worker(receiver, result));
}
```
The worker thread looks like:
```rust
fn worker(receiver: Receiver<Task>, sender: Sender<Done>) {
    while let Ok(task) = receiver.recv() {
        let result = process(task);
        sender.send(result).unwrap();
    }
}
```
Then, dispatch tasks.
```rust
for c in 0..TASKS {
    let task = some_random_task();
    senders[c % THREADS].send(task).unwrap();
}
```
Finally, we can collect results.
```rust
for _ in 0..TASKS {
    let result = collector.recv().unwrap();
    println!("{result:?}");
}
```
Full source code can be found here.
Async -> Async
Next, we'll migrate to async land using tokio::sync::mpsc. It's very similar to the above example, except every operation is async and thus imposes additional restrictions on lifetimes. (The trick is: just move / clone. Don't borrow.)

tokio's unbounded_channel is the equivalent of std's channel; otherwise the API is very similar. The spawn method takes in a Future; since the worker needs to take ownership of the channels, we construct one with an async move {} block.
| std | tokio |
|---|---|
| (unbounded) channel | unbounded_channel |
| sync_channel | (bounded) channel |
```rust
let (result, mut collector) = unbounded_channel();
let mut senders = Vec::new();
for _ in 0..WORKERS {
    let (sender, mut receiver) = unbounded_channel();
    senders.push(sender);
    let result = result.clone();
    tokio::task::spawn(async move {
        while let Some(task) = receiver.recv().await {
            result.send(process(task).await).unwrap();
        }
    });
}
std::mem::drop(result); // <-- ?
```
Why do we need to drop the result sender? This is one of the footguns: tokio swallows panics originating within tasks, so if a worker panicked, the program would never exit. By dropping the last copy of result in scope, the channel automatically closes once all tasks exit, which in turn propagates up to our collector.
The rest is almost the same.
```rust
for (i, task) in tasks.iter().enumerate() {
    senders[i % WORKERS].send(task.clone()).unwrap();
}
std::mem::drop(senders);

for _ in 0..tasks.len() {
    let result = collector.recv().await.unwrap();
    println!("{result:?}");
}
```
Full source code can be found here.
Flume mpmc
mpmc - multi producer, multi consumer
The previous examples have a flaw: we have to spawn multiple mpsc channels to send tasks, which is:

- clumsy: we need to keep a list of senders
- not the most efficient: is round-robin the best way of distributing tasks? Some of the workers may remain idle
Here is the ideal setup:
```
                  tasks  ┌─────────────────┐ result
┌─────────────┐      ┌──►│ worker thread 1 ├─────┐   ┌─────────────┐
│ main thread ├──────┤   ├─────────────────┤     ├──►│ main thread │
└─────────────┘      └──►│ worker thread 2 ├─────┘   └─────────────┘
                         └─────────────────┘
```
Let's rewrite our example using Flume. But first, know the mapping between tokio and flume:
| Tokio | Flume |
|---|---|
| unbounded_channel | unbounded (channel) |
| (bounded) channel | bounded (channel) |
| send | send |
| recv | recv_async |
In tokio, the receive method is exclusive: async fn recv(&mut self); in flume, it is fn recv_async(&self) -> RecvFut. The type signatures alone tell you the distinction between mpsc and mpmc! It is wrong to use the blocking recv method in an async context in flume, but sadly the compiler would not warn you about it.
The channel setup is now slightly simpler:
```rust
let (sender, receiver) = unbounded(); // task
let (result, collector) = unbounded(); // result
for _ in 0..WORKERS {
    let receiver = receiver.clone();
    let result = result.clone();
    tokio::task::spawn(async move {
        while let Ok(task) = receiver.recv_async().await {
            result.send(process(task).await).unwrap();
        }
    });
}
```
We no longer have to dispatch tasks ourselves. All workers share the same task queue, so each worker fetches the next task as soon as it finishes the previous one - effectively load balancing among themselves!
```rust
for task in &tasks {
    sender.send(task.clone()).unwrap();
}

for _ in 0..tasks.len() {
    let result = collector.recv_async().await.unwrap();
    println!("{result:?}");
}
```
Full source code can be found here.
Sync -> Async
In the final example, let's consider a program that is mostly sync, but has a few async operations that we want to handle in a background thread.
In the example below, our blocking operation is reading from stdin on the main thread, and we send those lines to an async thread to handle.
```
┌─────────────┐        ┌──────────────┐
│ main thread ├───────►│ async thread │
└─────────────┘        └──────────────┘
```
It follows the usual 3 steps:
- create a flume channel
- pass the receiver end to a worker thread
- send tasks over the channel
```rust
fn main() -> Result<()> {
    let (sender, receiver) = unbounded(); // flume channel
    std::thread::spawn(move || {
        // this runtime is single-threaded
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(handler(receiver))
    });
    loop {
        let mut line = String::new();
        // this blocks the current thread until there is a new line
        match std::io::stdin().read_line(&mut line) {
            Ok(0) => break, // this means stdin is closed
            Ok(_) => (),
            Err(e) => panic!("{e:?}"),
        }
        sender.send(line)?;
    }
    Ok(())
}
```
This is the handler:
```rust
async fn handler(receiver: Receiver<String>) -> Result<()> {
    while let Ok(line) = receiver.recv_async().await {
        process(line).await?;
    }
    Ok(())
}
```
It doesn't look much different from the async -> async example; the only difference is that one side is sync! Full source code can be found here.
Graceful shutdown
The above code has a problem: we never know whether a line has been processed. If the program has an exit mechanism, e.g. handling sigint, there is a possibility of exiting before all the lines have been processed.

Let's see how we can shut down properly.
```rust
let handle = std::thread::spawn(..);
// running is an AtomicBool
while running.load(Ordering::Acquire) {
    let line = read_line_from_stdin();
    sender.send(line)?;
}
std::mem::drop(sender);
handle.join().unwrap().unwrap();
```
The shutdown sequence has 3 steps:

- we first obtain the JoinHandle to the thread
- we drop all copies of sender, effectively closing the channel
- in the worker thread, receiver.recv_async() would result in an error, as stated in the docs: "Asynchronously receive a value from the channel, returning an error if all senders have been dropped." The worker thread then finishes, joining the main thread
Async -> Sync
The other way around is equally simple, as illustrated in SeaStreamer's example.
Conclusion
| | sync | async |
|---|---|---|
| to spawn worker | std::thread::spawn | tokio::task::spawn |
| concurrency | multi-threaded | can be multi-threaded or single-threaded |
| worker is | FnOnce | Future |
| send message with | send | send |
| receive message with | recv | recv_async |
| waiting for messages | blocking | yield to runtime |
In this article we discussed:

- Multi-threaded parallelism in the sync realm
- Concurrency in the async realm, with tokio and flume
- Bridging sync and async code with flume

Now you've learnt the powers of flume, but there is more! In the next episode, hopefully we will get to discuss other interesting features of flume: bounded channels and 'rendezvous channels'.
Rustacean Sticker Pack 🦀
The Rustacean Sticker Pack is the perfect way to express your passion for Rust. Our stickers are made with a premium water-resistant vinyl with a unique matte finish. Stick them on your laptop, notebook, or any gadget to show off your love for Rust!
Moreover, all proceeds contribute directly to the ongoing development of SeaQL projects.
Sticker Pack Contents:
- Logo of SeaQL projects: SeaQL, SeaORM, SeaQuery, Seaography, FireDBG
- Mascot of SeaQL: Terres the Hermit Crab
- Mascot of Rust: Ferris the Crab
- The Rustacean word
Support SeaQL and get a Sticker Pack!