Documentation
Overview
Package queue implements several types of queues, notably an asynchronous queue for non-blocking data processing and a SQLite3-backed queue.
Variables
var ErrEmpty = errors.New("queue: queue is empty")
ErrEmpty is returned when dequeuing from an empty queue.
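A minimal sketch of treating the empty-queue error as a normal stopping condition rather than a failure. To keep it runnable on its own, errEmpty is a local stand-in for ErrEmpty, and dequeuer, sliceQueue, and drain are hypothetical names that are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmpty is a local stand-in for queue.ErrEmpty so the sketch compiles alone.
var errEmpty = errors.New("queue: queue is empty")

// dequeuer is the subset of the Queue interface this sketch needs.
type dequeuer interface {
	Dequeue() ([]byte, error)
}

// sliceQueue is a hypothetical, non-concurrent stand-in for a real queue.
type sliceQueue struct{ items [][]byte }

func (s *sliceQueue) Dequeue() ([]byte, error) {
	if len(s.items) == 0 {
		return nil, errEmpty
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, nil
}

// drain dequeues until the queue reports it is empty. errEmpty ends the loop
// normally; any other error is returned to the caller.
func drain(q dequeuer) ([]string, error) {
	var out []string
	for {
		data, err := q.Dequeue()
		if errors.Is(err, errEmpty) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(data))
	}
}

func main() {
	q := &sliceQueue{items: [][]byte{[]byte("hi"), []byte("hello")}}
	got, err := drain(q)
	fmt.Println(got, err)
}
```

With the real package, the same comparison would presumably be made against queue.ErrEmpty.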
Types
type AsyncQueue
type AsyncQueue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Close the queue. Can be done at any point after the queue is
	// constructed.
	io.Closer
}
AsyncQueue processes queue data asynchronously.
Example
An example of using the async queue to process data.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/esote/queue"
)

func main() {
	sqlite3, err := queue.NewSqlite3Queue("test.db")
	if err != nil {
		log.Fatal(err)
	}
	defer sqlite3.Close()

	handler := func(data []byte, err error) {
		if err != nil {
			log.Println(err)
			return
		}
		// Note printing via fmt is racy with multiple workers.
		fmt.Println(string(data))
	}

	// q is an async queue, backed by SQLite3, with one worker. It is
	// generally recommended to have multiple workers.
	q, err := queue.NewAsyncQueue(sqlite3, handler, 1)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	msgs := []string{"hi", "hello", "hey", "hiya"}
	for _, msg := range msgs {
		if err = q.Enqueue([]byte(msg)); err != nil {
			log.Fatal(err)
		}
	}

	// In an async queue, data will be dequeued "eventually." For the
	// purposes of this example we wait for all of it to be processed.
	time.Sleep(10 * time.Millisecond)
}
Output:
hi
hello
hey
hiya
func NewAsyncQueue
func NewAsyncQueue(q Queue, handler Handler, workers int) (AsyncQueue, error)
NewAsyncQueue creates an async queue that processes inner queue data through a handler and worker pool. Closing the async queue does NOT close the inner queue.
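To make the description concrete, here is a rough sketch of how a worker pool over an inner queue could behave. It is not the package's implementation: asyncSketch, newAsyncSketch, inner, memQueue, and the polling interval are all assumptions made for illustration, and errEmpty stands in for ErrEmpty. The sketch shows workers passing dequeued data to the handler, skipping the empty-queue error, and a Close that stops the workers while leaving the inner queue open.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errEmpty is a local stand-in for queue.ErrEmpty.
var errEmpty = errors.New("queue: queue is empty")

// inner is the part of the Queue interface the workers need (hypothetical name).
type inner interface {
	Enqueue(data []byte) error
	Dequeue() ([]byte, error)
}

// memQueue is a hypothetical in-memory inner queue guarded by a mutex.
type memQueue struct {
	mu    sync.Mutex
	items [][]byte
}

func (m *memQueue) Enqueue(data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items = append(m.items, append([]byte(nil), data...))
	return nil
}

func (m *memQueue) Dequeue() ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.items) == 0 {
		return nil, errEmpty
	}
	item := m.items[0]
	m.items = m.items[1:]
	return item, nil
}

// asyncSketch is an illustrative worker pool, not the package's AsyncQueue.
type asyncSketch struct {
	q    inner
	done chan struct{}
	wg   sync.WaitGroup
	once sync.Once
}

func newAsyncSketch(q inner, handler func([]byte, error), workers int) *asyncSketch {
	a := &asyncSketch{q: q, done: make(chan struct{})}
	for i := 0; i < workers; i++ {
		a.wg.Add(1)
		go func() {
			defer a.wg.Done()
			for {
				select {
				case <-a.done:
					return
				default:
				}
				data, err := a.q.Dequeue()
				if errors.Is(err, errEmpty) {
					// Nothing queued: poll again shortly (assumed interval).
					time.Sleep(time.Millisecond)
					continue
				}
				handler(data, err)
			}
		}()
	}
	return a
}

func (a *asyncSketch) Enqueue(data []byte) error { return a.q.Enqueue(data) }

// Close stops the workers and waits for them. The inner queue is left open.
func (a *asyncSketch) Close() error {
	a.once.Do(func() { close(a.done) })
	a.wg.Wait()
	return nil
}

func main() {
	q := &memQueue{}
	var processed sync.WaitGroup
	a := newAsyncSketch(q, func(data []byte, err error) {
		defer processed.Done()
		if err != nil {
			fmt.Println("dequeue:", err)
			return
		}
		fmt.Println(string(data))
	}, 1)
	for _, msg := range []string{"hi", "hello"} {
		processed.Add(1)
		if err := a.Enqueue([]byte(msg)); err != nil {
			processed.Done()
			fmt.Println("enqueue:", err)
		}
	}
	processed.Wait() // wait for the handler instead of sleeping
	a.Close()        // q itself would still need its own Close
}
```

Because the inner queue outlives the async wrapper in this model, the caller remains responsible for closing it, which matches the deferred sqlite3.Close() in the package example.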
type Handler
Handler operates on data dequeued by the async queue. Its err argument carries any error returned by the inner queue's dequeue operation, except ErrEmpty, which is not passed to the handler.
type Queue
type Queue interface {
	// Add data to the queue. Safe for concurrent use.
	Enqueue(data []byte) error

	// Remove data from the queue. Safe for concurrent use. Returns ErrEmpty
	// if the queue contains no data.
	Dequeue() ([]byte, error)

	// Close the queue.
	io.Closer
}
Queue is a store of byte-slice data that supports concurrent Enqueue and Dequeue.
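As an illustration of what satisfying this interface involves, the following sketch implements it with a mutex-guarded slice. The interface is redeclared locally so the example compiles on its own; memQueue, newMemQueue, and errEmpty are hypothetical names, and first-in, first-out ordering is an assumption, since the interface comments do not state an order.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// errEmpty is a local stand-in for queue.ErrEmpty.
var errEmpty = errors.New("queue: queue is empty")

// Queue mirrors the interface documented above.
type Queue interface {
	Enqueue(data []byte) error
	Dequeue() ([]byte, error)
	io.Closer
}

// memQueue is a hypothetical in-memory queue. The mutex makes Enqueue and
// Dequeue safe for concurrent use, as the interface comments require.
type memQueue struct {
	mu    sync.Mutex
	items [][]byte
}

func newMemQueue() *memQueue { return &memQueue{} }

func (m *memQueue) Enqueue(data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Copy so later changes to the caller's slice do not alter queued data.
	m.items = append(m.items, append([]byte(nil), data...))
	return nil
}

func (m *memQueue) Dequeue() ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.items) == 0 {
		return nil, errEmpty
	}
	item := m.items[0]
	m.items = m.items[1:]
	return item, nil
}

// Close has nothing to release for an in-memory queue.
func (m *memQueue) Close() error { return nil }

// Compile-time check that memQueue satisfies Queue.
var _ Queue = (*memQueue)(nil)

func main() {
	var q Queue = newMemQueue()
	defer q.Close()

	for _, msg := range []string{"hi", "hello"} {
		if err := q.Enqueue([]byte(msg)); err != nil {
			fmt.Println("enqueue:", err)
			return
		}
	}
	for {
		data, err := q.Dequeue()
		if err != nil {
			break // errEmpty here: memQueue returns no other error
		}
		fmt.Println(string(data))
	}
}
```

A type like this could serve as a lightweight inner queue in tests, where persistence is not needed.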
func NewSqlite3Queue
NewSqlite3Queue creates an SQLite3-backed queue with ACID properties.