rustapi-jobs: The Workhorse
Lens: “The Workhorse”
Philosophy: “Fire and forget, with reliability guarantees.”
Background Processing
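Long-running tasks shouldn’t block HTTP requests. rustapi-jobs provides a job queue that runs in memory or is backed by Redis or Postgres, so a request handler can enqueue the work and return immediately while a worker processes it in the background.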
Usage Example
Here is how to set up a simple background job queue using the in-memory backend.
1. Define the Job and Data
Jobs are separated into two parts:
- The Data struct (the payload), which must be serializable.
- The Job struct (the handler), which contains the logic.
use async_trait::async_trait;
use rustapi_jobs::{Job, JobContext, Result};
use serde::{Deserialize, Serialize};

// 1. The payload data
#[derive(Serialize, Deserialize, Debug, Clone)]
struct EmailJobData {
    to: String,
    subject: String,
    body: String,
}

// 2. The handler struct (usually stateless)
#[derive(Clone)]
struct EmailJob;

#[async_trait]
impl Job for EmailJob {
    // Stable name that identifies this job type in the queue
    const NAME: &'static str = "email_job";
    type Data = EmailJobData;

    async fn execute(&self, _ctx: JobContext, data: Self::Data) -> Result<()> {
        println!("Sending email to {} with subject: {}", data.to, data.subject);
        // Simulate work
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok(())
    }
}
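The split exists because a queue can only store data, not code: the payload has to be serializable so it can be persisted, and the worker needs a stable key (the role `NAME` plays in the email job above) to find the matching handler later. The std-only sketch below illustrates that lookup. It is not how rustapi-jobs is implemented; `Registry`, `Handler`, and `demo_registry` are hypothetical names, and a plain string stands in for a serde-serialized payload.

```rust
use std::collections::HashMap;

// Hypothetical handler type: receives the stored payload as text.
type Handler = Box<dyn Fn(&str) -> Result<String, String>>;

// Hypothetical registry mapping a job name to its handler.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Registry { handlers: HashMap::new() }
    }

    // Register a handler under a stable name.
    fn register(&mut self, name: &'static str, handler: Handler) {
        self.handlers.insert(name, handler);
    }

    // What a worker does with a stored record: look up the handler by
    // name, then hand it the payload.
    fn dispatch(&self, name: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(name) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for '{}'", name)),
        }
    }
}

// Build a registry with a single handler, mirroring the email job.
fn demo_registry() -> Registry {
    let mut registry = Registry::new();
    registry.register(
        "email_job",
        Box::new(|payload: &str| -> Result<String, String> {
            Ok(format!("sent email to {}", payload))
        }),
    );
    registry
}
```

Dispatching a record whose name was never registered returns an error instead of panicking, which is one reason to register every handler before starting a worker.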
2. Configure the Queue
In your main function, initialize the queue and start the worker.
use rustapi_jobs::{InMemoryBackend, JobQueue};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Create the backend
    let backend = InMemoryBackend::new();

    // 2. Create the queue
    let queue = JobQueue::new(backend);

    // 3. Register the job handler
    queue.register_job(EmailJob).await;

    // 4. Start the worker in the background
    let worker_queue = queue.clone();
    tokio::spawn(async move {
        if let Err(e) = worker_queue.start_worker().await {
            eprintln!("Worker failed: {:?}", e);
        }
    });

    // 5. Enqueue a job (pass the DATA, not the handler)
    queue.enqueue::<EmailJob>(EmailJobData {
        to: "user@example.com".into(),
        subject: "Welcome!".into(),
        body: "Thanks for joining.".into(),
    }).await?;

    // 6. Give the worker a moment to run the job before main returns;
    //    spawned tasks are cancelled when the runtime shuts down.
    //    A long-running server would simply keep running here.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    Ok(())
}
Backends
- Memory: Great for development and testing. Zero infrastructure required.
- Redis: High throughput with persistence. Recommended for production.
- Postgres: Transactional reliability (ACID). Best if you cannot lose jobs.
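All three backends fill the same role: they hold pending jobs until a worker claims them. As a rough mental model only, a pluggable backend could be sketched as a small trait with an in-memory implementation. The names `QueueBackend`, `MemoryBackend`, and `StoredJob` below are hypothetical and are not the crate's real backend trait.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical stored record: job name plus serialized payload.
#[derive(Debug, Clone, PartialEq)]
struct StoredJob {
    name: String,
    payload: String,
}

// Hypothetical storage abstraction. A Redis or Postgres backend would
// implement the same two operations against an external store.
trait QueueBackend {
    fn push(&self, job: StoredJob);
    fn pop(&self) -> Option<StoredJob>;
}

// In-memory variant: a FIFO queue guarded by a mutex.
struct MemoryBackend {
    jobs: Mutex<VecDeque<StoredJob>>,
}

impl MemoryBackend {
    fn new() -> Self {
        MemoryBackend { jobs: Mutex::new(VecDeque::new()) }
    }
}

impl QueueBackend for MemoryBackend {
    fn push(&self, job: StoredJob) {
        self.jobs.lock().unwrap().push_back(job);
    }

    fn pop(&self) -> Option<StoredJob> {
        self.jobs.lock().unwrap().pop_front()
    }
}
```

Because the in-memory variant keeps everything in process memory, pending jobs disappear when the process exits, which is why it suits development and testing rather than production.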
Redis Backend
Enable the redis feature in Cargo.toml:
[dependencies]
rustapi-jobs = { version = "0.1.335", features = ["redis"] }
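use rustapi_jobs::JobQueue;
use rustapi_jobs::backend::redis::RedisBackend;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a local Redis instance
    let backend = RedisBackend::new("redis://127.0.0.1:6379").await?;
    let queue = JobQueue::new(backend);
    // Register handlers and start the worker as in the in-memory example
    Ok(())
}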
Postgres Backend
Enable the postgres feature in Cargo.toml. The Postgres backend is built on sqlx and takes a connection pool.
[dependencies]
rustapi-jobs = { version = "0.1.335", features = ["postgres"] }
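use rustapi_jobs::JobQueue;
use rustapi_jobs::backend::postgres::PostgresBackend;
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a connection pool to the database
    let pool = PgPoolOptions::new()
        .connect("postgres://user:pass@localhost/db")
        .await?;
    let backend = PostgresBackend::new(pool);
    // Ensure the jobs table exists
    backend.migrate().await?;
    let queue = JobQueue::new(backend);
    // Register handlers and start the worker as in the in-memory example
    Ok(())
}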
Reliability Features
The worker system includes built-in reliability features:
- Exponential Backoff: Automatically retries failing jobs with increasing delays.
- Dead Letter Queue (DLQ): “Poison” jobs that fail repeatedly are isolated for manual inspection.
- Concurrency Control: Limit the number of concurrent workers to prevent overloading your system.
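To make the first two features concrete, here is a minimal sketch of how a retry decision could be computed: the delay doubles after each failed attempt up to a cap, and a job that exhausts its attempts is routed to the dead letter queue. This illustrates the general technique and is not the crate's implementation; `RetryPolicy`, `Outcome`, and all field names and values are assumptions made for the example.

```rust
use std::time::Duration;

// Hypothetical retry policy; names and values are illustrative only.
struct RetryPolicy {
    base_delay: Duration,
    max_delay: Duration,
    max_attempts: u32,
}

// What to do with a job after a failed attempt.
#[derive(Debug, PartialEq)]
enum Outcome {
    RetryAfter(Duration),
    DeadLetter,
}

impl RetryPolicy {
    // Delay is base_delay * 2^(attempt - 1), capped at max_delay.
    // `attempt` is 1-based.
    fn backoff(&self, attempt: u32) -> Duration {
        // Clamp the exponent so the shift cannot overflow a u32.
        let exponent = attempt.saturating_sub(1).min(31);
        let factor = 1u32 << exponent;
        self.base_delay.saturating_mul(factor).min(self.max_delay)
    }

    // Decide the fate of a job whose attempt number `attempt` just failed.
    fn on_failure(&self, attempt: u32) -> Outcome {
        if attempt >= self.max_attempts {
            Outcome::DeadLetter
        } else {
            Outcome::RetryAfter(self.backoff(attempt))
        }
    }
}
```

With a base delay of 1 second, a 60 second cap, and 5 attempts, this policy retries after 1, 2, 4, and 8 seconds and dead-letters the job on the fifth failure. Production systems often add random jitter to the delay so that many failing jobs do not retry at the same instant.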