sheepmq/queue/badger.go

158 lines
3.1 KiB
Go

package queue
import (
"os"
"sync/atomic"
"github.com/dgraph-io/badger/badger"
"github.com/golang/protobuf/proto"
"github.com/tblyler/sheepmq/lease"
"github.com/tblyler/sheepmq/shepard"
)
// Badger uses Badger KV store as a queue backend
type Badger struct {
opts *Options
bopts *badger.Options
kv *badger.KV
currentID uint64
idconv *idConverter
leases *lease.Manager
}
// NewBadger creates a new instance of a Badger-backed Queue
func NewBadger(opts *Options) (*Badger, error) {
// use default badger options if none provided
var bopts badger.Options
if opts.BadgerOptions == nil {
bopts = badger.DefaultOptions
} else {
bopts = *opts.BadgerOptions
}
// make sure the directory exists
err := os.MkdirAll(opts.Dir, defaultFilePerm)
if err != nil {
return nil, err
}
// always honor Options' dir setting over badger.Options' dir settings
bopts.Dir = opts.Dir
bopts.ValueDir = opts.Dir
// try to open new badger key value instance with the given options
kv, err := badger.NewKV(&bopts)
if err != nil {
return nil, err
}
var currentID uint64
iter := kv.NewIterator(badger.IteratorOptions{
PrefetchSize: 5,
FetchValues: false,
Reverse: true,
})
defer iter.Close()
for iter.Rewind(); iter.Valid(); iter.Next() {
currentID, err = byteToID(iter.Item().Key())
if err == nil {
break
}
// try to delete invalid entries
kv.Delete(iter.Item().Key())
currentID = 0
}
return &Badger{
opts: opts,
bopts: &bopts,
kv: kv,
currentID: currentID,
idconv: newIDConverter(),
leases: lease.NewManager(),
}, nil
}
// Close the internal key value store
func (b *Badger) Close() error {
return b.kv.Close()
}
// Get the next available ID atomically
func (b *Badger) getID() uint64 {
return atomic.AddUint64(&b.currentID, 1)
}
// AddItem to the queue
func (b *Badger) AddItem(item *shepard.Item) error {
item.Id = b.getID()
data, err := proto.Marshal(item)
if err != nil {
return err
}
byteID := b.idconv.idToByte(item.Id)
defer b.idconv.put(byteID)
return b.kv.Set(byteID, data)
}
// GetItem from the queue
func (b *Badger) GetItem(info *shepard.GetInfo, itemChan chan<- *shepard.Item) error {
iter := b.kv.NewIterator(badger.IteratorOptions{
PrefetchSize: 500,
FetchValues: true,
Reverse: false,
})
defer iter.Close()
var count uint64
for iter.Rewind(); iter.Valid() && count < info.Count; iter.Next() {
item := iter.Item()
id, err := byteToID(item.Key())
if err != nil {
// try to delete bad keys (don't care about failures)
b.kv.Delete(item.Key())
continue
}
err = b.leases.AddLease(id, info)
if err == nil || err == lease.ErrNoLeaser {
ret := &shepard.Item{}
err = proto.Unmarshal(item.Value(), ret)
if err != nil {
// delete bad values
b.kv.Delete(item.Key())
continue
}
count++
itemChan <- ret
}
}
return nil
}
// DelItem from the queue
func (b *Badger) DelItem(info *shepard.DelInfo) error {
var err error
for _, id := range info.GetIds() {
idByte := b.idconv.idToByte(id)
err = b.kv.Delete(idByte)
if err != nil {
return err
}
b.idconv.put(idByte)
}
return nil
}