Initial commit
This commit is contained in:
parent
daddfcac0d
commit
55baf08605
20 changed files with 1860 additions and 0 deletions
157
queue/badger.go
Normal file
157
queue/badger.go
Normal file
|
@ -0,0 +1,157 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/dgraph-io/badger/badger"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/tblyler/sheepmq/lease"
|
||||
"github.com/tblyler/sheepmq/shepard"
|
||||
)
|
||||
|
||||
// Badger uses Badger KV store as a queue backend
|
||||
type Badger struct {
|
||||
opts *Options
|
||||
bopts *badger.Options
|
||||
kv *badger.KV
|
||||
currentID uint64
|
||||
idconv *idConverter
|
||||
leases *lease.Manager
|
||||
}
|
||||
|
||||
// NewBadger creates a new instance of a Badger-backed Queue
|
||||
func NewBadger(opts *Options) (*Badger, error) {
|
||||
// use default badger options if none provided
|
||||
var bopts badger.Options
|
||||
if opts.BadgerOptions == nil {
|
||||
bopts = badger.DefaultOptions
|
||||
} else {
|
||||
bopts = *opts.BadgerOptions
|
||||
}
|
||||
|
||||
// make sure the directory exists
|
||||
err := os.MkdirAll(opts.Dir, defaultFilePerm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// always honor Options' dir setting over badger.Options' dir settings
|
||||
bopts.Dir = opts.Dir
|
||||
bopts.ValueDir = opts.Dir
|
||||
|
||||
// try to open new badger key value instance with the given options
|
||||
kv, err := badger.NewKV(&bopts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var currentID uint64
|
||||
|
||||
iter := kv.NewIterator(badger.IteratorOptions{
|
||||
PrefetchSize: 5,
|
||||
FetchValues: false,
|
||||
Reverse: true,
|
||||
})
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
for iter.Rewind(); iter.Valid(); iter.Next() {
|
||||
currentID, err = byteToID(iter.Item().Key())
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// try to delete invalid entries
|
||||
kv.Delete(iter.Item().Key())
|
||||
currentID = 0
|
||||
}
|
||||
|
||||
return &Badger{
|
||||
opts: opts,
|
||||
bopts: &bopts,
|
||||
kv: kv,
|
||||
currentID: currentID,
|
||||
idconv: newIDConverter(),
|
||||
leases: lease.NewManager(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close the internal key value store
|
||||
func (b *Badger) Close() error {
|
||||
return b.kv.Close()
|
||||
}
|
||||
|
||||
// Get the next available ID atomically
|
||||
func (b *Badger) getID() uint64 {
|
||||
return atomic.AddUint64(&b.currentID, 1)
|
||||
}
|
||||
|
||||
// AddItem to the queue
|
||||
func (b *Badger) AddItem(item *shepard.Item) error {
|
||||
item.Id = b.getID()
|
||||
|
||||
data, err := proto.Marshal(item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
byteID := b.idconv.idToByte(item.Id)
|
||||
defer b.idconv.put(byteID)
|
||||
|
||||
return b.kv.Set(byteID, data)
|
||||
}
|
||||
|
||||
// GetItem from the queue
|
||||
func (b *Badger) GetItem(info *shepard.GetInfo, itemChan chan<- *shepard.Item) error {
|
||||
iter := b.kv.NewIterator(badger.IteratorOptions{
|
||||
PrefetchSize: 500,
|
||||
FetchValues: true,
|
||||
Reverse: false,
|
||||
})
|
||||
|
||||
defer iter.Close()
|
||||
|
||||
var count uint64
|
||||
for iter.Rewind(); iter.Valid() && count < info.Count; iter.Next() {
|
||||
item := iter.Item()
|
||||
id, err := byteToID(item.Key())
|
||||
if err != nil {
|
||||
// try to delete bad keys (don't care about failures)
|
||||
b.kv.Delete(item.Key())
|
||||
continue
|
||||
}
|
||||
|
||||
err = b.leases.AddLease(id, info)
|
||||
if err == nil || err == lease.ErrNoLeaser {
|
||||
ret := &shepard.Item{}
|
||||
err = proto.Unmarshal(item.Value(), ret)
|
||||
if err != nil {
|
||||
// delete bad values
|
||||
b.kv.Delete(item.Key())
|
||||
continue
|
||||
}
|
||||
|
||||
count++
|
||||
itemChan <- ret
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DelItem from the queue
|
||||
func (b *Badger) DelItem(info *shepard.DelInfo) error {
|
||||
var err error
|
||||
for _, id := range info.GetIds() {
|
||||
idByte := b.idconv.idToByte(id)
|
||||
err = b.kv.Delete(idByte)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.idconv.put(idByte)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
233
queue/badger_test.go
Normal file
233
queue/badger_test.go
Normal file
|
@ -0,0 +1,233 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/badger/badger"
|
||||
"github.com/tblyler/sheepmq/shepard"
|
||||
)
|
||||
|
||||
func TestNewBadger(t *testing.T) {
|
||||
// test a bad dir setting
|
||||
opts := &Options{
|
||||
Dir: "",
|
||||
}
|
||||
|
||||
_, err := NewBadger(opts)
|
||||
if err == nil {
|
||||
t.Error("Failed to get error on bad Dir setting")
|
||||
}
|
||||
|
||||
opts.Dir = filepath.Join(os.TempDir(), "NewBadgerTest")
|
||||
|
||||
// ensure clean directory
|
||||
os.RemoveAll(opts.Dir)
|
||||
defer os.RemoveAll(opts.Dir)
|
||||
|
||||
b, err := NewBadger(opts)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to create badger with default options")
|
||||
}
|
||||
|
||||
if b.currentID != 0 {
|
||||
t.Error("Current ID of an empty badger db should be 0 not", b.currentID)
|
||||
}
|
||||
|
||||
err = b.AddItem(&shepard.Item{})
|
||||
if err != nil {
|
||||
t.Fatal("Failed to add empty item", err)
|
||||
}
|
||||
|
||||
if b.currentID != 1 {
|
||||
t.Error("Current ID of a one item db should be 1 not", b.currentID)
|
||||
}
|
||||
|
||||
b.Close()
|
||||
|
||||
opts.BadgerOptions = &badger.Options{}
|
||||
|
||||
// make sure custom badger options are honored
|
||||
*opts.BadgerOptions = badger.DefaultOptions
|
||||
opts.BadgerOptions.SyncWrites = !opts.BadgerOptions.SyncWrites
|
||||
|
||||
b, err = NewBadger(opts)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to use custom badger optoins", err)
|
||||
}
|
||||
|
||||
defer b.Close()
|
||||
|
||||
if b.bopts.SyncWrites != opts.BadgerOptions.SyncWrites {
|
||||
t.Error("Failed to use custom badger options")
|
||||
}
|
||||
|
||||
if b.currentID != 1 {
|
||||
t.Error("current id != 1 got", b.currentID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadgerAddGetItem(t *testing.T) {
|
||||
items := make([]*shepard.Item, 32)
|
||||
|
||||
for i := range items {
|
||||
items[i] = &shepard.Item{}
|
||||
|
||||
items[i].Data = make([]byte, 256*(i+1))
|
||||
rand.Read(items[i].Data)
|
||||
items[i].Ctime = time.Now().Unix()
|
||||
items[i].Queue = fmt.Sprint("testing", i)
|
||||
items[i].Stats = map[string]int64{
|
||||
"cool": 133333337,
|
||||
"notcool": 0,
|
||||
"#1": 1,
|
||||
"datSize": int64(len(items[i].Data)),
|
||||
}
|
||||
}
|
||||
|
||||
opts := &Options{
|
||||
Dir: filepath.Join(os.TempDir(), "TestBadgerAddGetItem"),
|
||||
}
|
||||
|
||||
os.RemoveAll(opts.Dir)
|
||||
defer os.RemoveAll(opts.Dir)
|
||||
|
||||
b, err := NewBadger(opts)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to open badger", err)
|
||||
}
|
||||
|
||||
defer b.Close()
|
||||
|
||||
for i, item := range items {
|
||||
err = b.AddItem(item)
|
||||
if err != nil {
|
||||
t.Error("Failed to add item", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
itemChan := make(chan *shepard.Item, len(items))
|
||||
err = b.GetItem(&shepard.GetInfo{
|
||||
Count: uint64(len(items)),
|
||||
}, itemChan)
|
||||
|
||||
close(itemChan)
|
||||
if err != nil {
|
||||
t.Error("Failed to get items", err)
|
||||
}
|
||||
|
||||
i := 0
|
||||
for item := range itemChan {
|
||||
if item.Ctime != items[i].Ctime {
|
||||
t.Error("item ctimes", item.Ctime, items[i].Ctime)
|
||||
}
|
||||
|
||||
if item.Queue != items[i].Queue {
|
||||
t.Error("item queues", item.Queue, items[i].Queue)
|
||||
}
|
||||
|
||||
if !bytes.Equal(item.Data, items[i].Data) {
|
||||
t.Error("item data", item.Data, items[i].Data)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(item.Stats, items[i].Stats) {
|
||||
t.Error("item stats", item.Stats, items[i].Stats)
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
if i != len(items) {
|
||||
t.Error("got", i, "items expected", len(items))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadgerDelItem(t *testing.T) {
|
||||
opts := &Options{
|
||||
Dir: filepath.Join(os.TempDir(), "TestBadgerAddGetItem"),
|
||||
}
|
||||
|
||||
os.RemoveAll(opts.Dir)
|
||||
defer os.RemoveAll(opts.Dir)
|
||||
|
||||
b, err := NewBadger(opts)
|
||||
if err != nil {
|
||||
t.Fatal("Failed to start badger", err)
|
||||
}
|
||||
|
||||
items := make([]*shepard.Item, 32)
|
||||
for i := range items {
|
||||
items[i] = &shepard.Item{
|
||||
Ctime: time.Now().Unix(),
|
||||
Data: make([]byte, 256*(i+1)),
|
||||
Queue: "The queue",
|
||||
Stats: map[string]int64{
|
||||
"lol": 10101010101,
|
||||
"index": int64(i),
|
||||
"datasize:": int64(256 * (i + 1)),
|
||||
},
|
||||
}
|
||||
rand.Read(items[i].Data)
|
||||
|
||||
err = b.AddItem(items[i])
|
||||
if err != nil {
|
||||
t.Error("Failed to add item", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
delinfo := &shepard.DelInfo{}
|
||||
for i := range items {
|
||||
if i%2 == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
delinfo.Ids = append(delinfo.Ids, uint64(i))
|
||||
}
|
||||
|
||||
err = b.DelItem(delinfo)
|
||||
if err != nil {
|
||||
t.Error("Failed to delete", delinfo.Ids, err)
|
||||
}
|
||||
|
||||
getinfo := &shepard.GetInfo{
|
||||
Count: uint64(len(items) - len(delinfo.Ids)),
|
||||
}
|
||||
getChan := make(chan *shepard.Item, getinfo.Count)
|
||||
|
||||
err = b.GetItem(getinfo, getChan)
|
||||
if err != nil {
|
||||
t.Error("Failed to get items", err)
|
||||
}
|
||||
|
||||
close(getChan)
|
||||
|
||||
i := 1
|
||||
for item := range getChan {
|
||||
if item.Ctime != items[i].Ctime {
|
||||
t.Error("item ctimes", item.Ctime, items[i].Ctime)
|
||||
}
|
||||
|
||||
if item.Queue != items[i].Queue {
|
||||
t.Error("item queues", item.Queue, items[i].Queue)
|
||||
}
|
||||
|
||||
if !bytes.Equal(item.Data, items[i].Data) {
|
||||
t.Error("item data", item.Data, items[i].Data)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(item.Stats, items[i].Stats) {
|
||||
t.Error("item stats", item.Stats, items[i].Stats)
|
||||
}
|
||||
i += 2
|
||||
}
|
||||
|
||||
if i != len(items)+1 {
|
||||
t.Error("only looped to item index", i)
|
||||
}
|
||||
}
|
70
queue/queue.go
Normal file
70
queue/queue.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/dgraph-io/badger/badger"
|
||||
"github.com/tblyler/sheepmq/shepard"
|
||||
)
|
||||
|
||||
const (
|
||||
idByteSize = 8
|
||||
defaultFilePerm = os.FileMode(0700)
|
||||
)
|
||||
|
||||
// Queue defines a resource to store queue items
|
||||
type Queue interface {
|
||||
AddItem(*shepard.Item) error
|
||||
GetItem(*shepard.GetInfo) (*shepard.Item, error)
|
||||
DelItem(*shepard.DelInfo) error
|
||||
}
|
||||
|
||||
// Options to be used when creating a new Queue
|
||||
type Options struct {
|
||||
// Directory to store queue data
|
||||
Dir string
|
||||
|
||||
// Badger queue specific options
|
||||
BadgerOptions *badger.Options
|
||||
}
|
||||
|
||||
type idConverter struct {
|
||||
pool sync.Pool
|
||||
}
|
||||
|
||||
func newIDConverter() *idConverter {
|
||||
return &idConverter{
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, idByteSize)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (i *idConverter) idToByte(id uint64) []byte {
|
||||
buf := i.pool.Get().([]byte)
|
||||
|
||||
binary.LittleEndian.PutUint64(buf, id)
|
||||
|
||||
return buf
|
||||
}
|
||||
|
||||
func (i *idConverter) put(data []byte) {
|
||||
i.pool.Put(data)
|
||||
}
|
||||
|
||||
func byteToID(data []byte) (uint64, error) {
|
||||
if len(data) < idByteSize {
|
||||
return 0, fmt.Errorf(
|
||||
"unable to convert byte slice length of %d, need at least %d",
|
||||
len(data),
|
||||
idByteSize,
|
||||
)
|
||||
}
|
||||
|
||||
return binary.LittleEndian.Uint64(data), nil
|
||||
}
|
67
queue/queue_test.go
Normal file
67
queue/queue_test.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIDConverter(t *testing.T) {
|
||||
idconv := newIDConverter()
|
||||
|
||||
idByte := idconv.idToByte(math.MaxUint64)
|
||||
|
||||
// reserved keywords become english mistakes
|
||||
for _, bite := range idByte {
|
||||
if bite != byte(255) {
|
||||
t.Error("maxuint64 is not all max bytes", idByte)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
idconv.put(idByte)
|
||||
|
||||
firstAddress := reflect.ValueOf(idByte).Pointer()
|
||||
|
||||
idByte = idconv.idToByte(0)
|
||||
secondAddress := reflect.ValueOf(idByte).Pointer()
|
||||
|
||||
if firstAddress != secondAddress {
|
||||
t.Error("Failed to use byte pool")
|
||||
}
|
||||
|
||||
for _, bite := range idByte {
|
||||
if bite != 0 {
|
||||
t.Error("zero should be all zero bytes", idByte)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
idconv.put(idByte)
|
||||
|
||||
id := uint64(582348138342)
|
||||
idByte = idconv.idToByte(id)
|
||||
knownByte := []byte{
|
||||
102, 103, 167, 150, 135, 0, 0, 0,
|
||||
}
|
||||
|
||||
if !bytes.Equal(idByte, knownByte) {
|
||||
t.Error("Failed to encode id exepect", knownByte, "got", idByte)
|
||||
}
|
||||
|
||||
idconv.put(idByte)
|
||||
|
||||
newID, err := byteToID(knownByte)
|
||||
if err != nil {
|
||||
t.Error("error converting byte to id", err)
|
||||
}
|
||||
if newID != id {
|
||||
t.Error("expected id", id, "got", newID)
|
||||
}
|
||||
|
||||
_, err = byteToID([]byte{1, 2, 3, 4, 5})
|
||||
if err == nil {
|
||||
t.Error("Failed to get error for bad byte to id data")
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue