· 11 min read
Billy Chan

In this tutorial, we will create a REST notepad backend from scratch and add a new REST endpoint to handle file uploads in Loco.

The full source code can be found here. The documentation of the REST API is available here.

What is Loco?

Loco is a Rails-inspired web framework for Rust. It includes many Rails features with Rust ergonomics. Loco integrates seamlessly with SeaORM, offering a first-class development experience!

  • Controllers and routing via axum
  • Models, migration, and ActiveRecord via SeaORM
  • Views via serde
  • Seamless background jobs via sidekiq-rs; multi-modal: in-process, out-of-process, or async via Tokio
  • ...and more

REST API Starter Template

Install loco-cli:

cargo install loco-cli

The loco-cli provides three starter templates:

  • SaaS Starter
  • Rest API Starter
  • Lightweight Service Starter

For this tutorial, we want the "Rest API Starter" template:

$ loco new

✔ You are inside a git repository. Do you wish to continue? · Yes
✔ App name? · loco_starter
✔ What would you like to build? · Rest API (with DB and user auth)

🚂 Loco app generated successfully in:
/sea-orm/examples/loco_starter

Next, we need to set up our PostgreSQL database.

docker run -d -p 5432:5432 -e POSTGRES_USER=loco -e POSTGRES_DB=loco_starter_development -e POSTGRES_PASSWORD="loco" postgres:15.3-alpine

If you want to use MySQL or SQLite as the database, update the database.uri configuration in loco_starter/config/development.yaml and enable the corresponding database backend feature flag of SeaORM in loco_starter/Cargo.toml.
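
For reference, here is a sketch of what the relevant snippet in development.yaml might look like if you switch to SQLite; the exact layout follows the file Loco generated for you, so treat this as an assumption rather than the canonical config:

loco_starter/config/development.yaml
database:
  # Example: SQLite connection string (`mode=rwc` creates the file if it doesn't exist)
  uri: sqlite://loco_starter.sqlite?mode=rwc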

Now, start our REST application:

$ cargo loco start

Finished `dev` profile [unoptimized + debuginfo] target(s) in 1m 42s
Running `target/debug/loco_starter-cli start`
2024-05-20T06:56:42.724350Z INFO app: loco_rs::config: loading environment from selected_path="config/development.yaml" environment=development
2024-05-20T06:56:42.740338Z WARN app: loco_rs::boot: pretty backtraces are enabled (this is great for development but has a runtime cost for production. disable with `logger.pretty_backtrace` in your config yaml) environment=development
2024-05-20T06:56:42.833747Z INFO app: loco_rs::db: auto migrating environment=development
2024-05-20T06:56:42.845983Z INFO app: sea_orm_migration::migrator: Applying all pending migrations environment=development
2024-05-20T06:56:42.850231Z INFO app: sea_orm_migration::migrator: Applying migration 'm20220101_000001_users' environment=development
2024-05-20T06:56:42.864095Z INFO app: sea_orm_migration::migrator: Migration 'm20220101_000001_users' has been applied environment=development
2024-05-20T06:56:42.865799Z INFO app: sea_orm_migration::migrator: Applying migration 'm20231103_114510_notes' environment=development
2024-05-20T06:56:42.873653Z INFO app: sea_orm_migration::migrator: Migration 'm20231103_114510_notes' has been applied environment=development
2024-05-20T06:56:42.875645Z INFO app: loco_rs::boot: initializers loaded initializers="" environment=development
2024-05-20T06:56:42.906072Z INFO app: loco_rs::controller::app_routes: [GET] /api/_ping environment=development
2024-05-20T06:56:42.906176Z INFO app: loco_rs::controller::app_routes: [GET] /api/_health environment=development
2024-05-20T06:56:42.906264Z INFO app: loco_rs::controller::app_routes: [GET] /api/notes environment=development
2024-05-20T06:56:42.906335Z INFO app: loco_rs::controller::app_routes: [POST] /api/notes environment=development
2024-05-20T06:56:42.906414Z INFO app: loco_rs::controller::app_routes: [GET] /api/notes/:id environment=development
2024-05-20T06:56:42.906501Z INFO app: loco_rs::controller::app_routes: [DELETE] /api/notes/:id environment=development
2024-05-20T06:56:42.906558Z INFO app: loco_rs::controller::app_routes: [POST] /api/notes/:id environment=development
2024-05-20T06:56:42.906609Z INFO app: loco_rs::controller::app_routes: [POST] /api/auth/register environment=development
2024-05-20T06:56:42.906680Z INFO app: loco_rs::controller::app_routes: [POST] /api/auth/verify environment=development
2024-05-20T06:56:42.906753Z INFO app: loco_rs::controller::app_routes: [POST] /api/auth/login environment=development
2024-05-20T06:56:42.906838Z INFO app: loco_rs::controller::app_routes: [POST] /api/auth/forgot environment=development
2024-05-20T06:56:42.906931Z INFO app: loco_rs::controller::app_routes: [POST] /api/auth/reset environment=development
2024-05-20T06:56:42.907012Z INFO app: loco_rs::controller::app_routes: [GET] /api/user/current environment=development
2024-05-20T06:56:42.907309Z INFO app: loco_rs::controller::app_routes: [Middleware] Adding limit payload data="5mb" environment=development
2024-05-20T06:56:42.907440Z INFO app: loco_rs::controller::app_routes: [Middleware] Adding log trace id environment=development
2024-05-20T06:56:42.907714Z INFO app: loco_rs::controller::app_routes: [Middleware] Adding cors environment=development
2024-05-20T06:56:42.907788Z INFO app: loco_rs::controller::app_routes: [Middleware] Adding etag layer environment=development

(Loco ASCII-art train banner)
https://loco.rs

environment: development
database: automigrate
logger: debug
compilation: debug
modes: server

listening on [::]:3000

From the log messages printed above, we can see:

  • Database migrations have been applied
  • All available REST API endpoints

To check that the application is listening for requests:

$ curl --location 'http://localhost:3000/api/_ping'

{"ok":true}

User Management

The starter template comes with a basic user management module.

Registration

It is common practice to send a verification email to the provided email address. However, that requires an SMTP server, which is not the focus of this blog post, so I will skip the email verification:

loco_starter/src/controllers/auth.rs
#[debug_handler]
async fn register(
    State(ctx): State<AppContext>,
    Json(params): Json<RegisterParams>,
) -> Result<Response> {
    let res = users::Model::create_with_password(&ctx.db, &params).await;

    let user = match res {
        Ok(user) => user,
        Err(err) => {
            tracing::info!(
                message = err.to_string(),
                user_email = &params.email,
                "could not register user",
            );
            return format::json(());
        }
    };

+   // Skip email verification, all new registrations are considered verified
+   let _user = user
+       .into_active_model()
+       .verified(&ctx.db)
+       .await?;

+   // Skip sending verification email as we don't have a mail server
+   /*
    let user = user
        .into_active_model()
        .set_email_verification_sent(&ctx.db)
        .await?;

    AuthMailer::send_welcome(&ctx, &user).await?;
+   */

    format::json(())
}

Compile and run the application, then register a new user account:

$ curl --location 'http://localhost:3000/api/auth/register' \
--data-raw '{
"name": "Billy",
"email": "cwchan.billy@gmail.com",
"password": "password"
}'

null

Login

You should see a new user row in the database.

Next, we log in with the corresponding email and password:

$ curl --location 'http://localhost:3000/api/auth/login' \
--data-raw '{
"email": "cwchan.billy@gmail.com",
"password": "password"
}'

{
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJwaWQiOiIxMWQwMWFmMy02ZmUyLTQ0ZjMtODlmMC1jMDJjZWMzOTc0MWQiLCJleHAiOjE3MTY3OTU3NjR9.i1OElxy33rkorkxk6QpTG1Kg4_Q8O0jqBJ2i82nltkcQYZsLmSSnrxtdtlfdvV0ccJ3hQA3JoY9L13cjz2uSCw",
"pid": "11d01af3-6fe2-44f3-89f0-c02cec39741d",
"name": "Billy",
"is_verified": true
}

Authentication

The JWT token above will be used in user authentication. You must set the Authorization header to access any REST endpoint that requires user login.

For example, fetching the user info of the current user:

$ curl --location 'http://localhost:3000/api/user/current' \
--header 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJwaWQiOiIxMWQwMWFmMy02ZmUyLTQ0ZjMtODlmMC1jMDJjZWMzOTc0MWQiLCJleHAiOjE3MTY3OTU3NjR9.i1OElxy33rkorkxk6QpTG1Kg4_Q8O0jqBJ2i82nltkcQYZsLmSSnrxtdtlfdvV0ccJ3hQA3JoY9L13cjz2uSCw'

{
"pid":"11d01af3-6fe2-44f3-89f0-c02cec39741d",
"name":"Billy",
"email":"cwchan.billy@gmail.com"
}

Handling REST Requests

The starter application comes with a notes controller for the notes table.

Create Notes

$ curl --location 'http://localhost:3000/api/notes' \
--header 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJwaWQiOiIxMWQwMWFmMy02ZmUyLTQ0ZjMtODlmMC1jMDJjZWMzOTc0MWQiLCJleHAiOjE3MTY3OTU3NjR9.i1OElxy33rkorkxk6QpTG1Kg4_Q8O0jqBJ2i82nltkcQYZsLmSSnrxtdtlfdvV0ccJ3hQA3JoY9L13cjz2uSCw' \
--data '{
"title": "Getting Started with Loco & SeaORM",
"content": "In this tutorial, we would create an REST notepad backend starting from scratch and adding a new REST endpoint to handle file uploads."
}'

{
"created_at": "2024-05-20T08:43:45.408449",
"updated_at": "2024-05-20T08:43:45.408449",
"id": 1,
"title": "Getting Started with Loco & SeaORM",
"content": "In this tutorial, we would create an REST notepad backend starting from scratch and adding a new REST endpoint to handle file uploads."
}

List Notes

$ curl --location 'http://localhost:3000/api/notes' \
--header 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJwaWQiOiIxMWQwMWFmMy02ZmUyLTQ0ZjMtODlmMC1jMDJjZWMzOTc0MWQiLCJleHAiOjE3MTY3OTU3NjR9.i1OElxy33rkorkxk6QpTG1Kg4_Q8O0jqBJ2i82nltkcQYZsLmSSnrxtdtlfdvV0ccJ3hQA3JoY9L13cjz2uSCw'

[
{
"created_at": "2024-05-20T08:43:45.408449",
"updated_at": "2024-05-20T08:43:45.408449",
"id": 1,
"title": "Getting Started with Loco & SeaORM",
"content": "In this tutorial, we would create an REST notepad backend starting from scratch and adding a new REST endpoint to handle file uploads."
},
{
"created_at": "2024-05-20T08:45:38.973130",
"updated_at": "2024-05-20T08:45:38.973130",
"id": 2,
"title": "Introducing SeaORM X",
"content": "SeaORM X is built on top of SeaORM with support for SQL Server"
}
]

Get Notes

$ curl --location 'http://localhost:3000/api/notes/2' \
--header 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJwaWQiOiIxMWQwMWFmMy02ZmUyLTQ0ZjMtODlmMC1jMDJjZWMzOTc0MWQiLCJleHAiOjE3MTY3OTU3NjR9.i1OElxy33rkorkxk6QpTG1Kg4_Q8O0jqBJ2i82nltkcQYZsLmSSnrxtdtlfdvV0ccJ3hQA3JoY9L13cjz2uSCw'

{
"created_at": "2024-05-20T08:45:38.973130",
"updated_at": "2024-05-20T08:45:38.973130",
"id": 2,
"title": "Introducing SeaORM X",
"content": "SeaORM X is built on top of SeaORM with support for SQL Server"
}

Handling File Uploads

Next, we will add a file upload feature where users can upload files related to their notes.

File Table Migration

Create a migration file for the new files table. Each row of files references a specific note in the database.

loco_starter/migration/src/m20240520_173001_files.rs
use sea_orm_migration::{prelude::*, schema::*};

use super::m20231103_114510_notes::Notes;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                table_auto(Files::Table)
                    .col(pk_auto(Files::Id))
                    .col(integer(Files::NotesId))
                    .col(string(Files::FilePath))
                    .foreign_key(
                        ForeignKey::create()
                            .name("FK_files_notes_id")
                            .from(Files::Table, Files::NotesId)
                            .to(Notes::Table, Notes::Id),
                    )
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(Files::Table).to_owned())
            .await
    }
}

#[derive(DeriveIden)]
pub enum Files {
    Table,
    Id,
    NotesId,
    FilePath,
}

Then, we need to enable the new migration.

loco_starter/migration/src/lib.rs
#![allow(elided_lifetimes_in_paths)]
#![allow(clippy::wildcard_imports)]
pub use sea_orm_migration::prelude::*;

mod m20220101_000001_users;
mod m20231103_114510_notes;
+ mod m20240520_173001_files;

pub struct Migrator;

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
    fn migrations() -> Vec<Box<dyn MigrationTrait>> {
        vec![
            Box::new(m20220101_000001_users::Migration),
            Box::new(m20231103_114510_notes::Migration),
+           Box::new(m20240520_173001_files::Migration),
        ]
    }
}

Compile and start the application; it should run our new migration on startup.

$ cargo loco start

...
2024-05-20T09:39:59.607525Z INFO app: loco_rs::db: auto migrating environment=development
2024-05-20T09:39:59.611997Z INFO app: sea_orm_migration::migrator: Applying all pending migrations environment=development
2024-05-20T09:39:59.621699Z INFO app: sea_orm_migration::migrator: Applying migration 'm20240520_173001_files' environment=development
2024-05-20T09:39:59.643886Z INFO app: sea_orm_migration::migrator: Migration 'm20240520_173001_files' has been applied environment=development
...

File Model Definition

Define the files entity model.

loco_starter/src/models/_entities/files.rs
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "files")]
pub struct Model {
    pub created_at: DateTime,
    pub updated_at: DateTime,
    #[sea_orm(primary_key)]
    pub id: i32,
    pub notes_id: i32,
    pub file_path: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    #[sea_orm(
        belongs_to = "super::notes::Entity",
        from = "Column::NotesId",
        to = "super::notes::Column::Id"
    )]
    Notes,
}

impl Related<super::notes::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Notes.def()
    }
}

Implement the ActiveModelBehavior in the parent module.

loco_starter/src/models/files.rs
use sea_orm::entity::prelude::*;

use super::_entities::files::ActiveModel;

impl ActiveModelBehavior for ActiveModel {
    // extend activemodel below (keep comment for generators)
}

File Controller

The controller is where we handle file uploading, listing, and viewing.

Upload File

The following upload handler allows multiple files to be uploaded in a single POST request.

loco_starter/src/controllers/files.rs
#[debug_handler]
pub async fn upload(
    _auth: auth::JWT,
    Path(notes_id): Path<i32>,
    State(ctx): State<AppContext>,
    mut multipart: Multipart,
) -> Result<Response> {
    // Collect all uploaded files
    let mut files = Vec::new();

    // Iterate all files in the POST body
    while let Some(field) = multipart.next_field().await.map_err(|err| {
        tracing::error!(error = ?err, "could not read multipart");
        Error::BadRequest("could not read multipart".into())
    })? {
        // Get the file name
        let file_name = match field.file_name() {
            Some(file_name) => file_name.to_string(),
            _ => return Err(Error::BadRequest("file name not found".into())),
        };

        // Get the file content as bytes
        let content = field.bytes().await.map_err(|err| {
            tracing::error!(error = ?err, "could not read bytes");
            Error::BadRequest("could not read bytes".into())
        })?;

        // Create a folder to store the uploaded file
        let now = chrono::offset::Local::now()
            .format("%Y%m%d_%H%M%S")
            .to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let folder = format!("{now}_{uuid}");
        let upload_folder = PathBuf::from(UPLOAD_DIR).join(&folder);
        fs::create_dir_all(&upload_folder).await?;

        // Write the file into the newly created folder
        let path = upload_folder.join(file_name);
        let mut f = fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&path)
            .await?;
        f.write_all(&content).await?;
        f.flush().await?;

        // Record the file upload in database
        let file = files::ActiveModel {
            notes_id: ActiveValue::Set(notes_id),
            file_path: ActiveValue::Set(
                path.strip_prefix(UPLOAD_DIR)
                    .unwrap()
                    .to_str()
                    .unwrap()
                    .to_string(),
            ),
            ..Default::default()
        }
        .insert(&ctx.db)
        .await?;

        files.push(file);
    }

    format::json(files)
}

Try uploading multiple files in a single POST request:
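
For example, a hypothetical curl invocation (the /api/files/upload/:notes_id route is wired up in the routes section below; the file paths are placeholders, and <JWT> stands for the token obtained at login):

$ curl --location 'http://localhost:3000/api/files/upload/1' \
--header 'Authorization: Bearer <JWT>' \
--form 'file1=@"/path/to/file1.txt"' \
--form 'file2=@"/path/to/file2.png"'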

All uploaded files are saved into the uploads directory.

List Files

List all files related to a specific notes_id.

loco_starter/src/controllers/files.rs
#[debug_handler]
pub async fn list(
    _auth: auth::JWT,
    Path(notes_id): Path<i32>,
    State(ctx): State<AppContext>,
) -> Result<Response> {
    // Fetch all files uploaded for a specific notes
    let files = files::Entity::find()
        .filter(files::Column::NotesId.eq(notes_id))
        .order_by_asc(files::Column::Id)
        .all(&ctx.db)
        .await?;

    format::json(files)
}
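
Assuming the routes defined below, a request might look like:

$ curl --location 'http://localhost:3000/api/files/list/1' \
--header 'Authorization: Bearer <JWT>'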

View File

View a specific file.

loco_starter/src/controllers/files.rs
#[debug_handler]
pub async fn view(
    _auth: auth::JWT,
    Path(files_id): Path<i32>,
    State(ctx): State<AppContext>,
) -> Result<Response> {
    // Fetch the file info from database
    let file = files::Entity::find_by_id(files_id)
        .one(&ctx.db)
        .await?
        .expect("File not found");

    // Stream the file
    let file = fs::File::open(format!("{UPLOAD_DIR}/{}", file.file_path)).await?;
    let stream = ReaderStream::new(file);
    let body = Body::from_stream(stream);

    Ok(format::render().response().body(body)?)
}
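
Again assuming the routes below, fetching a file by its id and saving it locally might look like:

$ curl --location 'http://localhost:3000/api/files/view/1' \
--header 'Authorization: Bearer <JWT>' \
--output downloaded_file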

File Controller Routes

Add our newly defined files handler to the application routes.

loco_starter/src/controllers/files.rs
pub fn routes() -> Routes {
    // Bind the routes
    Routes::new()
        .prefix("files")
        .add("/upload/:notes_id", post(upload))
        .add("/list/:notes_id", get(list))
        .add("/view/:files_id", get(view))
}

loco_starter/src/app.rs
pub struct App;

#[async_trait]
impl Hooks for App {
    // ...

    fn routes(_ctx: &AppContext) -> AppRoutes {
        AppRoutes::with_default_routes()
            .prefix("/api")
            .add_route(controllers::notes::routes())
            .add_route(controllers::auth::routes())
            .add_route(controllers::user::routes())
+           .add_route(controllers::files::routes())
    }

    // ...
}

Extra Rust Dependencies

Remember to enable the multipart feature of axum and add the tokio-util dependency.

loco_starter/Cargo.toml
- axum = "0.7.1"
+ axum = { version = "0.7.1", features = ["multipart"] }

+ tokio-util = "0.7.11"

SQL Server Support

SQL Server for SeaORM is now available as a closed beta. If you are interested, please sign up here.

Migrating from sea-orm to sea-orm-x is straightforward with two simple steps. First, update the existing sea-orm dependency to sea-orm-x and enable the sqlz-mssql feature. Note that you might need to patch the SeaORM dependency for upstream dependencies.

Cargo.toml
sea-orm = { path = "<SEA_ORM_X_ROOT>/sea-orm-x", features = ["runtime-async-std-rustls", "sqlz-mssql"] }
sea-orm-migration = { path = "<SEA_ORM_X_ROOT>/sea-orm-x/sea-orm-migration" }

# Patch SeaORM dependency for the upstream dependencies
[patch.crates-io]
sea-orm = { path = "<SEA_ORM_X_ROOT>/sea-orm-x" }
sea-orm-migration = { path = "<SEA_ORM_X_ROOT>/sea-orm-x/sea-orm-migration" }

Second, update the connection string to connect to the MSSQL database.

# If the schema is `dbo`, simply write:
mssql://username:password@host/database

# Or, specify the schema name by providing an extra `currentSchema` query param.
mssql://username:password@host/database?currentSchema=my_schema

# You can trust peer certificate by providing an extra trustCertificate query param.
mssql://username:password@host/database?trustCertificate=true

SeaORM X has full Loco support and integrates seamlessly with many web frameworks:

  • Actix
  • Axum
  • Async GraphQL
  • jsonrpsee
  • Loco
  • Poem
  • Salvo
  • Tonic

Happy Coding!

· 8 min read
Chris Tsang

This story stems from the saying "What Color is Your Function?", a criticism of the async implementation of common programming languages. Well, Rust also falls into the category of "colored functions". So in this blog post, let's see how we can design systems to effectively combine sync and async code.

Rainbow bridge is a reference to the bridge in Thor that teleports you between different realms - a perfect analogy!

Background

Sync code can be blocking IO, or expensive computation. Async code is usually network IO where you'd wait for results.

In both cases, we want to maximize concurrency, such that the program can make full use of the CPU instead of sitting there idle. A common approach is message passing, where we package tasks and send them to different workers for execution.

Sync -> Sync

Let's start with the classic example, pure sync code. There exists std::sync::mpsc in the standard library, so let's take a look.

use std::sync::mpsc::channel;

// create an unbounded channel
let (sender, receiver) = channel();

// never blocks
sender.send("Hello".to_string()).unwrap();

let handle = std::thread::spawn(move || {
    // wait until there is a message
    let message = receiver.recv().unwrap();
    println!("{message}");
});

handle.join().unwrap();
println!("Bye");

Prints (Playground):

Hello
Bye

Now, we'll make a more elaborate example: a program that spawns a number of worker threads to perform some 'expensive' computation. The main thread would dispatch the tasks to those threads and in turn collect the results via another channel.

┌─────────────┐    tasks    ┌─────────────────┐   result
│             ╞═════════════╡ worker thread 1 ╞═════════════╗   ┌─────────────┐
│ main thread │             ├─────────────────┤             ╠═══╡ main thread │
│             ╞═════════════╡ worker thread 2 ╞═════════════╝   └─────────────┘
└─────────────┘             └─────────────────┘

First, set up the channels.

let (result, collector) = channel(); // result
let mut senders = Vec::new();
for _ in 0..THREADS {
    let (sender, receiver) = channel(); // tasks
    senders.push(sender);
    let result = result.clone();
    std::thread::spawn(move || worker(receiver, result));
}

The worker thread looks like:

fn worker(receiver: Receiver<Task>, sender: Sender<Done>) {
    while let Ok(task) = receiver.recv() {
        let result = process(task);
        sender.send(result).unwrap();
    }
}

Then, dispatch tasks.

for c in 0..TASKS {
    let task = some_random_task();
    senders[c % THREADS].send(task).unwrap();
}

Finally, we can collect results.

for _ in 0..TASKS {
    let result = collector.recv().unwrap();
    println!("{result:?}");
}

Full source code can be found here.

Async -> Async

Next, we'll migrate to async land. Using tokio::sync::mpsc, it's very similar to the above example, except every operation is async and thus imposes additional restrictions on lifetimes. (The trick is: just move / clone. Don't borrow.)

tokio's unbounded_channel is the equivalent of std's channel. Otherwise it's very similar. The spawn method takes in a Future; since the worker needs to take in the channels, we construct an async block with async move {}.

| std | tokio |
|---|---|
| (unbounded) channel | unbounded_channel |
| sync_channel | (bounded) channel |

let (result, mut collector) = unbounded_channel();
let mut senders = Vec::new();
for _ in 0..WORKERS {
    let (sender, mut receiver) = unbounded_channel();
    senders.push(sender);
    let result = result.clone();
    tokio::task::spawn(async move {
        while let Some(task) = receiver.recv().await {
            result.send(process(task).await).unwrap();
        }
    });
}
std::mem::drop(result); // <-- ?

Why do we need to drop the result sender? This is one of the footguns: tokio swallows panics originating within tasks, so if that happened, the program would never exit. By dropping the last copy of result in scope, the channel closes automatically after all tasks exit, which in turn propagates up to our collector.

The rest is almost the same.

for (i, task) in tasks.iter().enumerate() {
    senders[i % WORKERS].send(task.clone()).unwrap();
}
std::mem::drop(senders);

for _ in 0..tasks.len() {
    let result = collector.recv().await.unwrap();
    println!("{result:?}");
}

Full source code can be found here.

Flume mpmc

mpmc - multi producer, multi consumer

The previous examples have a flaw: we have to spawn multiple mpsc channels to send tasks, which is:

  1. Clumsy: we need to keep a list of senders
  2. Not the most efficient: is round-robin the best way of distributing tasks? Some of the workers may remain idle

Here is the ideal setup:

                      tasks   ┌─────────────────┐   result
┌─────────────┐ ╔═════════════╡ worker thread 1 ╞═════════════╗   ┌─────────────┐
│ main thread ╞═╣             ├─────────────────┤             ╠═══╡ main thread │
└─────────────┘ ╚═════════════╡ worker thread 2 ╞═════════════╝   └─────────────┘
                              └─────────────────┘

Let's rewrite our example using Flume. But first, know the mapping between tokio and flume:

| Tokio | Flume |
|---|---|
| unbounded_channel | unbounded (channel) |
| (bounded) channel | bounded (channel) |
| send | send |
| recv | recv_async |

In tokio, the method is exclusive: async fn recv(&mut self); in flume, the method is fn recv_async(&self) -> RecvFut. The type signature already tells you the distinction between mpsc and mpmc! It is wrong to use the blocking recv method in an async context in flume, but sadly the compiler would not warn you about it.

The channel setup is now slightly simpler:

let (sender, receiver) = unbounded(); // task
let (result, collector) = unbounded(); // result

for _ in 0..WORKERS {
    let receiver = receiver.clone();
    let result = result.clone();
    tokio::task::spawn(async move {
        while let Ok(task) = receiver.recv_async().await {
            result.send(process(task).await).unwrap();
        }
    });
}

We no longer have to dispatch tasks ourselves. All workers share the same task queue, and thus workers fetch the next task as soon as the previous one is finished, effectively load-balancing among themselves!

for task in &tasks {
    sender.send(task.clone()).unwrap();
}

for _ in 0..tasks.len() {
    let result = collector.recv_async().await.unwrap();
    println!("{result:?}");
}

Full source code can be found here.

Sync -> Async

In the final example, let's consider a program that is mostly sync, but has a few async operations that we want to handle in a background thread.

In the example below, our blocking operation is 'reading from stdin' from the main thread. And we send those lines to an async thread to handle.

┌─────────────┐           ┌──────────────┐
│ main thread ╞═══════════╡ async thread │
└─────────────┘           └──────────────┘

It follows the usual 3 steps:

  1. create a flume channel
  2. pass the receiver end to a worker thread
  3. send tasks over the channel

fn main() -> Result<()> {
    let (sender, receiver) = unbounded(); // flume channel

    std::thread::spawn(move || {
        // this runtime is single-threaded
        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
        rt.block_on(handler(receiver))
    });

    loop {
        let mut line = String::new();
        // this blocks the current thread until there is a new line
        match std::io::stdin().read_line(&mut line) {
            Ok(0) => break, // this means stdin is closed
            Ok(_) => (),
            Err(e) => panic!("{e:?}"),
        }
        sender.send(line)?;
    }

    Ok(())
}

This is the handler:

async fn handler(receiver: Receiver<String>) -> Result<()> {
    while let Ok(line) = receiver.recv_async().await {
        process(line).await?;
    }
    Ok(())
}

It doesn't look much different from the async -> async example; the only difference is that one side is sync! Full source code can be found here.

Graceful shutdown

The above code has a problem: we never know whether a line has been processed. If the program has an exit mechanism from handling SIGINT, there is a possibility of exiting before all the lines have been processed.

Let's see how we can shutdown properly.

let handle = std::thread::spawn(..);

// running is an AtomicBool
while running.load(Ordering::Acquire) {
    let line = read_line_from_stdin();
    sender.send(line)?;
}

std::mem::drop(sender);
handle.join().unwrap().unwrap();

The shutdown sequence has 3 steps (a runnable sketch follows the list):

  1. we first obtain the JoinHandle to the thread
  2. we drop all copies of sender, effectively closing the channel
  3. in the worker thread, receiver.recv_async() would result in an error, as stated in the docs

    Asynchronously receive a value from the channel, returning an error if all senders have been dropped.

  4. the worker thread finishes, joining the main thread
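
Putting the pieces together, here is a minimal runnable sketch of the whole sequence, assuming the flume and tokio setup from earlier; process is replaced by a println! and stdin by a fixed list of lines:

use flume::unbounded;

fn main() {
    let (sender, receiver) = unbounded::<String>();

    // 1. obtain the JoinHandle to the worker thread
    let handle = std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            // 3. recv_async() returns Err once all senders are dropped
            while let Ok(line) = receiver.recv_async().await {
                println!("processed: {line}");
            }
        })
    });

    for line in ["one", "two", "three"] {
        sender.send(line.to_string()).unwrap();
    }

    // 2. drop the last copy of sender, closing the channel
    std::mem::drop(sender);
    // 4. the worker thread finishes, joining the main thread
    handle.join().unwrap();
}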

Async -> Sync

The other way around is equally simple, as illustrated in SeaStreamer's example.
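
To give a flavour of it, here is a simplified sketch (not SeaStreamer's actual code): an async task produces messages, while a dedicated OS thread consumes them with flume's blocking recv.

use flume::{unbounded, Receiver};

// the sync side: a plain thread that blocks on recv
fn spawn_sync_worker(receiver: Receiver<String>) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        while let Ok(task) = receiver.recv() {
            println!("processed {task}");
        }
    })
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = unbounded();
    let handle = spawn_sync_worker(receiver);
    for i in 0..3 {
        // flume's send never blocks on an unbounded channel
        sender.send(format!("task {i}")).unwrap();
    }
    drop(sender); // close the channel so the worker exits
    handle.join().unwrap();
}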

Conclusion

|                      | sync               | async                                    |
|----------------------|--------------------|------------------------------------------|
| to spawn worker      | std::thread::spawn | tokio::task::spawn                       |
| concurrency          | multi-threaded     | can be multi-threaded or single-threaded |
| worker is            | FnOnce             | Future                                   |
| send message with    | send               | send                                     |
| receive message with | recv               | recv_async                               |
| waiting for messages | blocking           | yield to runtime                         |

In this article we discussed:

  1. Multi-threaded parallelism in sync realm
  2. Concurrency in async realm - with tokio and flume
  3. Bridging sync and async code with flume

Now you've learnt the powers of flume, but there is more!

In the next episode, hopefully we will get to discuss other interesting features of flume - bounded channels and 'rendezvous channels'. Follow our X / Twitter for updates!

Rustacean Sticker Pack 🦀

The Rustacean Sticker Pack is the perfect way to express your passion for Rust. Our stickers are made with a premium water-resistant vinyl with a unique matte finish. Stick them on your laptop, notebook, or any gadget to show off your love for Rust!

Moreover, all proceeds contribute directly to the ongoing development of SeaQL projects.

Sticker Pack Contents:

  • Logo of SeaQL projects: SeaQL, SeaORM, SeaQuery, Seaography, FireDBG
  • Mascot of SeaQL: Terres the Hermit Crab
  • Mascot of Rust: Ferris the Crab
  • The Rustacean word

Support SeaQL and get a Sticker Pack!

Rustacean Sticker Pack by SeaQL

· 6 min read
Chris Tsang

This tutorial shows you how to use Rust to build a system that:

  1. Subscribes to a real-time websocket data feed
  2. Streams the data to Kafka / Redis
  3. Saves the data into a SQL database

Here, we'll employ a micro-services architecture, and split the functionality into two apps:

┌─────────────────────┐      ─────────────────      ┌───────────────┐
│ Websocket Data Feed │ --->   Redis / Kafka   ---> │ SQL Data Sink │
└─────────────────────┘      ─────────────────      └───────────────┘

In stream processing, we often use the terms "source" / "sink", but a data sink is simply a stream consumer that persists the data into a store.

On the source side, we'd use SeaStreamer. On the sink side, we'd be using SeaORM. Below are the supported technologies; for the rest of this article, we'll be using Redis and SQLite because they're easy to set up.

| SeaStreamer | SeaORM |
|---|---|
| Kafka, Redis | MySQL, Postgres, SQLite, SQL Server¹ |

To get started, you can quickly start a Redis instance via Docker:

docker run -d --rm --name redis -p 6379:6379 redis

1. Websocket subscription

Let's write a websocket subscriber in Rust. Here we'd use the awesome async-tungstenite library.

We'd subscribe to the GBP/USD price feed from Kraken; the API documentation can be found here. NB: it's not real FX data, but it should be good enough for a demo.

Step 1, create a websocket connection:

let (mut ws, _) = async_tungstenite::tokio::connect_async("wss://ws.kraken.com/").await?;

Step 2, send a subscription request:

ws.send(Message::Text(
    r#"{ "event": "subscribe", "pair": ["GBP/USD"], "subscription": { "name": "spread" } }"#.to_owned(),
)).await?;

Step 3, stream the messages:

loop {
    match ws.next().await {
        Some(Ok(Message::Text(data))) => {
            if data == r#"{"event":"heartbeat"}"# {
                continue;
            }
            println!("{data}");
        }
        Some(Err(e)) => bail!("Socket error: {e}"),
        None => bail!("Stream ended"),
        e => bail!("Unexpected message {e:?}"),
    }
}

2. Redis / Kafka Stream Producer

Step 1, create a SeaStreamer instance connecting to Redis / Kafka:

let streamer = SeaStreamer::connect(
    "redis://localhost", SeaConnectOptions::default()
).await?;

There are a bunch of different options for Redis & Kafka respectively; you can refer to SeaStreamer's documentation.

Step 2, create a producer:

let producer: SeaProducer = streamer
    .create_producer(
        "GBP_USD".parse()?,  // Stream Key
        Default::default(),  // Producer Options
    )
    .await?;

There aren't any specific options for Producer.

Step 3, decode the messages:

let spread: SpreadMessage = serde_json::from_str(&data)?;
let message = serde_json::to_string(&spread)?;

Here, we use the awesome serde library to perform message parsing and conversion:

// The raw message looks like:
// [80478222,["1.25475","1.25489","1714946803.030088","949.74917071","223.36195920"],"spread","GBP/USD"]

#[derive(Debug, Serialize, Deserialize)]
struct SpreadMessage {
    #[allow(dead_code)]
    #[serde(skip_serializing)]
    channel_id: u32, // placeholder; not needed
    spread: Spread, // nested object
    channel_name: String,
    pair: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct Spread {
    bid: Decimal,
    ask: Decimal,
    #[serde(with = "timestamp_serde")] // custom serde
    timestamp: Timestamp,
    bid_vol: Decimal,
    ask_vol: Decimal,
}

Step 4, send the messages:

loop {
    match ws.next().await {
        Some(Ok(Message::Text(data))) => {
            let spread: SpreadMessage = serde_json::from_str(&data)?;
            let message = serde_json::to_string(&spread)?;
            producer.send(message)?; // <--
        }
        // .. other arms handled as in step 3 of the websocket section
    }
}

Note that the producer.send call is not async/await, and this is a crucial detail! This removes the stream processing bottleneck. Behind the scenes, messages are buffered and handled on a different thread, so that your input stream can run as close to real-time as possible.

Here is the complete price-feed app which you can checkout from the SeaStreamer repository:

$ cd examples/price-feed
$ cargo run

Connecting ..
Connected.
Subscribed.
{"spread":{"bid":"1.25495","ask":"1.25513","timestamp":"2024-05-05T16:31:00.961214","bid_vol":"61.50588918","ask_vol":"787.90883861"},"channel_name":"spread","pair":"GBP/USD"}
..

3. SQL Data Sink

Step 1, create a stream consumer:

let streamer = SeaStreamer::connect(streamer_uri, Default::default()).await?;

let consumer = streamer
    .create_consumer(&[stream_key], SeaConsumerOptions::default())
    .await?;

There are a bunch of different options for Redis & Kafka respectively; you can refer to SeaStreamer's examples. Here we use the default, which is a real-time, stateless stream consumer.

Step 2, create a database:

let mut opt = ConnectOptions::new("sqlite://my_db.sqlite?mode=rwc");
opt.max_connections(1).sqlx_logging(false);
let db = Database::connect(opt).await?;

We set max_connections to 1, because our data sink will not do concurrent inserts anyway.

Here is the Entity:

#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Deserialize)]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    #[serde(default)]
    pub id: i32,
    pub timestamp: String,
    pub bid: String,
    pub ask: String,
    pub bid_vol: String,
    pub ask_vol: String,
}

The table shall be named event and we derive Deserialize on the Model.

We will use the following helper method to create the database table, where the schema is derived from the Entity:

async fn create_tables(db: &DbConn) -> Result<(), DbErr> {
    let builder = db.get_database_backend();
    let schema = Schema::new(builder);

    let stmt = builder.build(
        schema.create_table_from_entity(Entity).if_not_exists(),
    );
    log::info!("{stmt}");
    db.execute(stmt).await?;

    Ok(())
}

This is especially handy for SQLite, where the app owns the database schema. For other databases, you'd probably use the SeaORM migration system.

Step 3, insert the data into database:

loop {
    let message = consumer.next().await?;
    let payload = message.message();
    let json = payload.as_str()?;
    let item: Item = serde_json::from_str(json)?;
    let mut spread = item.spread.into_active_model();
    spread.id = NotSet; // let the db assign primary key
    spread.save(&db).await?;
}

In a few lines of code, we:

  1. receive the message from Redis
  2. decode the message as JSON
  3. convert the message into a SeaORM Model
  4. insert the Model into database

Run the sea-orm-sink app in another terminal:

$ cd examples/sea-orm-sink
$ RUST_LOG=info cargo run

[INFO sea_streamer_sea_orm_sink] CREATE TABLE IF NOT EXISTS "event" ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "timestamp" varchar NOT NULL, "bid" varchar NOT NULL, "ask" varchar NOT NULL, "bid_vol" varchar NOT NULL, "ask_vol" varchar NOT NULL )
[INFO sea_streamer_sea_orm_sink] {"spread":{"bid":"1.25495","ask":"1.25513","timestamp":"2024-05-05T16:31:00.961214","bid_vol":"61.50588918","ask_vol":"787.90883861"},"channel_name":"spread","pair":"GBP/USD"}

That's it! Now you can inspect the data with your favourite database GUI and write some SQL queries:

(screenshot of the SQLite database)

Conclusion

In this article, we covered:

  1. Micro-services architecture in stream processing
  2. Async real-time programming in Rust
  3. The awesomeness of the SeaQL and Rust ecosystem²

Here are a few suggestions on how you can take it from here:

  1. Stream the data to a "big database" like MySQL or Postgres
  2. Subscribe to more streams and sink to more tables
  3. Buffer the events and insert the data in batches to achieve higher throughput, as sketched below
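
For the batching suggestion, here is a sketch built on the sink loop above, using SeaORM's insert_many; the batch size of 100 is arbitrary, and a production version would also flush on a timer:

let mut buffer = Vec::new();
loop {
    let message = consumer.next().await?;
    let item: Item = serde_json::from_str(message.message().as_str()?)?;
    let mut spread = item.spread.into_active_model();
    spread.id = NotSet; // let the db assign primary key
    buffer.push(spread);
    if buffer.len() >= 100 {
        // a single multi-row INSERT instead of 100 round trips
        Entity::insert_many(std::mem::take(&mut buffer))
            .exec(&db)
            .await?;
    }
}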


· 10 min read
SeaQL Team
SeaORM 1.0-rc Banner

This blog post summarizes the new features and enhancements introduced in SeaORM 1.0-rc.x:

New Features

Refreshed migration schema definition

#2099 We are aware that SeaORM's migration scripts can sometimes look verbose. Thanks to the clever design made by Loco, we've refreshed the schema definition syntax.

An old migration script looks like this:

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Users::Table)
                    .if_not_exists()
                    .col(
                        ColumnDef::new(Users::Id)
                            .integer()
                            .not_null()
                            .auto_increment()
                            .primary_key(),
                    )
                    .col(ColumnDef::new(Users::Pid).uuid().not_null())
                    .col(ColumnDef::new(Users::Email).string().not_null().unique_key())
                    // ...
    }
}

Now, using the new schema helpers, you can define the schema with a simplified syntax!

// Remember to import `sea_orm_migration::schema::*`
use sea_orm_migration::{prelude::*, schema::*};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .create_table(
                Table::create()
                    .table(Users::Table)
                    .if_not_exists()
                    .col(pk_auto(Users::Id)) // Primary key with auto-increment
                    .col(uuid(Users::Pid)) // UUID column
                    .col(string_uniq(Users::Email)) // String column with unique and not null constraint
                    .col(string(Users::Password)) // String column
                    .col(string(Users::ApiKey).unique_key())
                    .col(string(Users::Name))
                    .col(string_null(Users::ResetToken)) // Nullable string column
                    .col(timestamp_null(Users::ResetSentAt)) // Nullable timestamp column
                    .col(string_null(Users::EmailVerificationToken))
                    .col(timestamp_null(Users::EmailVerificationSentAt))
                    .col(timestamp_null(Users::EmailVerifiedAt))
                    .to_owned(),
            )
            .await
    }

    // ...
}

There are three variants for each commonly used column type:

  • <COLUMN_TYPE>() helper function, e.g. string(), defines a non-null string column
  • <COLUMN_TYPE>_null() helper function, e.g. string_null(), defines a nullable string column
  • <COLUMN_TYPE>_uniq() helper function, e.g. string_uniq(), defines a non-null and unique string column

The new schema helpers can be used by importing sea_orm_migration::schema::*. The migration library is fully backward compatible, so there is no rush to migrate old scripts. The new syntax is recommended for new scripts, and all examples in the SeaORM repository have been updated for demonstration. For advanced use cases, the old SeaQuery syntax can still be used.

Reworked SQLite Type Mappings

sea-orm#2077 sea-query#735 sea-schema#117 We've reworked the type mappings for SQLite across the SeaQL ecosystem, such that SeaQuery and SeaSchema are now reciprocal to each other. Migrations written with SeaQuery can be rediscovered by sea-orm-cli and generate compatible entities! In other words, the roundtrip is complete.

Data types will be mapped to SQLite types with a custom naming scheme following SQLite's affinity rule:

  • INTEGER: integer, tiny_integer, small_integer, big_integer and boolean are stored as integer
  • REAL: float, double, decimal and money are stored as real
  • BLOB: blob and varbinary_blob are stored as blob
  • TEXT: all other data types are stored as text, including string, char, text, json, uuid, date, time, datetime, timestamp, etc.

To illustrate,

assert_eq!(
    Table::create()
        .table(Alias::new("strange"))
        .col(ColumnDef::new(Alias::new("id")).integer().not_null().auto_increment().primary_key())
        .col(ColumnDef::new(Alias::new("int1")).integer())
        .col(ColumnDef::new(Alias::new("int2")).tiny_integer())
        .col(ColumnDef::new(Alias::new("int3")).small_integer())
        .col(ColumnDef::new(Alias::new("int4")).big_integer())
        .col(ColumnDef::new(Alias::new("string1")).string())
        .col(ColumnDef::new(Alias::new("string2")).string_len(24))
        .col(ColumnDef::new(Alias::new("char1")).char())
        .col(ColumnDef::new(Alias::new("char2")).char_len(24))
        .col(ColumnDef::new(Alias::new("text_col")).text())
        .col(ColumnDef::new(Alias::new("json_col")).json())
        .col(ColumnDef::new(Alias::new("uuid_col")).uuid())
        .col(ColumnDef::new(Alias::new("decimal1")).decimal())
        .col(ColumnDef::new(Alias::new("decimal2")).decimal_len(12, 4))
        .col(ColumnDef::new(Alias::new("money1")).money())
        .col(ColumnDef::new(Alias::new("money2")).money_len(12, 4))
        .col(ColumnDef::new(Alias::new("float_col")).float())
        .col(ColumnDef::new(Alias::new("double_col")).double())
        .col(ColumnDef::new(Alias::new("date_col")).date())
        .col(ColumnDef::new(Alias::new("time_col")).time())
        .col(ColumnDef::new(Alias::new("datetime_col")).date_time())
        .col(ColumnDef::new(Alias::new("boolean_col")).boolean())
        .col(ColumnDef::new(Alias::new("binary2")).binary_len(1024))
        .col(ColumnDef::new(Alias::new("binary3")).var_binary(1024))
        .to_string(SqliteQueryBuilder),
    [
        r#"CREATE TABLE "strange" ("#,
        r#""id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,"#,
        r#""int1" integer,"#,
        r#""int2" tinyint,"#,
        r#""int3" smallint,"#,
        r#""int4" bigint,"#,
        r#""string1" varchar,"#,
        r#""string2" varchar(24),"#,
        r#""char1" char,"#,
        r#""char2" char(24),"#,
        r#""text_col" text,"#,
        r#""json_col" json_text,"#,
        r#""uuid_col" uuid_text,"#,
        r#""decimal1" real,"#,
        r#""decimal2" real(12, 4),"#,
        r#""money1" real_money,"#,
        r#""money2" real_money(12, 4),"#,
        r#""float_col" float,"#,
        r#""double_col" double,"#,
        r#""date_col" date_text,"#,
        r#""time_col" time_text,"#,
        r#""datetime_col" datetime_text,"#,
        r#""boolean_col" boolean,"#,
        r#""binary2" blob(1024),"#,
        r#""binary3" varbinary_blob(1024)"#,
        r#")"#,
    ]
    .join(" ")
);

The full type mapping table is documented here:

| ColumnType | MySQL data type | PostgreSQL data type | SQLite data type |
|---|---|---|---|
| Char | char | char | char |
| String | varchar | varchar | varchar |
| Text | text | text | text |
| TinyInteger | tinyint | smallint | tinyint |
| SmallInteger | smallint | smallint | smallint |
| Integer | int | integer | integer |
| BigInteger | bigint | bigint | integer |
| TinyUnsigned | tinyint unsigned | smallint | tinyint |
| SmallUnsigned | smallint unsigned | smallint | smallint |
| Unsigned | int unsigned | integer | integer |
| BigUnsigned | bigint unsigned | bigint | integer |
| Float | float | real | float |
| Double | double | double precision | double |
| Decimal | decimal | decimal | real |
| DateTime | datetime | timestamp without time zone | datetime_text |
| Timestamp | timestamp | timestamp | timestamp_text |
| TimestampWithTimeZone | timestamp | timestamp with time zone | timestamp_with_timezone_text |
| Time | time | time | time_text |
| Date | date | date | date_text |
| Year | year | N/A | N/A |
| Interval | N/A | interval | N/A |
| Binary | binary | bytea | blob |
| VarBinary | varbinary | bytea | varbinary_blob |
| Bit | bit | bit | N/A |
| VarBit | bit | varbit | N/A |
| Boolean | bool | bool | boolean |
| Money | decimal | money | real_money |
| Json | json | json | json_text |
| JsonBinary | json | jsonb | jsonb_text |
| Uuid | binary(16) | uuid | uuid_text |
| Enum | ENUM(...) | ENUM_NAME | enum_text |
| Array | N/A | DATA_TYPE[] | N/A |
| Cidr | N/A | cidr | N/A |
| Inet | N/A | inet | N/A |
| MacAddr | N/A | macaddr | N/A |
| LTree | N/A | ltree | N/A |

Enhancements

  • #2137 DerivePartialModel macro attribute entity now supports syn::Type
#[derive(DerivePartialModel)]
#[sea_orm(entity = "<entity::Model as ModelTrait>::Entity")]
struct EntityNameNotAIdent {
    #[sea_orm(from_col = "foo2")]
    _foo: i32,
    #[sea_orm(from_col = "bar2")]
    _bar: String,
}
  • #2146 Added RelationDef::from_alias()
assert_eq!(
    cake::Entity::find()
        .join_as(
            JoinType::LeftJoin,
            cake_filling::Relation::Cake.def().rev(),
            cf.clone()
        )
        .join(
            JoinType::LeftJoin,
            cake_filling::Relation::Filling.def().from_alias(cf)
        )
        .build(DbBackend::MySql)
        .to_string(),
    [
        "SELECT `cake`.`id`, `cake`.`name` FROM `cake`",
        "LEFT JOIN `cake_filling` AS `cf` ON `cake`.`id` = `cf`.`cake_id`",
        "LEFT JOIN `filling` ON `cf`.`filling_id` = `filling`.`id`",
    ]
    .join(" ")
);
  • #1665 [sea-orm-macro] Qualify traits in DeriveActiveModel macro
  • #2064 [sea-orm-cli] Fix migrate generate on empty mod.rs files

Breaking Changes

  • #2145 Renamed ConnectOptions::pool_options() to ConnectOptions::sqlx_pool_options()
  • #2145 Made sqlx_common private, hiding sqlx_error_to_xxx_err
  • MySQL money type maps to decimal
  • MySQL blob types moved to extension::mysql::MySqlType; ColumnDef::blob() now takes no parameters
assert_eq!(
    Table::create()
        .table(BinaryType::Table)
        .col(ColumnDef::new(BinaryType::BinaryLen).binary_len(32))
        .col(ColumnDef::new(BinaryType::Binary).binary())
        .col(ColumnDef::new(BinaryType::Blob).custom(MySqlType::Blob))
        .col(ColumnDef::new(BinaryType::TinyBlob).custom(MySqlType::TinyBlob))
        .col(ColumnDef::new(BinaryType::MediumBlob).custom(MySqlType::MediumBlob))
        .col(ColumnDef::new(BinaryType::LongBlob).custom(MySqlType::LongBlob))
        .to_string(MysqlQueryBuilder),
    [
        "CREATE TABLE `binary_type` (",
        "`binlen` binary(32),",
        "`bin` binary(1),",
        "`b` blob,",
        "`tb` tinyblob,",
        "`mb` mediumblob,",
        "`lb` longblob",
        ")",
    ]
    .join(" ")
);
  • ColumnDef::binary() sets column type as binary with default length of 1
  • Removed BlobSize enum
  • Added StringLen to represent length of varchar / varbinary
/// Length for var-char/binary; default to 255
pub enum StringLen {
    /// String size
    N(u32),
    Max,
    #[default]
    None,
}
  • ValueType::columntype() of Vec<u8> maps to VarBinary(StringLen::None)
  • ValueType::columntype() of String maps to String(StringLen::None)
  • ColumnType::Bit maps to bit for Postgres
  • ColumnType::Binary and ColumnType::VarBinary map to bytea for Postgres
  • Value::Decimal and Value::BigDecimal map to real for SQLite
  • ColumnType::Year(Option<MySqlYear>) changed to ColumnType::Year

Upgrades

  • Upgrade sea-query to 0.31.0-rc.3
  • Upgrade sea-schema to 0.15.0-rc.4
  • Upgrade sea-query-binder to 0.6.0-rc.1
  • #2088 Upgrade strum to 0.26

House Keeping

  • #2140 Improved Actix example to return 404 not found on unexpected inputs
  • #2154 Deprecated Actix v3 example
  • #2136 Re-enabled rocket_okapi example

Release Planning

In the previous release of SeaORM, we stated that we want our next release to be 1.0. We are indeed very close to 1.0 now!

While 0.12 will still be maintained before 1.0 gets finalized, you are welcome to try out 1.0-rc.x today! There will still be a few minor but technically breaking changes:

  1. #2185 Adding trait const ARITY to PrimaryKeyTrait, allowing users to write better generic code
  2. #2186 Associating ActiveModel to EntityTrait, allowing users to extend the behaviour of Entities

Now is also the perfect time for you to propose breaking changes that'd have a long-term impact on SeaORM. After the stabilization, we hope that SeaORM can offer a stable API surface that developers can use in production for the years to come.

We won't have more than two major releases in a year, and each major release will be maintained for at least one year. It's still tentative, but that's what we have in mind for now. Moreover, it will actually allow us to ship new features more frequently!

SQL Server Support

We've been planning SQL Server for SeaORM for a while, but it was put aside in 2023 (which I regretted). Anyway SQL Server support is coming soon! It will first be offered as a closed beta to our partners. If you are interested, please join our waiting list.

If you feel generous, a small donation will be greatly appreciated, and goes a long way towards sustaining the organization.

A big shout out to our sponsors 😇:

Gold Sponsors

GitHub Sponsors

Afonso Barracha
Shane Sveller
Dean Sheather
Marcus Buffett
René Klačan
Apinan I.
Kentaro Tanaka
Natsuki Ikeguchi
Marlon Mueller-Soppart
ul
MasakiMiyazaki
Manfred Lee
KallyDev
Daniel Gallups
Caido
Coolpany SE


· 7 min read
SeaQL Team
SeaORM 0.12 Banner

It has been a while since the initial SeaORM 0.12 release. This blog post summarizes the new features and enhancements introduced in SeaORM 0.12.2 through 0.12.12!

Celebrating 2M downloads on crates.io 📦

We've just reached the milestone of 2,000,000 all time downloads on crates.io. It's a testament to SeaORM's adoption in professional use. Thank you to all our users for your trust and for being a part of our community.

New Features

Entity format update

  • #1898 Add support for root JSON arrays (requires the json-array / postgres-array feature)! It involved an intricate type system refactor to work around the orphan rule.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "json_struct_vec")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    #[sea_orm(column_type = "Json")]
    pub struct_vec: Vec<JsonColumn>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, FromJsonQueryResult)]
pub struct JsonColumn {
    pub value: String,
}
  • #2009 Added comment attribute for Entity; create_table_from_entity now supports comment on MySQL
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "applog", comment = "app logs")]
pub struct Model {
    #[sea_orm(primary_key, comment = "ID")]
    pub id: i32,
    #[sea_orm(comment = "action")]
    pub action: String,
    pub json: Json,
    pub created_at: DateTimeWithTimeZone,
}

Cursor paginator improvements

  • #2037 Added descending order to Cursor:
// (default behaviour) Before 5 ASC, i.e. id < 5

let mut cursor = Entity::find().cursor_by(Column::Id);
cursor.before(5);

assert_eq!(
    cursor.first(4).all(db).await?,
    [
        Model { id: 1 },
        Model { id: 2 },
        Model { id: 3 },
        Model { id: 4 },
    ]
);

// (new API) After 5 DESC, i.e. id < 5

let mut cursor = Entity::find().cursor_by(Column::Id);
cursor.after(5).desc();

assert_eq!(
    cursor.first(4).all(db).await?,
    [
        Model { id: 4 },
        Model { id: 3 },
        Model { id: 2 },
        Model { id: 1 },
    ]
);
  • #1826 Added cursor support to SelectTwo:
// Join with linked relation; cursor by first table's id

cake::Entity::find()
    .find_also_linked(entity_linked::CakeToFillingVendor)
    .cursor_by(cake::Column::Id)
    .before(10)
    .first(2)
    .all(&db)
    .await?

// Join with relation; cursor by the 2nd table's id

cake::Entity::find()
    .find_also_related(Fruit)
    .cursor_by_other(fruit::Column::Id)
    .before(10)
    .first(2)
    .all(&db)
    .await?

Added "proxy" to database backendโ€‹

#1881, #2000 Added "proxy" to database backend (requires feature flag proxy).

It enables the possibility of using SeaORM on edge / client-side! See the GlueSQL demo for an example.

Enhancements

  • #1954 [sea-orm-macro] Added #[sea_orm(skip)] to FromQueryResult derive macro
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, FromQueryResult)]
pub struct PublicUser {
    pub id: i64,
    pub name: String,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    #[sea_orm(skip)]
    pub something: Something,
}
  • #1598 [sea-orm-macro] Added support for Postgres arrays in FromQueryResult impl of JsonValue
// existing API:

assert_eq!(
    Entity::find_by_id(1).one(db).await?,
    Some(Model {
        id: 1,
        name: "Collection 1".into(),
        integers: vec![1, 2, 3],
        teas: vec![Tea::BreakfastTea],
        colors: vec![Color::Black],
    })
);

// new API:

assert_eq!(
    Entity::find_by_id(1).into_json().one(db).await?,
    Some(json!({
        "id": 1,
        "name": "Collection 1",
        "integers": [1, 2, 3],
        "teas": ["BreakfastTea"],
        "colors": [0],
    }))
);
  • #1828 [sea-orm-migration] Check if an index exists
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // ...

        // Make sure the index hasn't been created yet
        assert!(!manager.has_index("cake", "cake_name_index").await?);

        manager
            .create_index(
                Index::create()
                    .name("cake_name_index")
                    .table(Cake::Table)
                    .col(Cake::Name)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // ...
    }
}
  • #2030 Improve query performance of Paginator's COUNT query
  • #2055 Added SQLx slow statements logging to ConnectOptions
  • #1867 Added QuerySelect::lock_with_behavior
  • #2002 Cast enums in is_in and is_not_in
  • #1999 Add source annotations to errors
  • #1960 Implement StatementBuilder for sea_query::WithQuery
  • #1979 Added method expr_as_ that accepts self
  • #1868 Loader: use ValueTuple as hash key
  • #1934 [sea-orm-cli] Added --enum-extra-derives
  • #1952 [sea-orm-cli] Added --enum-extra-attributes
  • #1693 [sea-orm-cli] Support generation of related entity with composite foreign key

Bug fixes

  • #1855, #2054 [sea-orm-macro] Qualify types in DeriveValueType macro
  • #1953 [sea-orm-cli] Fix duplicated active enum use statements on generated entities
  • #1821 [sea-orm-cli] Fix entity generation for non-alphanumeric enum variants
  • #2071 [sea-orm-cli] Fix entity generation for relations with composite keys
  • #1800 Fixed find_with_related consolidation logic
  • 5a6acd67 Fixed Loader panic on empty inputs

Upgrades

  • #1984 Upgraded axum example to 0.7
  • #1858 Upgraded chrono to 0.4.30
  • #1959 Upgraded rocket to 0.5.0
  • Upgraded sea-query to 0.30.5
  • Upgraded sea-schema to 0.14.2
  • Upgraded salvo to 0.50

House Keeping

  • #2057 Fix clippy warnings on 1.75
  • #1811 Added test cases for find_xxx_related/linked

Release planning

In the announcement blog post of SeaORM 0.12, we stated we want to reduce the frequency of breaking releases while maintaining the pace for feature updates and enhancements. I am glad to say we've accomplished that!

There are still a few breaking changes planned for the next major release. After some discussions and consideration, we decided that the next major release will be a release candidate for 1.0!

If you feel generous, a small donation will be greatly appreciated, and goes a long way towards sustaining the organization.

A big shout out to our sponsors 😇:

Gold Sponsorsโ€‹

GitHub Sponsorsโ€‹

Émile Fugulin
Afonso Barracha
Shane Sveller
Dean Sheather
Marcus Buffett
Renรฉ Klaฤan
IceApinan
Jacob Trueb
Kentaro Tanaka
Natsuki Ikeguchi
Marlon Mueller-Soppart
ul
Manfred Lee
KallyDev
Daniel Gallups
Coolpany-SE

Rustacean Sticker Pack 🦀

The Rustacean Sticker Pack is the perfect way to express your passion for Rust. Our stickers are made with a premium water-resistant vinyl with a unique matte finish. Stick them on your laptop, notebook, or any gadget to show off your love for Rust!

Moreover, all proceeds contribute directly to the ongoing development of SeaQL projects.

Sticker Pack Contents:

  • Logo of SeaQL projects: SeaQL, SeaORM, SeaQuery, Seaography, FireDBG
  • Mascot of SeaQL: Terres the Hermit Crab
  • Mascot of Rust: Ferris the Crab
  • The Rustacean word

Support SeaQL and get a Sticker Pack!

Rustacean Sticker Pack by SeaQL

· 10 min read
Billy Chan

524 members of the SeaQL community from 41 countries kindly contributed their thoughts on using SeaQL libraries, learning Rust and employing Rust in their day to day development lives. From these responses we hope to get an understanding of where the SeaQL and Rust community stands in 2023.

This is our first community survey, we will conduct the survey annually to keep track of how the community evolves over time.

Demographics

Q. Where are you located?

Participants are from 41 countries across the world!

Other: Argentina, Australia, Austria, Belarus, Belgium, Cyprus, Czechia, Denmark, Hungary, Iran, Ireland, Italy, Japan, Kazakhstan, Korea, Mongolia, Nigeria, Norway, Peru, Poland, Slovakia, South Africa, Spain, Sweden, Taiwan, Thailand, Turkey, Ukraine

Use of SeaQL Libraries

Q. Are you using SeaQL libraries in building a project?

Q. Which SeaQL libraries are you using in building a project?

Other: Seaography; SeaStreamer

Q. Are you using SeaQL libraries in a personal, academic or professional context?

Q. Why did you choose SeaQL libraries?

Other: Async support, future proof and good documentation; Good query performance; It was recommended on websites and YouTube; Does not use SQL for migrations; Beginner-friendly and easy to get started; Easy to translate from Eloquent ORM knowledge; Can drop in to SeaQuery if necessary; I started with SQLx, then tried SeaQuery; I found good examples on YouTube

Q. What qualities of SeaQL libraries do you think are important?

Other: Simple syntax; Being able to easily express what you would otherwise be able to write in pure SQL; Migration and entity generation; Clarity of the implementation and usage patterns; Efficient query building, especially with relations and joins; Ergonomic API

Team & Project Natureโ€‹

Q. How many team members (including you) are working on the project?โ€‹

Q. Can you categorize the nature of the project?โ€‹

Other: ForecastingFinancial tradingEnterprise Resource Planning (ERP)FintechCloud infrstructure automationBackend for desktop, mobile and web application

Tech Stack

Q. What is your development environment?

Linux Breakdown

Windows Breakdown

macOS Breakdown

Q. Which database(s) do you use?

Q. Which web framework are you using?

Q. What is the deployment environment?

Rust at Work

Q. Are you using Rust at work?

Q. Which industry is your company in?

Vague description of the company

A banking company; A business to business lending platform; A cloud storage company; A consulting company; A cybersecurity management platform; An IT solution company; An E-Commerce clothing store; A children's entertainment company; A factory construction management platform; A fintech startup; A geology technology company; A publicly traded health-tech company; A private restaurant chain; An industrial IoT company for heating and water distribution; An internet provider; A nonprofit tech research organization; A payment service provider; A road intelligence company; A SaaS startup; A server hosting provider; A DevOps platform that helps our users scale their Kubernetes applications; An automotive company

Q. What is the size of your company?

Q. How many engineers in your company are dedicated to writing Rust?

Q. Which layer(s) of the technology stack are using Rust?

Learning Rust

Q. Are you learning / new to Rust?

Q. Which language(s) are you most familiar with?

Q. Are you familiar with SQL?

Q. Do you find Rust easy or hard to learn?

Q. What motivates you to learn Rust?

Other: Ability to develop fast, secure and standalone API-driven tools; Efficiency, safety, low resource usage; Good design decisions from the start; Reliability and ease of development; School makes me learn it; Rust is too cool; The ecosystem of libraries + general competence of lib authors; It is the most loved language; The guarantees Rust provides; Learning something new; Type safety and speed; Want to get away from NULL; No boilerplate, if you do not want it; Performance

Q. What learning resources do you rely on?

Other: YouTube; Online courses; ChatGPT

Q. What is your first project built using Rust?

Other: Chatbot; Scraper; Rasterization of the Mandelbrot set; IoT; Library

What's Next

Q. Which aspects of SeaORM do you want to see advancement on?

Thank you for all the suggestions, we will certainly take them into account!

Other: Full MySQL coverage; MS SQL Server support; Structured queries for complex joins; A stable release; Data seeding; Migrations based on Entity diffs; Type safety; Support tables without primary key; Turso integration; Fetching nested structures; Views

Q. What tools would you be interested in using, if developed first-party by SeaQL?

Other: An API integration testing utility; An oso-based authorization integration; A visual tool for managing migrations; Database layout editor (like dbdiagram.io)

Share Your Thoughts

Q. Anything else you want to say?

Didn't expect this section to turn into a testimonial, thank you for all the kind words :)

Good job yall

Great projects, thanks for your hard work

I expect it to be an asynchronous type-safe library. Keep up the good work!

I'd like to see entity generation without a database

The website, support from JetBrains, the documentation and the release cycle are very nice!

I'm very interested in how SeaORM will continue evolving and I would like to wish you the best of luck!

I've found SeaORM very useful and I'm very grateful to the development team for creating and maintaining it!

In TypeORM I can write entities and then generate migration from them. It's very handy. It helps to increase development speed. It would be nice to have this functionality in SeaORM.

It needs to have better integration with SeaQuery, I sometimes need to get to it because not all features are available in SeaORM which makes it a pain.

Keep the good work!

Keep going! Love SeaORM!

Keep up the great work. Rust needs a fast, ergonomic and reliable ORM.

SeaORM is very powerful, but the rust docs and tutorial examples could be more fleshed out.

SeaORM is an awesome library. Most things are quite concise and therefore straightforward. Simply a few edge cases concerning DB specific types and values could be better.

The trait system is too complex and coding experience is not pretty well with that.

Automatic migration generation would make the library pretty much perfect in my opinion.

SeaQL tutorials could be better. Much more detailed explanation and definitely it has to have best practices section for Design Patterns like and good best practices related with clean architecture.

SeaQL are great products and it's very enjoyable using them

Thank you <3

Thank you for awesome library!

Thank you for this wonderful project. I feel the documentation lacks examples for small functions and usage of some obscure features.

Thank you for your hard work!

Thank you for your work on SeaQL, your efforts are appreciated

Thank you for your work, we are seeking actively to include SeaORM in our projects

Thank you very much for your work!

Thanks a lot for the amazing work you guys put into this set of libraries. This is an amazing development for the rust ecosystem.

Thanks and keep up the good work.

Thanks for a great tool!

Thanks for all the amazing work.

Thanks for making SeaORM!

The project I am doing for work is only a prototype, it's a new implementation of a current Python forecasting project which uses a pandas and a custom psycopg2 orm. My intent is to create a faster/dev friendly version with SeaORM and Polars. I am hoping to eventually get a prototype I can display to my team to get a go ahead to fully develop a new version, and to migrate 4-5 other forecasting apps using shared libraries for io and calculations.

I have also been using SeaORM for a small API client for financial data, which I may make open source.

I think one thing which could really improve SeaORM is some more advance examples in the documentation section. The docs are really detailed as far as rust documentation goes.

Very promising project, keep it up.

Thank you so much for taking it upon yourselves to selflessly give your free time. It probably doesn't matter much, but thank you so much for your work. SeaORM is a fantastic tool that I can see myself using for a long time to come. I hope to make contributions in any form when I am under better circumstances :3 Kudos to the team!

Your library is excellent; at least I think it is much better than Diesel. It is easy to get started with and very friendly to newcomers, which is its biggest highlight. Also, the library seems able to express very complex join SQL logic without writing raw SQL, which deserves much praise, but the documentation in this area seems a bit brief. I hope you can enrich it and explain complex queries in more detail; that would be perfect. Thank you, I will keep following you, and if a future project of mine involves an ORM, it will definitely be yours!


· One min read
SeaQL Team

It is our honour to have been awarded by OpenUK for the 2023 Award in the Software category! The award ceremony was a very memorable experience. A huge thanks to Red Badger who sponsored the software award.

In 2023, we released SeaStreamer, two major versions of SeaORM, a new version of Seaography, and have been busy working on a new project on the side.

We reached the milestone of 5k GitHub stars and 2M crates.io downloads mid-year.

In the summer, we took in two interns for our 3rd summer of code.

We plan to offer internships tailored to UK students in 2024 through university internship programs. As always, we welcome contributors from all over the world, and maybe we will enrol in GSoC 2024 again (but open-source is not bound to any schedule, so you can start contributing anytime).

A big thanks to our sponsors who continued to support us, and we look forward to a more impactful 2024.

· 4 min read
Chris Tsang

If you are writing an async application in Rust, at some point you'd want to separate the code into several crates. There are some benefits:

  1. Better encapsulation. Having a crate boundary between sub-systems can lead to cleaner code and a more well-defined API. No more pub(crate)!
  2. Faster compilation. By breaking down a big crate into several independent small crates, they can be compiled concurrently.

But the question is, if you are using only one async runtime anyway, what are the benefits of writing async-runtime-generic libraries?

  1. Portability. You can easily switch to a different async runtime, or wasm.
  2. Correctness. Testing a library against both tokio and async-std can uncover more bugs, including concurrency bugs (due to fuzzy task execution orders) and "undefined behaviour" arising either from misunderstanding or from async-runtime implementation details.

So now you've decided to write async-runtime-generic libraries! Here I want to share 3 strategies along with examples found in the Rust ecosystem.

Approach 1: Defining your own AsyncRuntime trait

Using the futures crate you can write very generic library code, but there is one missing piece: time - to sleep or timeout, you have to rely on an async runtime. If that's all you need, you can define your own AsyncRuntime trait and require downstream to implement it. This is the approach used by rdkafka:

pub trait AsyncRuntime: Send + Sync + 'static {
    type Delay: Future<Output = ()> + Send;

    /// It basically means the return value must be a `Future`
    fn sleep(duration: Duration) -> Self::Delay;
}

Here is how it's implemented:

impl AsyncRuntime for TokioRuntime {
    type Delay = tokio::time::Sleep;

    fn sleep(duration: Duration) -> Self::Delay {
        tokio::time::sleep(duration)
    }
}

Library code to use the above:

async fn operation<R: AsyncRuntime>() {
    R::sleep(Duration::from_millis(1)).await;
}

Approach 2: Abstract the async runtimes internally and expose feature flags

This is the approach used by redis-rs.

To work with network connections or file handles, you can use the AsyncRead / AsyncWrite traits:

#[async_trait]
pub(crate) trait AsyncRuntime: Send + Sync + 'static {
    type Connection: AsyncRead + AsyncWrite + Send + Sync + 'static;

    async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection>;
}

Then you'll define a module for each async runtime:

#[cfg(feature = "runtime-async-std")]
mod async_std_impl;
#[cfg(feature = "runtime-async-std")]
use async_std_impl::*;

#[cfg(feature = "runtime-tokio")]
mod tokio_impl;
#[cfg(feature = "runtime-tokio")]
use tokio_impl::*;

Where each module would look like:

tokio_impl.rs
#[async_trait]
impl AsyncRuntime for TokioRuntime {
    type Connection = tokio::net::TcpStream;

    async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection> {
        tokio::net::TcpStream::connect(addr).await
    }
}
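The async-std counterpart would look almost identical; a sketch, assuming async_std::net::TcpStream (whose connect has a compatible signature):

async_std_impl.rs
#[async_trait]
impl AsyncRuntime for AsyncStdRuntime {
    type Connection = async_std::net::TcpStream;

    async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection> {
        async_std::net::TcpStream::connect(addr).await
    }
}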

Library code to use the above:

async fn operation<R: AsyncRuntime>(conn: R::Connection) {
    conn.write(b"some bytes").await;
}

Approach 3: Maintain an async runtime abstraction crate

This is the approach used by SQLx and SeaStreamer.

Basically, aggregate all the async runtime APIs you'd use and write a wrapper library. This may be tedious, but it has the benefit of specifying all interactions with the async runtime in one place for your project, which can be handy for debugging or tracing.

For example, async Task handling:

common-async-runtime/tokio_task.rs
pub use tokio::task::JoinHandle as TaskHandle;

pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    tokio::task::spawn(future)
}

async-std's task API is slightly different (in tokio the output is Result<T, JoinError>), which requires some boilerplate:

common-async-runtime/async_std_task.rs
use futures::FutureExt; // for `poll_unpin` below

/// A shim to match tokio's API
pub struct TaskHandle<T>(async_std::task::JoinHandle<T>);

pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    TaskHandle(async_std::task::spawn(future))
}

#[derive(Debug)]
pub struct JoinError;

impl std::fmt::Display for JoinError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("JoinError")
    }
}

impl std::error::Error for JoinError {}

// This is basically how you wrap a `Future`
impl<T> Future for TaskHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        match self.0.poll_unpin(cx) {
            std::task::Poll::Ready(res) => std::task::Poll::Ready(Ok(res)),
            std::task::Poll::Pending => std::task::Poll::Pending,
        }
    }
}

In the library's Cargo.toml, you can simply include common-async-runtime as a dependency. This makes your library code 'pure', because selecting an async runtime is now controlled by downstream. Similar to approach 1, this crate can be compiled without any async runtime, which is neat!
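Downstream library code then only touches the facade. A sketch, reusing the spawn_task / TaskHandle names defined above:

use common_async_runtime::{spawn_task, TaskHandle};

async fn compute() -> u32 {
    42
}

async fn run() {
    // identical code under tokio or async-std; the runtime is chosen by feature flags
    let handle: TaskHandle<u32> = spawn_task(compute());
    let result = handle.await; // `Result<u32, JoinError>` under both runtimes
    assert_eq!(result.unwrap(), 42);
}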

Conclusion

Happy hacking! Welcome to share your experience with the community.

· 5 min read
Chris Tsang

🎉 We are pleased to release SeaStreamer 0.3.x!

File Backend

A major addition in SeaStreamer 0.3 is the file backend. It implements the same high-level MPMC API, enabling streaming to and from files. There are different use cases: for example, it can be used to dump data from Redis / Kafka and process it locally, or as an intermediate file format for storage or transport.
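Because it shares the same abstract API, a file consumer looks just like a Kafka or Redis one. A sketch (the file:// URL shape here is illustrative; see the repository examples for the exact form):

let stream: StreamUrl = "file:///tmp/dump.ss/my_stream".parse()?;
let streamer = FileStreamer::connect(stream.streamer(), Default::default()).await?;
let consumer = streamer
    .create_consumer(stream.stream_keys(), Default::default())
    .await?;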

The SeaStreamer File format, .ss, is pretty simple. It's very much like .ndjson, but binary. The file format is designed with the following goals:

  1. Binary data support without encoding overheads
  2. Efficiency in rewinding / seeking through a large dump
  3. Streaming-friendliness - File can be truncated without losing integrity

Let me explain in detail.

First of all, SeaStreamer File is a container format. It only concerns the message stream and framing, not the payload. It's designed to be paired with a binary message format like Protobuf or BSON.

Encode-free

JSON and CSV are great plain text file formats, but they are not binary friendly. Usually, to encode binary data, one would use base64. It therefore imposes an expensive encoding / decoding overhead. In a binary protocol, delimiters are frequently used to signal message boundaries. As a consequence, byte stuffing is needed to escape the bytes.

In SeaStreamer, we want to avoid the encoding overhead entirely. The payload should be written to disk verbatim. So the file format revolves around constructing message frames and placing checksums to ensure that data is interpreted correctly.

Efficient seek

A delimiter-based protocol has an advantage: the byte stream can be randomly sought, and we never have trouble reading the next message.

Since SeaStreamer does not rely on delimiters, we can't easily align to message frames after a random seek. We solve this problem by placing beacons at fixed intervals throughout the file. E.g. if the beacon interval is 1024, there will be a beacon at the 1024th byte, the 2048th, and so on. Then, every time we want to seek to a random location, we seek to the closest N * 1024 byte and read from there.
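The alignment arithmetic is just integer division; a small illustrative sketch (the interval value is an example, not the actual format parameter):

/// Round a target byte offset down to the nearest beacon boundary.
fn align_to_beacon(target: u64, beacon_interval: u64) -> u64 {
    (target / beacon_interval) * beacon_interval
}

// with an interval of 1024, a seek to byte 3000 starts reading at byte 2048
assert_eq!(align_to_beacon(3000, 1024), 2048);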

These beacons also double as indices: they contain summaries of the individual streams. So given a particular stream key and sequence number (or timestamp) to search for, SeaStreamer can quickly locate the message just by reading the beacons. It doesn't matter if the stream's messages are sparse!

Streaming-friendliness

It should always be safe to truncate files. It should be relatively easy to split a file into chunks. We should be able to tell if the data is corrupted.

SeaStreamer achieves this by computing a checksum for every message, and also a running checksum of the checksums for each stream. It's not enforced right now, but in theory we can detect whether any messages are missing from a stream.
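The idea can be sketched as folding each message checksum into a per-stream running value (illustrative only; this is not the actual .ss checksum algorithm):

/// Illustrative: fold a message checksum into the stream's running checksum.
fn fold_checksum(running: u16, message_checksum: u16) -> u16 {
    running.wrapping_add(message_checksum)
}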

Summary

This file format is also easy to implement in different languages; we just made an (experimental) reader in TypeScript.

That's it! If you are interested, you can go and take a look at the format description.

Redis Backend

Redis Streams are underrated! They have high throughput and concurrency, and are best suited for non-persistent stream processing near or on the same host as the application.

The obstacle is probably in library support. Redis Streams' API is rather low level, and there aren't many high-level libraries to help with programming, as opposed to Kafka, which has versatile official programming libraries.

The pitfall is, it's not easy to maximize concurrency with the raw Redis API. To start, you'd need to pipeline XADD commands. You'd also need to time and batch XACKs so that they do not block reads and computation. And of course you want to separate the reads and writes on different threads.

SeaStreamer breaks these obstacles for you and offers a Kafka-like API experience!

Benchmark

In 0.3, we have done some optimizations to improve the throughput of the Redis and File backends. We set our initial benchmark at 100k messages per second, which hopefully we can further improve over time.

Our micro benchmark involves a simple program producing or consuming 100k messages, where each message has a payload of 256 bytes.

For Redis, it's running on the same computer in Docker. On my not-very-impressive laptop with a 10th Gen Intel Core i7, the numbers are somewhat around:

Producer

redis    0.5s
stdio    0.5s
file     0.5s

Consumer

redis    1.0s
stdio    1.0s
file     1.1s

In practical terms, we are comfortably in the realm of producing 100k messages per second (at 256 bytes per message, roughly 25.6 MB/s of payload), but are just about able to consume 100k messages in 1 second. Suggestions for performance improvements are welcome!

Community

SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star ⭐ and share our repositories. If you feel generous, a small donation via GitHub Sponsors will be greatly appreciated, and goes a long way towards sustaining the organization 🚢.

SeaStreamer is a community driven project. We welcome you to participate, contribute and together build for Rust's future 🦀.

· 8 min read
SeaQL Team
SeaORM 0.12 Banner

🎉 We are pleased to announce SeaORM 0.12 today!

We still remember the time when we first introduced SeaORM to the Rust community two years ago. We set out a goal to enable developers to build asynchronous database-driven applications in Rust.

Today, many open-source projects, a handful of startups and many more closed-source projects are using SeaORM. Thank you all who participated and contributed in the making!

SeaORM Star History

New Features 🌟

🧭 Seaography: GraphQL integration (preview)

Seaography example

Seaography is a GraphQL framework built on top of SeaORM. In 0.12, Seaography integration is built into sea-orm. Seaography allows you to build GraphQL resolvers quickly. With just a few commands, you can launch a GraphQL server from SeaORM entities!

While Seaography development is still in an early stage, it is especially useful in prototyping and building internal-use admin panels.

Read the documentation to learn more.

Added macro DerivePartialModel

#1597 Now you can easily perform a custom select to query only the columns you need

#[derive(DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "Cake")]
struct PartialCake {
    name: String,
    #[sea_orm(
        from_expr = r#"SimpleExpr::FunctionCall(Func::upper(Expr::col((Cake, cake::Column::Name))))"#
    )]
    name_upper: String,
}

assert_eq!(
    cake::Entity::find()
        .into_partial_model::<PartialCake>()
        .into_statement(DbBackend::Sqlite)
        .to_string(),
    r#"SELECT "cake"."name", UPPER("cake"."name") AS "name_upper" FROM "cake""#
);

Added Select::find_with_linked

#1728, #1743 Similar to find_with_related, you can now select related entities and consolidate the models.

// Consider the following link
pub struct BakedForCustomer;

impl Linked for BakedForCustomer {
    type FromEntity = Entity;

    type ToEntity = super::customer::Entity;

    fn link(&self) -> Vec<RelationDef> {
        vec![
            super::cakes_bakers::Relation::Baker.def().rev(),
            super::cakes_bakers::Relation::Cake.def(),
            super::lineitem::Relation::Cake.def().rev(),
            super::lineitem::Relation::Order.def(),
            super::order::Relation::Customer.def(),
        ]
    }
}

let res: Vec<(baker::Model, Vec<customer::Model>)> = Baker::find()
    .find_with_linked(baker::BakedForCustomer)
    .order_by_asc(baker::Column::Id)
    .all(db)
    .await?;

Added DeriveValueType derive macro for custom wrapper types

#1720 So now you can use newtypes easily.

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "custom_value_type")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub number: Integer,
    // Postgres only
    pub str_vec: StringVec,
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveValueType)]
pub struct Integer(i32);

#[derive(Clone, Debug, PartialEq, Eq, DeriveValueType)]
pub struct StringVec(pub Vec<String>);

Which saves you the boilerplate of:

impl std::convert::From<StringVec> for Value { .. }

impl TryGetable for StringVec {
    fn try_get_by<I: ColIdx>(res: &QueryResult, idx: I) -> Result<Self, TryGetError> { .. }
}

impl ValueType for StringVec {
    fn try_from(v: Value) -> Result<Self, ValueTypeErr> { .. }

    fn type_name() -> String { "StringVec".to_owned() }

    fn array_type() -> ArrayType { ArrayType::String }

    fn column_type() -> ColumnType { ColumnType::String(None) }
}

Enhancements 🆙

#1433 Chained AND / OR join ON condition

Added more macro attributes to DeriveRelation

// Entity file

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // By default, it's `JOIN `fruit` ON `cake`.`id` = `fruit`.`cake_id` AND `fruit`.`name` LIKE '%tropical%'`
    #[sea_orm(
        has_many = "super::fruit::Entity",
        on_condition = r#"super::fruit::Column::Name.like("%tropical%")"#
    )]
    TropicalFruit,
    // Specify `condition_type = "any"` to override it; now it becomes
    // `JOIN `fruit` ON `cake`.`id` = `fruit`.`cake_id` OR `fruit`.`name` LIKE '%tropical%'`
    #[sea_orm(
        has_many = "super::fruit::Entity",
        on_condition = r#"super::fruit::Column::Name.like("%tropical%")"#,
        condition_type = "any",
    )]
    OrTropicalFruit,
}

#1508 Supports entity with composite primary key of arity 12

#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "primary_key_of_12")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub id_1: String,
    ...
    #[sea_orm(primary_key, auto_increment = false)]
    pub id_12: bool,
}

#1677 Added UpdateMany::exec_with_returning()

let models: Vec<Model> = Entity::update_many()
    .col_expr(Column::Values, Expr::expr(..))
    .exec_with_returning(db)
    .await?;

#1511 Added MigratorTrait::migration_table_name() method to configure the name of the migration table

#[async_trait::async_trait]
impl MigratorTrait for Migrator {
    // Override the name of the migration table
    fn migration_table_name() -> sea_orm::DynIden {
        Alias::new("override_migration_table_name").into_iden()
    }
    ...
}

#1707 Added DbErr::sql_err() method to parse common database errors

assert!(matches!(
    cake.into_active_model().insert(db).await
        .expect_err("Insert a row with duplicated primary key")
        .sql_err(),
    Some(SqlErr::UniqueConstraintViolation(_))
));

assert!(matches!(
    fk_cake.insert(db).await
        .expect_err("Insert a row with invalid foreign key")
        .sql_err(),
    Some(SqlErr::ForeignKeyConstraintViolation(_))
));

#1737 Introduced new ConnAcquireErr

enum DbErr {
    ConnectionAcquire(ConnAcquireErr),
    ..
}

enum ConnAcquireErr {
    Timeout,
    ConnectionClosed,
}

#1627 Added DatabaseConnection::ping()

|db: DatabaseConnection| {
    assert!(db.ping().await.is_ok());
    db.clone().close().await;
    assert!(matches!(db.ping().await, Err(DbErr::ConnectionAcquire(_))));
}

#1708 Added TryInsert that does not panic on empty inserts

// now, you can do:
let res = Bakery::insert_many(std::iter::empty())
    .on_empty_do_nothing()
    .exec(db)
    .await;

assert!(matches!(res, Ok(TryInsertResult::Empty)));

#1712 Insert on conflict do nothing to return Ok

let on = OnConflict::column(Column::Id).do_nothing().to_owned();

// Existing behaviour
let res = Entity::insert_many([..]).on_conflict(on).exec(db).await;
assert!(matches!(res, Err(DbErr::RecordNotInserted)));

// New API; now you can:
let res = Entity::insert_many([..]).on_conflict(on).do_nothing().exec(db).await;
assert!(matches!(res, Ok(TryInsertResult::Conflicted)));

#1740, #1755 Replacing sea_query::Iden with sea_orm::DeriveIden

To provide a more consistent interface, sea-query/derive is no longer enabled by sea-orm; as such, Iden no longer works as a derive macro (it's still a trait).

// then:

#[derive(Iden)]
#[iden = "category"]
pub struct CategoryEnum;

#[derive(Iden)]
pub enum Tea {
    Table,
    #[iden = "AfternoonTea"]
    EverydayTea,
}

// now:

#[derive(DeriveIden)]
#[sea_orm(iden = "category")]
pub struct CategoryEnum;

#[derive(DeriveIden)]
pub enum Tea {
    Table,
    #[sea_orm(iden = "AfternoonTea")]
    EverydayTea,
}

New Release Train Ferry 🚢

It's been the 12th release of SeaORM! Initially, a major version was released every month. That gradually became every 2 to 3 months, and now it's been 6 months since the last major release. As our userbase grew and some users are already running SeaORM in production, we understand the importance of having a stable API surface and feature set.

That's why we are committed to:

  1. Reviewing breaking changes with strict scrutiny
  2. Expanding our test suite to cover all features of our library
  3. Never removing features, and considering deprecations carefully

Today, the architecture of SeaORM is pretty solid and stable, and with the 0.12 release, where we paid back a lot of technical debt, we will be able to deliver new features and enhancements without breaking changes. As our major dependency SQLx is not 1.0 yet, technically we cannot be 1.0 either.

We are still advancing rapidly, and we will always make a new release as soon as SQLx makes a new release, so that you can upgrade everything at once. As a result, the next major release of SeaORM will come out 6 months from now, or when SQLx makes a new release, whichever is earlier.

Community Survey 📝

SeaQL is an independent open-source organization. Our goal is to enable developers to build data intensive applications in Rust. If you are using SeaORM, please participate in the SeaQL Community Survey!

By completing this survey, you will help us gather insights into how you, the developer, are using our libraries and identify means to improve your developer experience. We will also publish an annual survey report to summarize our findings.

If you are a happy user of SeaORM, consider writing us a testimonial!

A big thanks to DigitalOcean, who sponsored our server hosting, JetBrains, who sponsored our IDE, and every sponsor on GitHub Sponsors!

If you feel generous, a small donation will be greatly appreciated, and goes a long way towards sustaining the organization.

A big shout out to our sponsors 😇:

Shane Sveller
Émile Fugulin
Afonso Barracha
Jacob Trueb
Natsuki Ikeguchi
Marlon Mueller-Soppart
KallyDev
Dean Sheather
Manfred Lee
Roland Gorácz
IceApinan
Renรฉ Klaฤan
Unnamed Sponsor

What's Next for SeaORM? ⛵

An open-source project is never-ending work, and we are actively looking for ways to sustain it. You can support our endeavour by starring & sharing our repositories and becoming a sponsor.

We are considering multiple directions to generate revenue for the organization. If you have any suggestion, or want to join or collaborate with us, please contact us via hello[at]sea-ql.org.

Thank you for your support, and together we can make open-source sustainable.

· 5 min read
Chris Tsang

We are pleased to introduce SeaStreamer to the Rust community today. SeaStreamer is a stream processing toolkit to help you build stream processors in Rust.

At SeaQL we want to make Rust the best programming platform for data engineering. Where SeaORM is the essential tool for working with SQL databases, SeaStreamer aims to be your essential toolkit for working with streams.

Currently SeaStreamer provides integration with Kafka and Redis.

Let's have a quick tour of SeaStreamer.

High level async API

  • High level async API that supports both async-std and tokio
  • Mutex-free implementation1: concurrency achieved by message passing
  • A comprehensive type system that guides/restricts you with the API

Below is a basic Kafka consumer:

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "kafka://streamer.sea-ql.org:9092/my_stream".parse()?;
    let streamer = KafkaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = KafkaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_offset_reset(AutoOffsetReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}

Consumer::stream() returns an object that implements the Stream trait, which allows you to do neat things:

let items = consumer
    .stream()
    .take(num)
    .map(process_message)
    .collect::<Vec<_>>()
    .await;

Trait-based abstract interface

All SeaStreamer backends implement a common abstract interface, offering you a familiar API. Below is a basic Redis consumer, which is nearly the same as the previous example:

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let stream: StreamUrl = "redis://localhost:6379/my_stream".parse()?;
    let streamer = RedisStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = RedisConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(AutoStreamReset::Earliest);
    let consumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;

    loop {
        let mess = consumer.next().await?;
        println!("{}", mess.message().as_str()?);
    }
}

Redis Streams Support

SeaStreamer Redis provides Kafka-like stream semantics:

  • Non-group streaming with AutoStreamReset option
  • Consumer-group-based streaming with auto-ack and/or auto-commit
  • Load balancing among consumers with automatic failover
  • Seek/rewind to point in time

You don't have to call XADD, XREAD, XACK, etc... anymore!

Enum-based generic interface

The trait-based API requires you to designate the concrete Streamer type for monomorphization; otherwise the code cannot compile.

Akin to how SeaORM implements runtime polymorphism, SeaStreamer provides an enum-based generic streamer, in which the backend is selected at runtime.

Here is an illustration (full example):

// sea-streamer-socket
pub struct SeaConsumer {
    backend: SeaConsumerBackend,
}

enum SeaConsumerBackend {
    #[cfg(feature = "backend-kafka")]
    Kafka(KafkaConsumer),
    #[cfg(feature = "backend-redis")]
    Redis(RedisConsumer),
    #[cfg(feature = "backend-stdio")]
    Stdio(StdioConsumer),
}

// Your code
let uri: StreamerUri = "kafka://localhost:9092".parse()?; // or
let uri: StreamerUri = "redis://localhost:6379".parse()?; // or
let uri: StreamerUri = "stdio://".parse()?;

// SeaStreamer will be backed by Kafka, Redis or Stdio depending on the URI
let streamer = SeaStreamer::connect(uri, Default::default()).await?;

// Set backend-specific options
let mut options = SeaConsumerOptions::new(ConsumerMode::Resumable);
options.set_kafka_consumer_options(|options: &mut KafkaConsumerOptions| { .. });
options.set_redis_consumer_options(|options: &mut RedisConsumerOptions| { .. });
let mut consumer: SeaConsumer = streamer.create_consumer(stream_keys, options).await?;

// You can still retrieve the concrete type
let kafka: Option<&mut KafkaConsumer> = consumer.get_kafka();
let redis: Option<&mut RedisConsumer> = consumer.get_redis();

So you can "write once, stream anywhere"!

Good old unix pipe

In SeaStreamer, stdin & stdout can be used as stream source and sink.

Say you are developing some processors to transform a stream in several stages:

./processor_1 --input kafka://localhost:9092/input --output kafka://localhost:9092/stage_1 &
./processor_2 --input kafka://localhost:9092/stage_1 --output kafka://localhost:9092/stage_2 &
./processor_3 --input kafka://localhost:9092/stage_2 --output kafka://localhost:9092/output &

It would be great if we could simply pipe the processors together, right?

With SeaStreamer, you can do the following:

./processor_1 --input kafka://localhost:9092/input --output stdio:///stream |
./processor_2 --input stdio:///stream --output stdio:///stream |
./processor_3 --input stdio:///stream --output kafka://localhost:9092/output

All without recompiling the stream processors! Now, you can develop locally with the comfort of using |, >, < and your favourite unix program in the shell.

Testable

SeaStreamer encourages you to write tests at all levels:

  • You can execute tests involving several stream processors in the same OS process
  • You can execute tests involving several OS processes by connecting them with pipes
  • You can execute tests involving several stream processors with Redis / Kafka

All against the same piece of code! Let SeaStreamer take away the boilerplate and mocking facility from your codebase.

Below is an example of intra-process testing, which can be run with cargo test without any dependency or side-effects:

let stream = StreamKey::new("test")?;
let mut options = StdioConnectOptions::default();
options.set_loopback(true); // messages produced will be feed back to consumers
let streamer = StdioStreamer::connect(StreamerUri::zero(), options).await?;
let producer = streamer.create_producer(stream.clone(), Default::default()).await?;
let mut consumer = streamer.create_consumer(&[stream.clone()], Default::default()).await?;

for i in 0..5 {
let mess = format!("{}", i);
producer.send(mess)?;
}

let seq = collect(&mut consumer, 5).await;
assert_eq!(seq, [0, 1, 2, 3, 4]);

Getting started

If you are eager to get started with SeaStreamer, you can check out our set of examples:

  • consumer: A basic consumer
  • producer: A basic producer
  • processor: A basic stream processor
  • resumable: A resumable stream processor that continues from where it left off
  • buffered: An advanced stream processor with internal buffering and batch processing
  • blocking: An advanced stream processor for handling blocking / CPU-bound tasks

Read the official documentation to learn more.

Roadmap

A few major components we plan to develop:

  • File Backend
  • Redis Cluster

We welcome you to join our Discussions if you have thoughts or ideas!

People

SeaStreamer is designed and developed by the same mind who brought you SeaORM:

Chris Tsang

Community

SeaQL.org is an independent open-source organization run by passionate developers. If you like our projects, please star ⭐ and share our repositories. If you feel generous, a small donation via GitHub Sponsors will be greatly appreciated, and goes a long way towards sustaining the organization 🚢.

SeaStreamer is a community driven project. We welcome you to participate, contribute and together build for Rust's future 🦀.


  1. except sea-streamer-stdio, which only contends on consumer add/drop

· 10 min read
SeaQL Team

🎉 We are pleased to release SeaORM 0.11.0!

Data Loader

[#1443, #1238] The LoaderTrait provides an API to load related entities in batches.

Consider this one-to-many relation:

let cake_with_fruits: Vec<(cake::Model, Vec<fruit::Model>)> = Cake::find()
    .find_with_related(Fruit)
    .all(db)
    .await?;

The generated SQL is:

SELECT
    "cake"."id" AS "A_id",
    "cake"."name" AS "A_name",
    "fruit"."id" AS "B_id",
    "fruit"."name" AS "B_name",
    "fruit"."cake_id" AS "B_cake_id"
FROM "cake"
LEFT JOIN "fruit" ON "cake"."id" = "fruit"."cake_id"
ORDER BY "cake"."id" ASC

The data on the "1" side (Cake) will be duplicated. If N is a large number, this would result in more data being transferred over the wire. Using the Loader would ensure each model is transferred only once.

The following loads the same data as above, but with two queries:

let cakes: Vec<cake::Model> = Cake::find().all(db).await?;
let fruits: Vec<Vec<fruit::Model>> = cakes.load_many(Fruit, db).await?;

for (cake, fruits) in cakes.into_iter().zip(fruits.into_iter()) { .. }

SELECT "cake"."id", "cake"."name" FROM "cake"
SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit" WHERE "fruit"."cake_id" IN (..)

You can even apply filters on the related entity:

let fruits_in_stock: Vec<Vec<fruit::Model>> = cakes.load_many(
    fruit::Entity::find().filter(fruit::Column::Stock.gt(0i32)),
    db,
).await?;

SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"
WHERE "fruit"."stock" > 0 AND "fruit"."cake_id" IN (..)

To learn more, read the relation docs.

Transaction Isolation Level and Access Mode

[#1230] The transaction_with_config and begin_with_config methods allow you to specify the IsolationLevel and AccessMode.

For now, they are only implemented for MySQL and Postgres. To align their semantic differences, MySQL will execute SET TRANSACTION commands before beginning a transaction, while Postgres will execute SET TRANSACTION commands after beginning a transaction.

db.transaction_with_config::<_, _, DbErr>(
    |txn| { ... },
    Some(IsolationLevel::ReadCommitted),
    Some(AccessMode::ReadOnly),
)
.await?;

let transaction = db
    .begin_with_config(IsolationLevel::ReadCommitted, AccessMode::ReadOnly)
    .await?;

To learn more, read the transaction docs.

Cast Column Type on Select and Save

[#1304] If you need to select a column as one type but save it into the database as another, you can specify the select_as and the save_as attributes to perform the casting. A typical use case is selecting a column of type citext (case-insensitive text) as String in Rust and saving it into the database as citext. One should define the model field as below:

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "ci_table")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    #[sea_orm(select_as = "text", save_as = "citext")]
    pub case_insensitive_text: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

Changes to ActiveModelBehavior

[#1328, #1145] The methods of ActiveModelBehavior now have Connection as an additional parameter. It enables you to perform database operations, for example, logging the changes made to the existing model or validating the data before inserting it.

#[async_trait]
impl ActiveModelBehavior for ActiveModel {
    /// Create a new ActiveModel with default values. Also used by `Default::default()`.
    fn new() -> Self {
        Self {
            uuid: Set(Uuid::new_v4()),
            ..ActiveModelTrait::default()
        }
    }

    /// Will be triggered before insert / update
    async fn before_save<C>(self, db: &C, insert: bool) -> Result<Self, DbErr>
    where
        C: ConnectionTrait,
    {
        // Logging changes
        edit_log::ActiveModel {
            action: Set("before_save".into()),
            values: Set(serde_json::json!(self)),
            ..Default::default()
        }
        .insert(db)
        .await?;

        Ok(self)
    }
}

To learn more, read the entity docs.

Execute Unprepared SQL Statement

[#1327] You can execute an unprepared SQL statement with ConnectionTrait::execute_unprepared.

// Use `execute_unprepared` if the SQL statement doesn't have value bindings
db.execute_unprepared(
"CREATE TABLE `cake` (
`id` int NOT NULL AUTO_INCREMENT PRIMARY KEY,
`name` varchar(255) NOT NULL
)"
)
.await?;

// Construct a `Statement` if the SQL contains value bindings
let stmt = Statement::from_sql_and_values(
manager.get_database_backend(),
r#"INSERT INTO `cake` (`name`) VALUES (?)"#,
["Cheese Cake".into()]
);
db.execute(stmt).await?;

Select Into Tuple

[#1311] You can select a tuple (or single value) with the into_tuple method.

let res: Vec<(String, i64)> = cake::Entity::find()
    .select_only()
    .column(cake::Column::Name)
    .column(cake::Column::Id.count())
    .group_by(cake::Column::Name)
    .into_tuple()
    .all(&db)
    .await?;

Atomic Migration

[#1379] Migrations are executed atomically in Postgres, meaning that migration scripts run inside a transaction. Changes to the database will be rolled back if the migration fails. However, atomic migration is not supported in MySQL and SQLite.

You can start a transaction inside each migration to perform operations like seeding sample data for a newly created table.
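A sketch of the idea, assuming the `cake` table was created earlier in the same migration and that the migration connection exposes sea_orm::TransactionTrait (check the current API docs for the exact types):

use sea_orm::TransactionTrait;

// inside `impl MigrationTrait for Migration`
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
    // ... create the `cake` table first ...

    let txn = manager.get_connection().begin().await?;
    txn.execute_unprepared("INSERT INTO cake (name) VALUES ('Cheese Cake')")
        .await?;
    txn.commit().await?;

    Ok(())
}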

Types Support

  • [#1325] Support various UUID formats that are available in uuid::fmt module
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "uuid_fmt")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub uuid: Uuid,
    pub uuid_braced: uuid::fmt::Braced,
    pub uuid_hyphenated: uuid::fmt::Hyphenated,
    pub uuid_simple: uuid::fmt::Simple,
    pub uuid_urn: uuid::fmt::Urn,
}
  • [#1210] Support vector of enum for Postgres
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "tea")]
pub enum Tea {
    #[sea_orm(string_value = "EverydayTea")]
    EverydayTea,
    #[sea_orm(string_value = "BreakfastTea")]
    BreakfastTea,
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "enum_vec")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub teas: Vec<Tea>,
    pub teas_opt: Option<Vec<Tea>>,
}
  • [#1414] Support ActiveEnum field as primary key
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "enum_primary_key")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
    pub id: Tea,
    pub category: Option<Category>,
    pub color: Option<Color>,
}

Opt-in Unstable Internal APIs

By enabling the sea-orm-internal feature you opt in to unstable internal APIs, including:

Breaking Changes

  • [#1366] sea-query has been upgraded to 0.28.x, which comes with some improvements and breaking changes. Please follow the