Skip to main content

The rainbow bridge between sync and async Rust

ยท 8 min read
Chris Tsang

This story stems from the saying "What Color is Your Function?" as a criticism to the async implementation of common programming languages. Well, Rust also falls into the category of "colored functions". So in this blog post, let's see how we can design systems to effectively combine sync and async code.

Rainbow bridge is a reference to the bridge in Thor that teleports you between different realms - a perfect analogy!

Backgroundโ€‹

Sync code can be blocking IO, or expensive computation. Async code is usually network IO where you'd wait for results.

In both cases, we want to maximize concurrency, such that the program can make full use of the CPU instead of sitting there idle. A common approach is message passing, where we package tasks and send them to different workers for execution.

Sync -> Syncโ€‹

Let's start with the classic example, pure sync code. There exists std::sync::mpsc in the standard library, so let's take a look.

use std::sync::mpsc::channel;

// create an unbounded channel
let (sender, receiver) = channel();

// never blocks
sender.send("Hello".to_string()).unwrap();

let handle = std::thread::spawn(move|| {
// wait until there is a message
let message = receiver.recv().unwrap();
println!("{message}");
});

handle.join().unwrap();
println!("Bye");

Prints (Playground):

Hello
Bye

Now, we'll make a more elaborate example: a program that spawns a number of worker threads to perform some 'expensive' computation. The main thread would dispatch the tasks to those threads and in turn collect the results via another channel.

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    tasks    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   result
โ”‚ โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ก worker thread 1 โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ main thread โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ• โ•โ•โ•โ•โ•ก main thread โ”‚
โ”‚ โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ก worker thread 2 โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

First, setup the channels.

let (result, collector) = channel(); // result
let mut senders = Vec::new();
for _ in 0..THREADS {
let (sender, receiver) = channel(); // tasks
senders.push(sender);
let result = result.clone();
std::thread::spawn(move || worker(receiver, result));
}

The worker thread looks like:

fn worker(receiver: Receiver<Task>, sender: Sender<Done>) {
while let Ok(task) = receiver.recv() {
let result = process(task);
sender.send(result).unwrap();
}
}

Then, dispatch tasks.

for c in 0..TASKS {
let task = some_random_task();
senders[c % THREADS].send(task).unwrap();
}

Finally, we can collect results.

for _ in 0..TASKS {
let result = collector.recv().unwrap();
println!("{result:?}");
}

Full source code can be found here.

Async -> Asyncโ€‹

