Introducing Goblero: a Go Embedded Job Queue
Gophers Meet Badgers
Over the years I’ve used a number of job queues such as Sidekiq, Kue, or Faktory, and they are often a central piece of the apps I build. Since embedded databases are so popular in the Go world, I thought I’d try to create an open source embedded job queue for Go apps: Goblero.
What is a Job Queue?
Job queues are also known as background job queues, task queues, or work servers. They can be very handy for asynchronous or long-running tasks that don’t belong in a web request handler, for example. Most job queues are backed by a database such as Redis, RabbitMQ, or Kafka that provides fast job storage/retrieval and sometimes Pub/Sub capabilities as well.
Consider a typical use case for a job queue:
The HTTP handler receives an HTTP request from a web client, containing for example a file that needs some heavy processing.
The HTTP handler responds as fast as possible and delegates the heavy lifting to the job queue, as the sketch below shows.
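To make this concrete, here is a minimal sketch of that pattern in Go, using Goblero’s EnqueueJob API from the demo later in this article (the /upload route and the handler body are my own illustration, not part of the library):

package main

import (
    "io/ioutil"
    "log"
    "net/http"

    "github.com/didil/goblero/pkg/blero"
)

func main() {
    // Create and start the embedded job queue
    bl := blero.New("db/")
    if err := bl.Start(); err != nil {
        log.Fatal(err)
    }
    defer bl.Stop()

    // A processor registered via RegisterProcessorFunc (see the demo
    // below) would pick up and run the enqueued jobs

    // The handler responds as soon as the job is safely enqueued;
    // the heavy processing happens later in a processor
    http.HandleFunc("/upload", func(w http.ResponseWriter, r *http.Request) {
        data, err := ioutil.ReadAll(r.Body)
        if err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        if _, err := bl.EnqueueJob("process-upload", data); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusAccepted)
    })

    log.Fatal(http.ListenAndServe(":8080", nil))
}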
Why an Embedded Job Queue?
While job queues such as Sidekiq or Celery are geared towards distributed processing by multiple workers, the idea behind an embedded job queue is that it runs within the same process that creates the jobs. The main benefit is that we don’t need to run a separate process for job processing, nor manage an external database. An embedded job queue can be useful for small, local tasks where simple persistence is needed.
For example, let’s imagine a Go CLI app that receives as input a text file with 10,000 URLs, fetches some data from another source for each URL, and then sends a POST request to the URL. Imagine that it processes 5 URLs at a time, and then crashes after 2,000 URLs. How do we know where to restart? Which URLs were already processed? Of course you could create an ad-hoc solution for your app to keep track of the processing state, but the idea behind a job queue is to let you track that state in a simple and reliable manner, with some data durability guarantees. A rough sketch of this scenario follows.
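Here is what that could look like with Goblero (the file handling and job name are my own illustration; EnqueueJob and the rest of the API appear in the demo below):

package main

import (
    "bufio"
    "log"
    "os"

    "github.com/didil/goblero/pkg/blero"
)

func main() {
    bl := blero.New("db/")
    if err := bl.Start(); err != nil {
        log.Fatal(err)
    }
    defer bl.Stop()

    // Registering 5 processors here would process 5 URLs at a time
    // (elided; see RegisterProcessorFunc in the demo below)

    // Enqueue one persistent job per URL. As in the demo below, you
    // would only enqueue on the first run; after a crash or restart,
    // the unfinished jobs are still in the queue.
    f, err := os.Open("urls.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        if _, err := bl.EnqueueJob("post-url", []byte(scanner.Text())); err != nil {
            log.Fatal(err)
        }
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}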
Enter Goblero
Goblero aims to be a simple, fast, embedded, persistent job queue for Go apps. It’s still in alpha and not production-ready at all, but most of the basic functionality is already there. The idea is to keep the features simple, and you can already start playing with it in your Go side projects.
I chose BadgerDB as the backing embedded database, as it seemed to have some interesting properties (see the quick sketch after this list):
Key Value Store
Pure Go
Sorted KV access
ACID Concurrent Transactions
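To give a feel for the Badger API, here is a minimal standalone sketch (not Goblero code; the DefaultOptions(path) helper shown is the style used by recent Badger releases):

package main

import (
    "fmt"
    "log"

    badger "github.com/dgraph-io/badger"
)

func main() {
    // Open (or create) a Badger database in the badger-demo folder
    db, err := badger.Open(badger.DefaultOptions("badger-demo"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // ACID read-write transaction
    err = db.Update(func(txn *badger.Txn) error {
        return txn.Set([]byte("job:1"), []byte("payload"))
    })
    if err != nil {
        log.Fatal(err)
    }

    // Keys come back in sorted order inside a read-only transaction
    err = db.View(func(txn *badger.Txn) error {
        it := txn.NewIterator(badger.DefaultIteratorOptions)
        defer it.Close()
        for it.Rewind(); it.Valid(); it.Next() {
            item := it.Item()
            val, err := item.ValueCopy(nil)
            if err != nil {
                return err
            }
            fmt.Printf("%s => %s\n", item.Key(), val)
        }
        return nil
    })
    if err != nil {
        log.Fatal(err)
    }
}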
The source code in the repo should be pretty accessible if you want to learn about the library internals. The design is of course experimental at the moment. Issues, suggestions, and pull requests are welcome!
In this article I wanted to focus on demonstrating its usage. We’ll use the Goblero Demo GitHub repo, which contains a simple working app that makes use of a Goblero job queue.
To get started with the demo, you will need to have Go installed (https://golang.org/dl/). Then you can clone the repo using:
git clone https://github.com/didil/goblero-demo.git
Get the package:
go get -u github.com/didil/goblero/pkg/blero
Build the demo app:
go build .
Here is the code that we’ll be running:
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/didil/goblero/pkg/blero"
)

func main() {
    // Parse flags
    n := flag.Int("n", 1, "number of processors")
    flag.Parse()

    // Create a new Blero backend
    bl := blero.New("db/")

    // Start Blero
    err := bl.Start()
    if err != nil {
        log.Fatal(err)
    }

    // Register processor(s)
    for i := 1; i <= *n; i++ {
        pI := i
        fmt.Printf("Registering Processor %v ...\n", pI)
        bl.RegisterProcessorFunc(func(j *blero.Job) error {
            fmt.Printf("[Processor %v] Processing job: %v - data: %v\n", pI, j.Name, string(j.Data))
            // Simulate processing
            time.Sleep(2 * time.Second)
            fmt.Printf("[Processor %v] Done Processing job: %v\n", pI, j.Name)
            return nil
        })
    }

    // Enqueue jobs
    if len(os.Args) > 1 && os.Args[1] == "enqueue" {
        fmt.Println("Enqueuing jobs ...")
        for i := 1; i <= 50; i++ {
            jobName := fmt.Sprintf("Job #%v", i)
            jobData := []byte(fmt.Sprintf("Job Data #%v", i))
            _, err := bl.EnqueueJob(jobName, jobData)
            if err != nil {
                log.Fatal(err)
            }
        }
    }

    // Wait for SIGTERM or SIGINT to stop Blero and exit
    // (the channel is buffered so a signal sent during setup isn't missed,
    // as recommended by the signal.Notify docs)
    exitCh := make(chan os.Signal, 1)
    signal.Notify(exitCh, syscall.SIGINT)
    signal.Notify(exitCh, syscall.SIGTERM)
    s := <-exitCh
    fmt.Printf("Caught signal %v. Exiting ...\n", s)

    // Stop Blero
    bl.Stop()
    os.Exit(0)
}
To start the app we’ll run:
./goblero-demo enqueue
This command:
starts the demo app
creates a new Blero backend
creates a Badger database in the db folder of the repo
registers a processor, a function that runs the jobs (here it simulates some task processing by sleeping for 2 seconds, then returning)
creates and enqueues 50 jobs
Your output should look something like this:
$ ./goblero-demo enqueue
Starting Blero ...
Registering Processor 1 ...
Enqueuing jobs ...
[Processor 1] Processing job: Job #1 - data: Job Data #1
[Processor 1] Done Processing job: Job #1
[Processor 1] Processing job: Job #2 - data: Job Data #2
[Processor 1] Done Processing job: Job #2
[Processor 1] Processing job: Job #3 - data: Job Data #3
[Processor 1] Done Processing job: Job #3
[Processor 1] Processing job: Job #4 - data: Job Data #4
[Processor 1] Done Processing job: Job #4
[Processor 1] Processing job: Job #5 - data: Job Data #5
^CCaught signal interrupt. Exiting ...
Stopping Blero ...
I voluntarily stopped the app after Job #5 started. Let’s restart the app without the “enqueue” argument:
$ ./goblero-demo
Starting Blero ...
Registering Processor 1 ...
[Processor 1] Processing job: Job #6 - data: Job Data #6
[Processor 1] Done Processing job: Job #6
[Processor 1] Processing job: Job #7 - data: Job Data #7
[Processor 1] Done Processing job: Job #7
[Processor 1] Processing job: Job #8 - data: Job Data #8
^CCaught signal interrupt. Exiting ...
Stopping Blero ...
It continues right where it left off! That’s the idea. You might notice that Job #5 is now in an ambiguous state: is it done? The library does not yet support resuming work on interrupted jobs, but that feature should be added soon.
Let’s now try adding more job processors and see what happens. We can do that by running the demo with the -n flag to change the number of processors:

$ ./goblero-demo -n 3
Starting Blero ...
Registering Processor 1 ...
Registering Processor 2 ...
Registering Processor 3 ...
[Processor 3] Processing job: Job #9 - data: Job Data #9
[Processor 1] Processing job: Job #10 - data: Job Data #10
[Processor 2] Processing job: Job #11 - data: Job Data #11
[Processor 2] Done Processing job: Job #11
[Processor 3] Done Processing job: Job #9
[Processor 1] Done Processing job: Job #10
[Processor 2] Processing job: Job #12 - data: Job Data #12
[Processor 3] Processing job: Job #13 - data: Job Data #13
[Processor 1] Processing job: Job #14 - data: Job Data #14
[Processor 2] Done Processing job: Job #12
[Processor 1] Done Processing job: Job #14
[Processor 2] Processing job: Job #15 - data: Job Data #15
[Processor 1] Processing job: Job #16 - data: Job Data #16
[Processor 3] Done Processing job: Job #13
[Processor 3] Processing job: Job #17 - data: Job Data #17
^CCaught signal interrupt. Exiting ...
Stopping Blero ...
The work continues, and we’re now processing 3 jobs in parallel! Goblero internally distributes the jobs across goroutines, but you don’t have to deal with channels, signaling, etc. There is no support for timeouts yet, but that’s a planned feature through Go contexts; in the meantime, a processor func can impose its own deadline, as sketched below.
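Here is a minimal sketch of that workaround, slotting into the demo’s main above; it assumes the context package is also imported, and process is a hypothetical helper holding your actual job logic:

// Give each job a 30-second budget until the library supports
// contexts natively (the duration is illustrative)
bl.RegisterProcessorFunc(func(j *blero.Job) error {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    done := make(chan error, 1)
    go func() { done <- process(ctx, j) }() // process: your own job logic

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        // note: the goroutine keeps running; true cancellation
        // requires process to honor ctx
        return ctx.Err()
    }
})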
Quick Benchmark
There is still a lot of optimization to be done on the internals, the data storage model, locking, etc., but I’ve added a couple of benchmarks to the test files:
# Core i5 laptop / 8GB Ram / SSD
make bench
BenchmarkEnqueue/EnqueueJob-4    50000     159942 ns/op   (~6,250 ops/s)
BenchmarkEnqueue/dequeueJob-4     5000    2767260 ns/op   (~361 ops/s)
You’ll notice that dequeuing is somewhat slow at the moment (~360 ops/second on my test laptop). But for a quick first version of something that’s mainly built to process long-running jobs in a single process: not too bad!
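If you’re curious what such a benchmark looks like, here is a sketch in the same spirit using the standard testing package (not the repo’s actual benchmark code):

package blero_test

import (
    "fmt"
    "io/ioutil"
    "os"
    "testing"

    "github.com/didil/goblero/pkg/blero"
)

func BenchmarkEnqueueJob(b *testing.B) {
    // Use a throwaway Badger directory for the benchmark
    dir, err := ioutil.TempDir("", "blero-bench")
    if err != nil {
        b.Fatal(err)
    }
    defer os.RemoveAll(dir)

    bl := blero.New(dir)
    if err := bl.Start(); err != nil {
        b.Fatal(err)
    }
    defer bl.Stop()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        if _, err := bl.EnqueueJob(fmt.Sprintf("job-%d", i), []byte("data")); err != nil {
            b.Fatal(err)
        }
    }
}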
I think the slow dequeues are mostly due to the fact that we’re saving to disk after each operation to avoid losing data on crashes, and to the dequeue process currently taking multiple steps (sketched in code after this list):
DB Seek to find the next pending job
DB Read + Deserialize the data to a Job struct
DB Delete the job from the pending Queue
DB Set the job on the active Queue
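In Badger terms, those four steps could look roughly like the following. This is a hypothetical sketch of the idea, not Goblero’s actual internals, and the pending:/active: key layout is invented for illustration:

package queue

import (
    badger "github.com/dgraph-io/badger"
)

// dequeueJob moves the next pending job to the active queue in a
// single ACID transaction and returns its raw data (nil if the
// pending queue is empty). Deserialization into a Job struct is elided.
func dequeueJob(db *badger.DB) ([]byte, error) {
    var data []byte
    err := db.Update(func(txn *badger.Txn) error {
        it := txn.NewIterator(badger.DefaultIteratorOptions)
        defer it.Close()

        // 1. Seek to the next pending job (keys are sorted)
        prefix := []byte("pending:")
        it.Seek(prefix)
        if !it.ValidForPrefix(prefix) {
            return nil // nothing pending
        }
        item := it.Item()
        key := item.KeyCopy(nil)

        // 2. Read the job data
        val, err := item.ValueCopy(nil)
        if err != nil {
            return err
        }
        data = val

        // 3. Delete the job from the pending queue
        if err := txn.Delete(key); err != nil {
            return err
        }

        // 4. Set the job on the active queue
        activeKey := append([]byte("active:"), key[len(prefix):]...)
        return txn.Set(activeKey, val)
    })
    return data, err
}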
Todo:
Some features that still need work:
Restart interrupted jobs after app restarts/crashes
Sweep completed jobs from the “complete” queue
Retry options for failed jobs
Allow batch enqueuing
Add support for Go contexts
Test in real conditions under high load
Expose Prometheus metrics in an HTTP handler
Optimize performance / Locking
I hope you’ll find the Goblero library useful, and please reach out if you have any ideas, suggestions, or thoughts. Happy Go hacking!