Next, we'll migrate to async land. Using tokio::sync::mpsc, it's very similar to the above example, except every operation is async and thus imposes additional restrictions to lifetimes. (The trick is, just move / clone. Don't borrow)

tokio's unbounded_channel is the equivalent to std's channel. Otherwise it's very similar. The spawn method takes in a Future; since the worker needs to take in the channels, we construct an async closure with async move {}.

stdtokio
(unbounded) channelunbounded_channel
sync_channel(bounded) channel
let (result, mut collector) = unbounded_channel();
let mut senders = Vec::new();
for _ in 0..WORKERS {
let (sender, mut receiver) = unbounded_channel();
senders.push(sender);
let result = result.clone();
tokio::task::spawn(async move {
while let Some(task) = receiver.recv().await {
result.send(process(task).await).unwrap();
}
});
}
std::mem::drop(result); // <-- ?

Why do we need to drop the result sender? This is one of the foot gun: tokio would swallow panics originated within the task, and so if that happened, the program would never exit. By dropping the last copy of result in scope, the channel would automatically close after all tasks exit, which in turn would triggle up to our collector.

The rest is almost the same.

for (i, task) in tasks.iter().enumerate() {
senders[i % WORKERS].send(task.clone()).unwrap();
}
std::mem::drop(senders);

for _ in 0..tasks.len() {
let result = collector.recv().await.unwrap();
println!("{result:?}");
}

Full source code can be found here.

Flume mpmcโ€‹

mpmc - multi producer, multi consumerโ€‹

The previous examples have a flaw: we have to spawn multiple mpsc channels to send tasks, which is:

  1. clumsy. we need to keep a list of senders
  2. not the most efficient. is round-robin the best way of distributing tasks? some of the workers may remain idle

Here is the ideal setup:

                      tasks   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   result
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ•”โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ก worker thread 1 โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•— โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ main thread โ•žโ•โ•โ•โ•ฃ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ• โ•โ•โ•โ•ก main thread โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ•šโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ก worker thread 2 โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Let's rewrite our example using Flume. But first, know the mapping between tokio and flume:

TokioFlume
unbounded_channelunbounded (channel)
(bounded) channelbounded (channel)
sendsend
recvrecv_async

In tokio, the method is exclusive: async fn recv(&mut self); in flume, the method is fn recv_async(&self) -> RecvFut. The type signature already told you the distinction between mpsc vs mpmc! It is wrong to use the blocking recv method in async context in flume, but sadly the compiler would not warn you about it.

The channel setup is now slightly simpler:

let (sender, receiver) = unbounded(); // task
let (result, collector) = unbounded(); // result

for _ in 0..WORKERS {
let receiver = receiver.clone();
let result = result.clone();
tokio::task::spawn(async move {
while let Ok(task) = receiver.recv_async().await {
result.send(process(task).await).unwrap();
}
});
}

We no longer have to dispatch tasks ourselves. All workers share the same task queue, and thus workers would fetch the next task as soon as the previous one is finished - effectively load balance among themselves!

for task in &tasks {
sender.send(task.clone()).unwrap();
}

for _ in 0..tasks.len() {
let result = collector.recv_async().await.unwrap();
println!("{result:?}");
}

Full source code can be found here.

Sync -> Asyncโ€‹

In the final example, let's consider a program that is mostly sync, but has a few async operations that we want to handle in a background thread.

In the example below, our blocking operation is 'reading from stdin' from the main thread. And we send those lines to an async thread to handle.

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”           โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ main thread โ•žโ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•ก async thread โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

It follows the usual 3 steps:

  1. create a flume channel
  2. pass the receiver end to a worker thread
  3. send tasks over the channel
fn main() -> Result<()> {
let (sender, receiver) = unbounded(); // flume channel

std::thread::spawn(move || {
// this runtime is single-threaded
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
rt.block_on(handler(receiver))
});

loop {
let mut line = String::new();
// this blocks the current thread until there is a new line
match std::io::stdin().read_line(&mut line) {
Ok(0) => break, // this means stdin is closed
Ok(_) => (),
Err(e) => panic!("{e:?}"),
}
sender.send(line)?;
}

Ok(())
}

This is the handler:

async fn handler(receiver: Receiver<String>) -> Result<()> {
while let Ok(line) = receiver.recv_async().await {
process(line).await?;
}
Ok(())
}

It doesn't look much different from the async -> async example, the only difference is one side is sync! Full source code can be found here.

Graceful shutdownโ€‹

The above code has a problem: we never know whether a line has been processed. If the program has an exit mechanism from handling sigint, there is a possibility of exiting before all the lines has been processed.

Let's see how we can shutdown properly.

let handle = std::thread::spawn(..);

// running is an AtomicBool
while running.load(Ordering::Acquire) {
let line = read_line_from_stdin();
sender.send(line)?;
}

std::mem::drop(sender);
handle.join().unwrap().unwrap();

The shutdown sequence has 3 steps:

  1. we first obtain the JoinHandle to the thread
  2. we drop all copies of sender, effectively closing the channel
  3. in the worker thread, receiver.recv_async() would result in an error, as stated in the docs

    Asynchronously receive a value from the channel, returning an error if all senders have been dropped.

  4. the worker thread finishes, joining the main thread

Async -> Syncโ€‹

The other way around is equally simple, as illustrated in SeaStreamer's example.

Conclusionโ€‹

syncasync
to spawn workerstd::thread::spawntokio::task::spawn
concurrencymulti-threadedcan be multi-threaded or single-threaded
worker isFnOnceFuture
send message withsendsend
receive message withrecvrecv_async
waiting for messagesblockingyield to runtime

In this article we discussed:

  1. Multi-threaded parallelism in sync realm
  2. Concurrency in async realm - with tokio and flume
  3. Bridging sync and async code with flume

Now you already learnt the powers of flume, but there is more!

In the next episode, hopefully we will get to discuss other interesting features of flume - bounded channels and 'rendezvous channels'. Follow our X / Twitter for updates!

Rustacean Sticker Pack ๐Ÿฆ€โ€‹

The Rustacean Sticker Pack is the perfect way to express your passion for Rust. Our stickers are made with a premium water-resistant vinyl with a unique matte finish. Stick them on your laptop, notebook, or any gadget to show off your love for Rust!

Moreover, all proceeds contributes directly to the ongoing development of SeaQL projects.

Sticker Pack Contents:

  • Logo of SeaQL projects: SeaQL, SeaORM, SeaQuery, Seaography, FireDBG
  • Mascot of SeaQL: Terres the Hermit Crab
  • Mascot of Rust: Ferris the Crab
  • The Rustacean word

Support SeaQL and get a Sticker Pack!

Rustacean Sticker Pack by SeaQL