From 55baf08605d441721815b824e07915d01588a525 Mon Sep 17 00:00:00 2001 From: Tony Blyler Date: Thu, 22 Jun 2017 19:25:38 -0400 Subject: [PATCH] Initial commit --- README.md | 4 + lease/heart.go | 34 +++ lease/heart_test.go | 52 ++++ lease/leaser.go | 10 + lease/manager.go | 94 +++++++ lease/manager_test.go | 134 +++++++++ lease/pid.go | 34 +++ lease/pid_test.go | 31 +++ lease/timeout.go | 27 ++ lease/timeout_test.go | 26 ++ main.go | 52 ++++ queue/badger.go | 157 +++++++++++ queue/badger_test.go | 233 ++++++++++++++++ queue/queue.go | 70 +++++ queue/queue_test.go | 67 +++++ sheepmq/grpc_server.go | 74 +++++ sheepmq/sheepmq.go | 38 +++ shepard/shepard.pb.go | 607 +++++++++++++++++++++++++++++++++++++++++ shepard/shepard.proto | 108 ++++++++ update_grpc.sh | 8 + 20 files changed, 1860 insertions(+) create mode 100644 README.md create mode 100644 lease/heart.go create mode 100644 lease/heart_test.go create mode 100644 lease/leaser.go create mode 100644 lease/manager.go create mode 100644 lease/manager_test.go create mode 100644 lease/pid.go create mode 100644 lease/pid_test.go create mode 100644 lease/timeout.go create mode 100644 lease/timeout_test.go create mode 100644 main.go create mode 100644 queue/badger.go create mode 100644 queue/badger_test.go create mode 100644 queue/queue.go create mode 100644 queue/queue_test.go create mode 100644 sheepmq/grpc_server.go create mode 100644 sheepmq/sheepmq.go create mode 100644 shepard/shepard.pb.go create mode 100644 shepard/shepard.proto create mode 100755 update_grpc.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..9768734 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# SheepMQ +Aiming to be a robust message queue library and daemon with a variety of storage backends. + +# Still a work in progress diff --git a/lease/heart.go b/lease/heart.go new file mode 100644 index 0000000..8b189ed --- /dev/null +++ b/lease/heart.go @@ -0,0 +1,34 @@ +package lease + +import "time" + +// Heart contains a lease that survives when heartbeats happen within a given ttl interval +type Heart struct { + lastBeat time.Time + ttl time.Duration +} + +// NewHeart creates a new heart instance +func NewHeart(ttl int64) *Heart { + ttlDuration := time.Nanosecond * time.Duration(ttl) + return &Heart{ + lastBeat: time.Now(), + ttl: ttlDuration, + } +} + +// Valid tries to beat the heart, true if still alive, false if dead +func (h *Heart) Valid() bool { + if !h.Check() { + return false + } + + // update the last beat to now + h.lastBeat = time.Now() + return true +} + +// Check if the heartbeat is valid +func (h *Heart) Check() bool { + return time.Since(h.lastBeat) <= h.ttl +} diff --git a/lease/heart_test.go b/lease/heart_test.go new file mode 100644 index 0000000..e1dbee7 --- /dev/null +++ b/lease/heart_test.go @@ -0,0 +1,52 @@ +package lease + +import ( + "testing" + "time" +) + +func TestHeartCheck(t *testing.T) { + heart := NewHeart(int64(time.Second)) + + if !heart.Check() { + t.Error("Heartbeat check failed first check too soon") + } + + time.Sleep(time.Second + time.Nanosecond) + + if heart.Check() { + t.Error("Heartbeat check succeeded after its ttl") + } +} + +func TestHeartValid(t *testing.T) { + heart := NewHeart(int64(time.Second)) + + if !heart.Valid() { + t.Error("Heartbeat valid failed first check too soon") + } + + time.Sleep(time.Second / 3) + + if !heart.Valid() { + t.Error("Heartbeat valid failed second check too soon") + } + + time.Sleep(time.Second / 2) + + if !heart.Valid() { + t.Error("Hearbeat valid failed third check too 
soon") + } + + time.Sleep(time.Second / 2) + + if !heart.Valid() { + t.Error("Heartbeat valid failed fourth check too soon") + } + + time.Sleep(time.Second) + + if heart.Valid() { + t.Error("Heartbeat valid succeeded after its ttl") + } +} diff --git a/lease/leaser.go b/lease/leaser.go new file mode 100644 index 0000000..1240381 --- /dev/null +++ b/lease/leaser.go @@ -0,0 +1,10 @@ +package lease + +// Leaser denotes whether a given lease is still valid +type Leaser interface { + // Update and check the leaser + Valid() bool + + // Check the leaser without any sort of updates + Check() bool +} diff --git a/lease/manager.go b/lease/manager.go new file mode 100644 index 0000000..84ea8a0 --- /dev/null +++ b/lease/manager.go @@ -0,0 +1,94 @@ +package lease + +import ( + "errors" + "sync" + + "github.com/tblyler/sheepmq/shepard" +) + +var ( + // ErrLeased denotes the item is already leased + ErrLeased = errors.New("Item is already leased") + + // ErrNoLeaser denotes no leaser was provided to add a lease + ErrNoLeaser = errors.New("No leaser provided") +) + +// Manager contains many leases and their validity +type Manager struct { + leases map[uint64]Leaser + locker sync.RWMutex +} + +// NewManager creates a new Manager instance +func NewManager() *Manager { + return &Manager{ + leases: make(map[uint64]Leaser), + } +} + +// AddLease to the manager for the given item +func (m *Manager) AddLease(id uint64, info *shepard.GetInfo) error { + if m.CheckLease(id) { + return ErrLeased + } + + var leaser Leaser + if info.TimeoutLease != nil { + leaser = NewTimeout(info.TimeoutLease.Ttl) + } else if info.PidLease != nil { + leaser = NewPID(int(info.PidLease.Pid)) + } else if info.HeartLease != nil { + leaser = NewHeart(info.HeartLease.Ttl) + } else { + return ErrNoLeaser + } + + m.locker.Lock() + m.leases[id] = leaser + m.locker.Unlock() + + return nil +} + +// CheckLease for validity +func (m *Manager) CheckLease(id uint64) bool { + m.locker.RLock() + lease, exists := m.leases[id] + if !exists { + m.locker.RUnlock() + return false + } + + ret := lease.Valid() + m.locker.RUnlock() + if !ret { + // delete the lease since it is no longer valid + m.locker.Lock() + delete(m.leases, id) + m.locker.Unlock() + } + + return ret +} + +// PruneLeases that fail their checks +func (m *Manager) PruneLeases() { + deleteKeys := []uint64{} + m.locker.RLock() + + for key, lease := range m.leases { + if !lease.Check() { + deleteKeys = append(deleteKeys, key) + } + } + + m.locker.RUnlock() + m.locker.Lock() + for _, key := range deleteKeys { + delete(m.leases, key) + } + + m.locker.Unlock() +} diff --git a/lease/manager_test.go b/lease/manager_test.go new file mode 100644 index 0000000..4b6199b --- /dev/null +++ b/lease/manager_test.go @@ -0,0 +1,134 @@ +package lease + +import ( + "testing" + "time" + + "github.com/tblyler/sheepmq/shepard" +) + +func TestManagerAddCheckLease(t *testing.T) { + manager := NewManager() + + info := &shepard.GetInfo{} + + err := manager.AddLease(123, info) + if err != ErrNoLeaser { + t.Error("Failed to get ErrNoLeaser on add lease got", err) + } + + info.PidLease = &shepard.PidLease{ + // use a bad PID + Pid: 0, + } + + err = manager.AddLease(123, info) + if err != nil { + t.Error("Failed to add a crappy PID leaser", err) + } + + err = manager.AddLease(123, info) + if err != nil { + t.Error("Failed to add a crappy PID leaser ontop of another", err) + } + + info.PidLease = nil + + info.HeartLease = &shepard.HeartbeatLease{ + Ttl: int64(time.Second / 2), + } + + err = manager.AddLease(2, info) + 
if err != nil { + t.Error("Failed to add a valid heartbeat lease", err) + } + + if !manager.CheckLease(2) { + t.Error("Failed to check for valid heartbeat lease") + } + + info.HeartLease = nil + info.TimeoutLease = &shepard.TimeLease{ + Ttl: int64(time.Second), + } + + err = manager.AddLease(123, info) + if err != nil { + t.Error("Failed to add a good timeout lease of 1 second") + } + + err = manager.AddLease(123, info) + if err == nil { + t.Error("Should not be able to add a lease ontop of another valid one") + } + + err = manager.AddLease(124, info) + if err != nil { + t.Error("Failed to add a valid lease against a different id", err) + } + + if !manager.CheckLease(123) { + t.Error("Failed to make sure valid lease was valid") + } + + if !manager.CheckLease(124) { + t.Error("Failed to make sure valid lease was valid") + } + + time.Sleep(time.Second) + + if manager.CheckLease(123) { + t.Error("failed to make sure invalid lease was invalid") + } + + if manager.CheckLease(124) { + t.Error("failed to make sure invalid lease was invalid") + } +} + +func TestManagerPruneLeases(t *testing.T) { + manager := NewManager() + + info := &shepard.GetInfo{} + info.PidLease = &shepard.PidLease{ + Pid: 0, + } + + err := manager.AddLease(5, info) + if err != nil { + t.Error("Failed to add crappy PID lease", err) + } + + info.PidLease = nil + info.TimeoutLease = &shepard.TimeLease{ + Ttl: int64(time.Second), + } + + err = manager.AddLease(2, info) + if err != nil { + t.Error("Failed to add valid timeout lease", err) + } + + info.TimeoutLease = nil + info.HeartLease = &shepard.HeartbeatLease{ + Ttl: int64(time.Second), + } + + err = manager.AddLease(1, info) + if err != nil { + t.Error("Failed to add valid heartbeat lease", err) + } + + manager.PruneLeases() + + if len(manager.leases) != 2 { + t.Errorf("Should have 2 leases left, have %d", len(manager.leases)) + } + + time.Sleep(time.Second) + + manager.PruneLeases() + if len(manager.leases) != 0 { + t.Errorf("Should have 0 leases left, have %d", len(manager.leases)) + } +} diff --git a/lease/pid.go b/lease/pid.go new file mode 100644 index 0000000..bf09ec4 --- /dev/null +++ b/lease/pid.go @@ -0,0 +1,34 @@ +package lease + +import ( + "os" + "syscall" +) + +// PID contains a lease that is valid until the given PID no longer exists +type PID struct { + pid int +} + +// NewPID creates a new PID leaser instance +func NewPID(pid int) *PID { + return &PID{ + pid: pid, + } +} + +// Valid checks if the PID still exists +func (p *PID) Valid() bool { + return p.Check() +} + +// Check if the PID still exists +func (p *PID) Check() bool { + process, err := os.FindProcess(p.pid) + if err != nil { + return false + } + + // if nil, PID exists + return process.Signal(syscall.Signal(0)) == nil +} diff --git a/lease/pid_test.go b/lease/pid_test.go new file mode 100644 index 0000000..f2909c8 --- /dev/null +++ b/lease/pid_test.go @@ -0,0 +1,31 @@ +package lease + +import ( + "os/exec" + "testing" +) + +func TestPIDValid(t *testing.T) { + // create a sleep command for 1 second + cmd := exec.Command("sleep", "1") + err := cmd.Start() + if err != nil { + t.Fatal("Failed to create a sleep process:", err) + } + + pid := NewPID(cmd.Process.Pid) + if !pid.Valid() { + t.Error("PID died too soon") + } + + cmd.Wait() + + if pid.Valid() { + t.Error("PID didn't die somehow") + } + + pid = NewPID(-1) + if pid.Valid() { + t.Error("Negative PIDS are not a thing") + } +} diff --git a/lease/timeout.go b/lease/timeout.go new file mode 100644 index 0000000..e4e4003 --- /dev/null +++ 
b/lease/timeout.go
@@ -0,0 +1,27 @@
+package lease
+
+import (
+	"time"
+)
+
+// Timeout expires after a given timeout
+type Timeout struct {
+	eol time.Time
+}
+
+// NewTimeout creates a new timeout instance
+func NewTimeout(ttl int64) *Timeout {
+	return &Timeout{
+		eol: time.Now().Add(time.Nanosecond * time.Duration(ttl)),
+	}
+}
+
+// Valid reports whether the lease is still within its timeout
+func (t *Timeout) Valid() bool {
+	return t.Check()
+}
+
+// Check reports whether the timeout has not yet been reached
+func (t *Timeout) Check() bool {
+	return time.Now().Before(t.eol)
+}
diff --git a/lease/timeout_test.go b/lease/timeout_test.go
new file mode 100644
index 0000000..ae61d06
--- /dev/null
+++ b/lease/timeout_test.go
@@ -0,0 +1,26 @@
+package lease
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTimeoutValid(t *testing.T) {
+	timeout := NewTimeout(int64(time.Second))
+
+	if !timeout.Valid() {
+		t.Error("timeout valid failed too soon")
+	}
+
+	time.Sleep(time.Nanosecond * 100)
+
+	if !timeout.Valid() {
+		t.Error("timeout valid failed too soon")
+	}
+
+	time.Sleep(time.Second - (time.Nanosecond * 100))
+
+	if timeout.Valid() {
+		t.Error("timeout valid should not be succeeding")
+	}
+}
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..fadfa21
--- /dev/null
+++ b/main.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+	"fmt"
+	"net"
+
+	"google.golang.org/grpc"
+
+	flag "github.com/spf13/pflag"
+	"github.com/tblyler/sheepmq/sheepmq"
+	"github.com/tblyler/sheepmq/shepard"
+)
+
+func main() {
+	var port uint16
+	var listenAddr string
+
+	flag.StringVarP(&listenAddr, "addr", "h", "", "The address to listen on")
+	flag.Uint16VarP(&port, "port", "p", 0, "The port to bind")
+
+	flag.Parse()
+
+	listenAddr = fmt.Sprintf("%s:%d", listenAddr, port)
+
+	listener, err := net.Listen("tcp", listenAddr)
+	if err != nil {
+		fmt.Printf("Failed to listen on %s: %s\n", listenAddr, err)
+		return
+	}
+
+	defer listener.Close()
+
+	fmt.Println("Listening on", listener.Addr())
+
+	grpcServer := grpc.NewServer()
+
+	sheepmqServer, err := sheepmq.NewSheepMQ()
+	if err != nil {
+		fmt.Println("Failed to create sheepmq instance:", err)
+		return
+	}
+
+	sheepmqGServer := sheepmq.NewGServer(sheepmqServer)
+
+	// expose the sheepmq queue over gRPC
+	shepard.RegisterSheepmqServer(grpcServer, sheepmqGServer)
+
+	err = grpcServer.Serve(listener)
+	if err != nil {
+		fmt.Println("sheepmq server error:", err)
+	}
+}
diff --git a/queue/badger.go b/queue/badger.go
new file mode 100644
index 0000000..be4a044
--- /dev/null
+++ b/queue/badger.go
@@ -0,0 +1,157 @@
+package queue
+
+import (
+	"os"
+	"sync/atomic"
+
+	"github.com/dgraph-io/badger/badger"
+	"github.com/golang/protobuf/proto"
+	"github.com/tblyler/sheepmq/lease"
+	"github.com/tblyler/sheepmq/shepard"
+)
+
+// Badger uses Badger KV store as a queue backend
+type Badger struct {
+	opts      *Options
+	bopts     *badger.Options
+	kv        *badger.KV
+	currentID uint64
+	idconv    *idConverter
+	leases    *lease.Manager
+}
+
+// NewBadger creates a new instance of a Badger-backed Queue
+func NewBadger(opts *Options) (*Badger, error) {
+	// use default badger options if none provided
+	var bopts badger.Options
+	if opts.BadgerOptions == nil {
+		bopts = badger.DefaultOptions
+	} else {
+		bopts = *opts.BadgerOptions
+	}
+
+	// make sure the directory exists
+	err := os.MkdirAll(opts.Dir, defaultFilePerm)
+	if err != nil {
+		return nil, err
+	}
+
+	// always honor Options' dir setting over badger.Options' dir settings
+	bopts.Dir = opts.Dir
+	bopts.ValueDir = opts.Dir
+
+	// try to open new badger key value instance with the given options
+ kv, err := badger.NewKV(&bopts) + if err != nil { + return nil, err + } + + var currentID uint64 + + iter := kv.NewIterator(badger.IteratorOptions{ + PrefetchSize: 5, + FetchValues: false, + Reverse: true, + }) + + defer iter.Close() + + for iter.Rewind(); iter.Valid(); iter.Next() { + currentID, err = byteToID(iter.Item().Key()) + if err == nil { + break + } + + // try to delete invalid entries + kv.Delete(iter.Item().Key()) + currentID = 0 + } + + return &Badger{ + opts: opts, + bopts: &bopts, + kv: kv, + currentID: currentID, + idconv: newIDConverter(), + leases: lease.NewManager(), + }, nil +} + +// Close the internal key value store +func (b *Badger) Close() error { + return b.kv.Close() +} + +// Get the next available ID atomically +func (b *Badger) getID() uint64 { + return atomic.AddUint64(&b.currentID, 1) +} + +// AddItem to the queue +func (b *Badger) AddItem(item *shepard.Item) error { + item.Id = b.getID() + + data, err := proto.Marshal(item) + if err != nil { + return err + } + + byteID := b.idconv.idToByte(item.Id) + defer b.idconv.put(byteID) + + return b.kv.Set(byteID, data) +} + +// GetItem from the queue +func (b *Badger) GetItem(info *shepard.GetInfo, itemChan chan<- *shepard.Item) error { + iter := b.kv.NewIterator(badger.IteratorOptions{ + PrefetchSize: 500, + FetchValues: true, + Reverse: false, + }) + + defer iter.Close() + + var count uint64 + for iter.Rewind(); iter.Valid() && count < info.Count; iter.Next() { + item := iter.Item() + id, err := byteToID(item.Key()) + if err != nil { + // try to delete bad keys (don't care about failures) + b.kv.Delete(item.Key()) + continue + } + + err = b.leases.AddLease(id, info) + if err == nil || err == lease.ErrNoLeaser { + ret := &shepard.Item{} + err = proto.Unmarshal(item.Value(), ret) + if err != nil { + // delete bad values + b.kv.Delete(item.Key()) + continue + } + + count++ + itemChan <- ret + } + } + + return nil +} + +// DelItem from the queue +func (b *Badger) DelItem(info *shepard.DelInfo) error { + var err error + for _, id := range info.GetIds() { + idByte := b.idconv.idToByte(id) + err = b.kv.Delete(idByte) + if err != nil { + return err + } + + b.idconv.put(idByte) + } + + return nil +} diff --git a/queue/badger_test.go b/queue/badger_test.go new file mode 100644 index 0000000..9e5a6d0 --- /dev/null +++ b/queue/badger_test.go @@ -0,0 +1,233 @@ +package queue + +import ( + "bytes" + "crypto/rand" + "fmt" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/dgraph-io/badger/badger" + "github.com/tblyler/sheepmq/shepard" +) + +func TestNewBadger(t *testing.T) { + // test a bad dir setting + opts := &Options{ + Dir: "", + } + + _, err := NewBadger(opts) + if err == nil { + t.Error("Failed to get error on bad Dir setting") + } + + opts.Dir = filepath.Join(os.TempDir(), "NewBadgerTest") + + // ensure clean directory + os.RemoveAll(opts.Dir) + defer os.RemoveAll(opts.Dir) + + b, err := NewBadger(opts) + if err != nil { + t.Fatal("Failed to create badger with default options") + } + + if b.currentID != 0 { + t.Error("Current ID of an empty badger db should be 0 not", b.currentID) + } + + err = b.AddItem(&shepard.Item{}) + if err != nil { + t.Fatal("Failed to add empty item", err) + } + + if b.currentID != 1 { + t.Error("Current ID of a one item db should be 1 not", b.currentID) + } + + b.Close() + + opts.BadgerOptions = &badger.Options{} + + // make sure custom badger options are honored + *opts.BadgerOptions = badger.DefaultOptions + opts.BadgerOptions.SyncWrites = !opts.BadgerOptions.SyncWrites 
+ + b, err = NewBadger(opts) + if err != nil { + t.Fatal("Failed to use custom badger optoins", err) + } + + defer b.Close() + + if b.bopts.SyncWrites != opts.BadgerOptions.SyncWrites { + t.Error("Failed to use custom badger options") + } + + if b.currentID != 1 { + t.Error("current id != 1 got", b.currentID) + } +} + +func TestBadgerAddGetItem(t *testing.T) { + items := make([]*shepard.Item, 32) + + for i := range items { + items[i] = &shepard.Item{} + + items[i].Data = make([]byte, 256*(i+1)) + rand.Read(items[i].Data) + items[i].Ctime = time.Now().Unix() + items[i].Queue = fmt.Sprint("testing", i) + items[i].Stats = map[string]int64{ + "cool": 133333337, + "notcool": 0, + "#1": 1, + "datSize": int64(len(items[i].Data)), + } + } + + opts := &Options{ + Dir: filepath.Join(os.TempDir(), "TestBadgerAddGetItem"), + } + + os.RemoveAll(opts.Dir) + defer os.RemoveAll(opts.Dir) + + b, err := NewBadger(opts) + if err != nil { + t.Fatal("Failed to open badger", err) + } + + defer b.Close() + + for i, item := range items { + err = b.AddItem(item) + if err != nil { + t.Error("Failed to add item", i, err) + } + } + + itemChan := make(chan *shepard.Item, len(items)) + err = b.GetItem(&shepard.GetInfo{ + Count: uint64(len(items)), + }, itemChan) + + close(itemChan) + if err != nil { + t.Error("Failed to get items", err) + } + + i := 0 + for item := range itemChan { + if item.Ctime != items[i].Ctime { + t.Error("item ctimes", item.Ctime, items[i].Ctime) + } + + if item.Queue != items[i].Queue { + t.Error("item queues", item.Queue, items[i].Queue) + } + + if !bytes.Equal(item.Data, items[i].Data) { + t.Error("item data", item.Data, items[i].Data) + } + + if !reflect.DeepEqual(item.Stats, items[i].Stats) { + t.Error("item stats", item.Stats, items[i].Stats) + } + + i++ + } + + if i != len(items) { + t.Error("got", i, "items expected", len(items)) + } +} + +func TestBadgerDelItem(t *testing.T) { + opts := &Options{ + Dir: filepath.Join(os.TempDir(), "TestBadgerAddGetItem"), + } + + os.RemoveAll(opts.Dir) + defer os.RemoveAll(opts.Dir) + + b, err := NewBadger(opts) + if err != nil { + t.Fatal("Failed to start badger", err) + } + + items := make([]*shepard.Item, 32) + for i := range items { + items[i] = &shepard.Item{ + Ctime: time.Now().Unix(), + Data: make([]byte, 256*(i+1)), + Queue: "The queue", + Stats: map[string]int64{ + "lol": 10101010101, + "index": int64(i), + "datasize:": int64(256 * (i + 1)), + }, + } + rand.Read(items[i].Data) + + err = b.AddItem(items[i]) + if err != nil { + t.Error("Failed to add item", i, err) + } + } + + delinfo := &shepard.DelInfo{} + for i := range items { + if i%2 == 0 { + continue + } + + delinfo.Ids = append(delinfo.Ids, uint64(i)) + } + + err = b.DelItem(delinfo) + if err != nil { + t.Error("Failed to delete", delinfo.Ids, err) + } + + getinfo := &shepard.GetInfo{ + Count: uint64(len(items) - len(delinfo.Ids)), + } + getChan := make(chan *shepard.Item, getinfo.Count) + + err = b.GetItem(getinfo, getChan) + if err != nil { + t.Error("Failed to get items", err) + } + + close(getChan) + + i := 1 + for item := range getChan { + if item.Ctime != items[i].Ctime { + t.Error("item ctimes", item.Ctime, items[i].Ctime) + } + + if item.Queue != items[i].Queue { + t.Error("item queues", item.Queue, items[i].Queue) + } + + if !bytes.Equal(item.Data, items[i].Data) { + t.Error("item data", item.Data, items[i].Data) + } + + if !reflect.DeepEqual(item.Stats, items[i].Stats) { + t.Error("item stats", item.Stats, items[i].Stats) + } + i += 2 + } + + if i != len(items)+1 { + 
t.Error("only looped to item index", i) + } +} diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..706adee --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,70 @@ +package queue + +import ( + "encoding/binary" + "fmt" + "os" + "sync" + + "github.com/dgraph-io/badger/badger" + "github.com/tblyler/sheepmq/shepard" +) + +const ( + idByteSize = 8 + defaultFilePerm = os.FileMode(0700) +) + +// Queue defines a resource to store queue items +type Queue interface { + AddItem(*shepard.Item) error + GetItem(*shepard.GetInfo) (*shepard.Item, error) + DelItem(*shepard.DelInfo) error +} + +// Options to be used when creating a new Queue +type Options struct { + // Directory to store queue data + Dir string + + // Badger queue specific options + BadgerOptions *badger.Options +} + +type idConverter struct { + pool sync.Pool +} + +func newIDConverter() *idConverter { + return &idConverter{ + pool: sync.Pool{ + New: func() interface{} { + return make([]byte, idByteSize) + }, + }, + } +} + +func (i *idConverter) idToByte(id uint64) []byte { + buf := i.pool.Get().([]byte) + + binary.LittleEndian.PutUint64(buf, id) + + return buf +} + +func (i *idConverter) put(data []byte) { + i.pool.Put(data) +} + +func byteToID(data []byte) (uint64, error) { + if len(data) < idByteSize { + return 0, fmt.Errorf( + "unable to convert byte slice length of %d, need at least %d", + len(data), + idByteSize, + ) + } + + return binary.LittleEndian.Uint64(data), nil +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 0000000..532ca44 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,67 @@ +package queue + +import ( + "bytes" + "math" + "reflect" + "testing" +) + +func TestIDConverter(t *testing.T) { + idconv := newIDConverter() + + idByte := idconv.idToByte(math.MaxUint64) + + // reserved keywords become english mistakes + for _, bite := range idByte { + if bite != byte(255) { + t.Error("maxuint64 is not all max bytes", idByte) + break + } + } + + idconv.put(idByte) + + firstAddress := reflect.ValueOf(idByte).Pointer() + + idByte = idconv.idToByte(0) + secondAddress := reflect.ValueOf(idByte).Pointer() + + if firstAddress != secondAddress { + t.Error("Failed to use byte pool") + } + + for _, bite := range idByte { + if bite != 0 { + t.Error("zero should be all zero bytes", idByte) + break + } + } + + idconv.put(idByte) + + id := uint64(582348138342) + idByte = idconv.idToByte(id) + knownByte := []byte{ + 102, 103, 167, 150, 135, 0, 0, 0, + } + + if !bytes.Equal(idByte, knownByte) { + t.Error("Failed to encode id exepect", knownByte, "got", idByte) + } + + idconv.put(idByte) + + newID, err := byteToID(knownByte) + if err != nil { + t.Error("error converting byte to id", err) + } + if newID != id { + t.Error("expected id", id, "got", newID) + } + + _, err = byteToID([]byte{1, 2, 3, 4, 5}) + if err == nil { + t.Error("Failed to get error for bad byte to id data") + } +} diff --git a/sheepmq/grpc_server.go b/sheepmq/grpc_server.go new file mode 100644 index 0000000..8319902 --- /dev/null +++ b/sheepmq/grpc_server.go @@ -0,0 +1,74 @@ +package sheepmq + +import ( + "io" + + "github.com/tblyler/sheepmq/shepard" + + context "golang.org/x/net/context" +) + +// GServer encapsulates all sheepmq GRPC server activity +type GServer struct { + sheepmq *SheepMQ +} + +// NewGServer creates a new sheepmq GRPC server instance +func NewGServer(sheepmq *SheepMQ) *GServer { + return &GServer{ + sheepmq: sheepmq, + } +} + +// AddItem to sheepmq's queue +func (l *GServer) AddItem(stream 
shepard.Sheepmq_AddItemServer) error { + for { + item, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + resp, _ := l.sheepmq.AddItem(item) + err = stream.Send(resp) + if err != nil { + return err + } + + if !resp.GetSuccess() { + return nil + } + } +} + +// GetItem from sheepmq's queue +func (l *GServer) GetItem(info *shepard.GetInfo, stream shepard.Sheepmq_GetItemServer) error { + items := make(chan *shepard.Item, 32) + + var err error + go func() { + err = l.sheepmq.GetItems(info, items) + close(items) + }() + + for item := range items { + err := stream.Send(item) + if err != nil { + return err + } + } + + return err +} + +// DelItem from sheepmq's queue +func (l *GServer) DelItem(ctx context.Context, info *shepard.DelInfo) (*shepard.Response, error) { + return l.sheepmq.DelItem(info) +} + +// ErrItem from sheepmq's queue +func (l *GServer) ErrItem(ctx context.Context, info *shepard.ErrInfo) (*shepard.Response, error) { + return l.sheepmq.ErrItem(info) +} diff --git a/sheepmq/sheepmq.go b/sheepmq/sheepmq.go new file mode 100644 index 0000000..7a04eeb --- /dev/null +++ b/sheepmq/sheepmq.go @@ -0,0 +1,38 @@ +package sheepmq + +import ( + "github.com/tblyler/sheepmq/queue" + "github.com/tblyler/sheepmq/shepard" +) + +// SheepMQ encapsulates the sheepmq queue logic +type SheepMQ struct { + queues map[string]*queue.Queue +} + +// NewSheepMQ creates a new sheepmq instance with the given configuration +func NewSheepMQ() (*SheepMQ, error) { + return &SheepMQ{ + queues: make(map[string]*queue.Queue), + }, nil +} + +// AddItem to the sheepmq queue +func (l *SheepMQ) AddItem(item *shepard.Item) (*shepard.Response, error) { + return nil, nil +} + +// GetItems from sheepmq's queue +func (l *SheepMQ) GetItems(info *shepard.GetInfo, items chan<- *shepard.Item) error { + return nil +} + +// DelItem from sheepmq's queue +func (l *SheepMQ) DelItem(info *shepard.DelInfo) (*shepard.Response, error) { + return nil, nil +} + +// ErrItem from sheepmq's queue +func (l *SheepMQ) ErrItem(info *shepard.ErrInfo) (*shepard.Response, error) { + return nil, nil +} diff --git a/shepard/shepard.pb.go b/shepard/shepard.pb.go new file mode 100644 index 0000000..9c26997 --- /dev/null +++ b/shepard/shepard.pb.go @@ -0,0 +1,607 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: shepard.proto + +/* +Package shepard is a generated protocol buffer package. + +It is generated from these files: + shepard.proto + +It has these top-level messages: + Response + Item + GetInfo + TimeLease + PidLease + HeartbeatLease + DelInfo + ErrInfo +*/ +package shepard + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Response struct { + // whether or not the operation was successful + Success bool `protobuf:"varint,1,opt,name=success" json:"success,omitempty"` + // the amount of items affected + Count uint64 `protobuf:"varint,2,opt,name=count" json:"count,omitempty"` + // error message on failure + Msg string `protobuf:"bytes,3,opt,name=msg" json:"msg,omitempty"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Response) GetSuccess() bool { + if m != nil { + return m.Success + } + return false +} + +func (m *Response) GetCount() uint64 { + if m != nil { + return m.Count + } + return 0 +} + +func (m *Response) GetMsg() string { + if m != nil { + return m.Msg + } + return "" +} + +type Item struct { + // the id for this item + Id uint64 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"` + // the arbitrary data for this item + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + // the queue for this item + Queue string `protobuf:"bytes,3,opt,name=queue" json:"queue,omitempty"` + // error queue to cycle items to + ErrorQueue string `protobuf:"bytes,4,opt,name=errorQueue" json:"errorQueue,omitempty"` + // error queue TTL in nanoseconds + ErrorTTL int64 `protobuf:"fixed64,5,opt,name=errorTTL" json:"errorTTL,omitempty"` + // the Unix time (in seconds) in which this item was created + Ctime int64 `protobuf:"fixed64,6,opt,name=ctime" json:"ctime,omitempty"` + // the Unix time (in seconds) in which this item was errored last + Etime int64 `protobuf:"fixed64,7,opt,name=etime" json:"etime,omitempty"` + // the amount of times this item was errored + Ecount uint32 `protobuf:"varint,8,opt,name=ecount" json:"ecount,omitempty"` + // arbitrary statistical sizes to record for this item + Stats map[string]int64 `protobuf:"bytes,9,rep,name=stats" json:"stats,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` +} + +func (m *Item) Reset() { *m = Item{} } +func (m *Item) String() string { return proto.CompactTextString(m) } +func (*Item) ProtoMessage() {} +func (*Item) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Item) GetId() uint64 { + if m != nil { + return m.Id + } + return 0 +} + +func (m *Item) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *Item) GetQueue() string { + if m != nil { + return m.Queue + } + return "" +} + +func (m *Item) GetErrorQueue() string { + if m != nil { + return m.ErrorQueue + } + return "" +} + +func (m *Item) GetErrorTTL() int64 { + if m != nil { + return m.ErrorTTL + } + return 0 +} + +func (m *Item) GetCtime() int64 { + if m != nil { + return m.Ctime + } + return 0 +} + +func (m *Item) GetEtime() int64 { + if m != nil { + return m.Etime + } + return 0 +} + +func (m *Item) GetEcount() uint32 { + if m != nil { + return m.Ecount + } + return 0 +} + +func (m *Item) GetStats() map[string]int64 { + if m != nil { + return m.Stats + } + return nil +} + +type GetInfo struct { + Queue string `protobuf:"bytes,1,opt,name=queue" json:"queue,omitempty"` + // the amount of items to try and pull + Count uint64 `protobuf:"varint,2,opt,name=count" json:"count,omitempty"` + TimeoutLease *TimeLease `protobuf:"bytes,3,opt,name=timeoutLease" json:"timeoutLease,omitempty"` + PidLease *PidLease 
`protobuf:"bytes,4,opt,name=pidLease" json:"pidLease,omitempty"` + HeartLease *HeartbeatLease `protobuf:"bytes,5,opt,name=heartLease" json:"heartLease,omitempty"` +} + +func (m *GetInfo) Reset() { *m = GetInfo{} } +func (m *GetInfo) String() string { return proto.CompactTextString(m) } +func (*GetInfo) ProtoMessage() {} +func (*GetInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *GetInfo) GetQueue() string { + if m != nil { + return m.Queue + } + return "" +} + +func (m *GetInfo) GetCount() uint64 { + if m != nil { + return m.Count + } + return 0 +} + +func (m *GetInfo) GetTimeoutLease() *TimeLease { + if m != nil { + return m.TimeoutLease + } + return nil +} + +func (m *GetInfo) GetPidLease() *PidLease { + if m != nil { + return m.PidLease + } + return nil +} + +func (m *GetInfo) GetHeartLease() *HeartbeatLease { + if m != nil { + return m.HeartLease + } + return nil +} + +type TimeLease struct { + // TTL in nanoseconds to hold the lease + Ttl int64 `protobuf:"fixed64,1,opt,name=ttl" json:"ttl,omitempty"` +} + +func (m *TimeLease) Reset() { *m = TimeLease{} } +func (m *TimeLease) String() string { return proto.CompactTextString(m) } +func (*TimeLease) ProtoMessage() {} +func (*TimeLease) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *TimeLease) GetTtl() int64 { + if m != nil { + return m.Ttl + } + return 0 +} + +type PidLease struct { + Pid uint32 `protobuf:"varint,1,opt,name=pid" json:"pid,omitempty"` +} + +func (m *PidLease) Reset() { *m = PidLease{} } +func (m *PidLease) String() string { return proto.CompactTextString(m) } +func (*PidLease) ProtoMessage() {} +func (*PidLease) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *PidLease) GetPid() uint32 { + if m != nil { + return m.Pid + } + return 0 +} + +type HeartbeatLease struct { + Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Ttl int64 `protobuf:"fixed64,2,opt,name=ttl" json:"ttl,omitempty"` +} + +func (m *HeartbeatLease) Reset() { *m = HeartbeatLease{} } +func (m *HeartbeatLease) String() string { return proto.CompactTextString(m) } +func (*HeartbeatLease) ProtoMessage() {} +func (*HeartbeatLease) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *HeartbeatLease) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *HeartbeatLease) GetTtl() int64 { + if m != nil { + return m.Ttl + } + return 0 +} + +type DelInfo struct { + Queue string `protobuf:"bytes,1,opt,name=queue" json:"queue,omitempty"` + Ids []uint64 `protobuf:"varint,2,rep,packed,name=ids" json:"ids,omitempty"` +} + +func (m *DelInfo) Reset() { *m = DelInfo{} } +func (m *DelInfo) String() string { return proto.CompactTextString(m) } +func (*DelInfo) ProtoMessage() {} +func (*DelInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *DelInfo) GetQueue() string { + if m != nil { + return m.Queue + } + return "" +} + +func (m *DelInfo) GetIds() []uint64 { + if m != nil { + return m.Ids + } + return nil +} + +type ErrInfo struct { + Queue string `protobuf:"bytes,1,opt,name=queue" json:"queue,omitempty"` + Ids []uint64 `protobuf:"varint,2,rep,packed,name=ids" json:"ids,omitempty"` +} + +func (m *ErrInfo) Reset() { *m = ErrInfo{} } +func (m *ErrInfo) String() string { return proto.CompactTextString(m) } +func (*ErrInfo) ProtoMessage() {} +func (*ErrInfo) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +func (m *ErrInfo) GetQueue() string { + if m != nil { + return 
m.Queue + } + return "" +} + +func (m *ErrInfo) GetIds() []uint64 { + if m != nil { + return m.Ids + } + return nil +} + +func init() { + proto.RegisterType((*Response)(nil), "shepard.Response") + proto.RegisterType((*Item)(nil), "shepard.Item") + proto.RegisterType((*GetInfo)(nil), "shepard.GetInfo") + proto.RegisterType((*TimeLease)(nil), "shepard.TimeLease") + proto.RegisterType((*PidLease)(nil), "shepard.PidLease") + proto.RegisterType((*HeartbeatLease)(nil), "shepard.HeartbeatLease") + proto.RegisterType((*DelInfo)(nil), "shepard.DelInfo") + proto.RegisterType((*ErrInfo)(nil), "shepard.ErrInfo") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Sheepmq service + +type SheepmqClient interface { + // Add the given items to the queue + AddItem(ctx context.Context, opts ...grpc.CallOption) (Sheepmq_AddItemClient, error) + // Get the given items per the info provided + GetItem(ctx context.Context, in *GetInfo, opts ...grpc.CallOption) (Sheepmq_GetItemClient, error) + // Delete the given items if possible + DelItem(ctx context.Context, in *DelInfo, opts ...grpc.CallOption) (*Response, error) + // Error the given items if possible + ErrItem(ctx context.Context, in *ErrInfo, opts ...grpc.CallOption) (*Response, error) +} + +type sheepmqClient struct { + cc *grpc.ClientConn +} + +func NewSheepmqClient(cc *grpc.ClientConn) SheepmqClient { + return &sheepmqClient{cc} +} + +func (c *sheepmqClient) AddItem(ctx context.Context, opts ...grpc.CallOption) (Sheepmq_AddItemClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Sheepmq_serviceDesc.Streams[0], c.cc, "/shepard.sheepmq/AddItem", opts...) + if err != nil { + return nil, err + } + x := &sheepmqAddItemClient{stream} + return x, nil +} + +type Sheepmq_AddItemClient interface { + Send(*Item) error + Recv() (*Response, error) + grpc.ClientStream +} + +type sheepmqAddItemClient struct { + grpc.ClientStream +} + +func (x *sheepmqAddItemClient) Send(m *Item) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sheepmqAddItemClient) Recv() (*Response, error) { + m := new(Response) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *sheepmqClient) GetItem(ctx context.Context, in *GetInfo, opts ...grpc.CallOption) (Sheepmq_GetItemClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Sheepmq_serviceDesc.Streams[1], c.cc, "/shepard.sheepmq/GetItem", opts...) + if err != nil { + return nil, err + } + x := &sheepmqGetItemClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Sheepmq_GetItemClient interface { + Recv() (*Item, error) + grpc.ClientStream +} + +type sheepmqGetItemClient struct { + grpc.ClientStream +} + +func (x *sheepmqGetItemClient) Recv() (*Item, error) { + m := new(Item) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *sheepmqClient) DelItem(ctx context.Context, in *DelInfo, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := grpc.Invoke(ctx, "/shepard.sheepmq/DelItem", in, out, c.cc, opts...) 
+ if err != nil { + return nil, err + } + return out, nil +} + +func (c *sheepmqClient) ErrItem(ctx context.Context, in *ErrInfo, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := grpc.Invoke(ctx, "/shepard.sheepmq/ErrItem", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Sheepmq service + +type SheepmqServer interface { + // Add the given items to the queue + AddItem(Sheepmq_AddItemServer) error + // Get the given items per the info provided + GetItem(*GetInfo, Sheepmq_GetItemServer) error + // Delete the given items if possible + DelItem(context.Context, *DelInfo) (*Response, error) + // Error the given items if possible + ErrItem(context.Context, *ErrInfo) (*Response, error) +} + +func RegisterSheepmqServer(s *grpc.Server, srv SheepmqServer) { + s.RegisterService(&_Sheepmq_serviceDesc, srv) +} + +func _Sheepmq_AddItem_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SheepmqServer).AddItem(&sheepmqAddItemServer{stream}) +} + +type Sheepmq_AddItemServer interface { + Send(*Response) error + Recv() (*Item, error) + grpc.ServerStream +} + +type sheepmqAddItemServer struct { + grpc.ServerStream +} + +func (x *sheepmqAddItemServer) Send(m *Response) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sheepmqAddItemServer) Recv() (*Item, error) { + m := new(Item) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Sheepmq_GetItem_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetInfo) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SheepmqServer).GetItem(m, &sheepmqGetItemServer{stream}) +} + +type Sheepmq_GetItemServer interface { + Send(*Item) error + grpc.ServerStream +} + +type sheepmqGetItemServer struct { + grpc.ServerStream +} + +func (x *sheepmqGetItemServer) Send(m *Item) error { + return x.ServerStream.SendMsg(m) +} + +func _Sheepmq_DelItem_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DelInfo) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SheepmqServer).DelItem(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/shepard.sheepmq/DelItem", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SheepmqServer).DelItem(ctx, req.(*DelInfo)) + } + return interceptor(ctx, in, info, handler) +} + +func _Sheepmq_ErrItem_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ErrInfo) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SheepmqServer).ErrItem(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/shepard.sheepmq/ErrItem", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SheepmqServer).ErrItem(ctx, req.(*ErrInfo)) + } + return interceptor(ctx, in, info, handler) +} + +var _Sheepmq_serviceDesc = grpc.ServiceDesc{ + ServiceName: "shepard.sheepmq", + HandlerType: (*SheepmqServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "DelItem", + Handler: _Sheepmq_DelItem_Handler, + }, + { + MethodName: "ErrItem", + Handler: _Sheepmq_ErrItem_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "AddItem", + Handler: 
_Sheepmq_AddItem_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "GetItem", + Handler: _Sheepmq_GetItem_Handler, + ServerStreams: true, + }, + }, + Metadata: "shepard.proto", +} + +func init() { proto.RegisterFile("shepard.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 518 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0xed, 0xda, 0x4e, 0xec, 0x4c, 0x9a, 0xca, 0xac, 0x10, 0x58, 0x11, 0x20, 0xcb, 0x27, 0x5f, + 0x88, 0x42, 0x90, 0x68, 0xc5, 0x0d, 0x89, 0x0a, 0x1a, 0xe5, 0x00, 0x4b, 0x7e, 0xc0, 0x8d, 0x07, + 0x6a, 0x11, 0xc7, 0xae, 0x77, 0x8d, 0xd4, 0xbf, 0xe2, 0x53, 0xb8, 0xf3, 0x33, 0x68, 0x76, 0x37, + 0x4e, 0x2c, 0x45, 0x48, 0xbd, 0xcd, 0x9b, 0x99, 0x37, 0xe3, 0xf7, 0x32, 0x1b, 0x98, 0xc8, 0x3b, + 0xac, 0xb3, 0x26, 0x9f, 0xd5, 0x4d, 0xa5, 0x2a, 0xee, 0x5b, 0x98, 0xac, 0x20, 0x10, 0x28, 0xeb, + 0x6a, 0x27, 0x91, 0x47, 0xe0, 0xcb, 0x76, 0xb3, 0x41, 0x29, 0x23, 0x16, 0xb3, 0x34, 0x10, 0x7b, + 0xc8, 0x9f, 0xc2, 0x60, 0x53, 0xb5, 0x3b, 0x15, 0x39, 0x31, 0x4b, 0x3d, 0x61, 0x00, 0x0f, 0xc1, + 0x2d, 0xe5, 0x8f, 0xc8, 0x8d, 0x59, 0x3a, 0x12, 0x14, 0x26, 0xbf, 0x1d, 0xf0, 0x6e, 0x14, 0x96, + 0xfc, 0x02, 0x9c, 0x22, 0xd7, 0x53, 0x3c, 0xe1, 0x14, 0x39, 0xe7, 0xe0, 0xe5, 0x99, 0xca, 0x34, + 0xff, 0x5c, 0xe8, 0x98, 0x86, 0xde, 0xb7, 0xd8, 0xa2, 0x1d, 0x60, 0x00, 0x7f, 0x05, 0x80, 0x4d, + 0x53, 0x35, 0x5f, 0x75, 0xc9, 0xd3, 0xa5, 0xa3, 0x0c, 0x9f, 0x42, 0xa0, 0xd1, 0x7a, 0xbd, 0x8a, + 0x06, 0x31, 0x4b, 0x43, 0xd1, 0x61, 0xfd, 0x99, 0xaa, 0x28, 0x31, 0x1a, 0xea, 0x82, 0x01, 0x94, + 0x45, 0x9d, 0xf5, 0x4d, 0x56, 0x03, 0xfe, 0x0c, 0x86, 0x68, 0x34, 0x05, 0x31, 0x4b, 0x27, 0xc2, + 0x22, 0x3e, 0x83, 0x81, 0x54, 0x99, 0x92, 0xd1, 0x28, 0x76, 0xd3, 0xf1, 0x22, 0x9a, 0xed, 0x8d, + 0x23, 0x5d, 0xb3, 0x6f, 0x54, 0xba, 0xde, 0xa9, 0xe6, 0x41, 0x98, 0xb6, 0xe9, 0x15, 0xc0, 0x21, + 0x49, 0x96, 0xfc, 0xc4, 0x07, 0x2d, 0x7c, 0x24, 0x28, 0xa4, 0xed, 0xbf, 0xb2, 0x6d, 0x8b, 0x5a, + 0xba, 0x2b, 0x0c, 0x78, 0xef, 0x5c, 0xb1, 0xa5, 0x17, 0x40, 0x18, 0x26, 0x7f, 0x19, 0xf8, 0x9f, + 0x50, 0xdd, 0xec, 0xbe, 0x57, 0x07, 0x47, 0xd8, 0xb1, 0x23, 0xa7, 0xcd, 0x7f, 0x07, 0xe7, 0xa4, + 0xa3, 0x6a, 0xd5, 0x0a, 0x33, 0x69, 0x4c, 0x1c, 0x2f, 0x78, 0xf7, 0xb9, 0xeb, 0xa2, 0x44, 0x5d, + 0x11, 0xbd, 0x3e, 0xfe, 0x1a, 0x82, 0xba, 0xc8, 0x0d, 0xc7, 0xd3, 0x9c, 0x27, 0x1d, 0xe7, 0x8b, + 0x2d, 0x88, 0xae, 0x85, 0x5f, 0x02, 0xdc, 0x61, 0xd6, 0xd8, 0x25, 0x03, 0x4d, 0x78, 0xde, 0x11, + 0x3e, 0x53, 0xe9, 0x16, 0x33, 0x53, 0x16, 0x47, 0xad, 0x4b, 0x2f, 0x18, 0x86, 0xe3, 0xe4, 0x25, + 0x8c, 0xba, 0x0f, 0x21, 0x73, 0x94, 0xda, 0x6a, 0x71, 0xa1, 0xa0, 0x30, 0x79, 0x01, 0xc1, 0x7e, + 0x27, 0x55, 0x6b, 0x7b, 0x33, 0x13, 0x41, 0x61, 0xb2, 0x80, 0x8b, 0xfe, 0x82, 0xa3, 0xb3, 0x1a, + 0xe9, 0xb3, 0xb2, 0x13, 0x9d, 0xc3, 0xc4, 0x4b, 0xf0, 0x3f, 0xe2, 0xf6, 0x3f, 0x6e, 0x86, 0xe0, + 0x16, 0xb9, 0x8c, 0x9c, 0xd8, 0x4d, 0x3d, 0x41, 0xe1, 0xd2, 0x0b, 0xdc, 0x70, 0x4c, 0xc4, 0xeb, + 0xa6, 0x79, 0x3c, 0x71, 0xf1, 0x87, 0x01, 0xbd, 0x26, 0xac, 0xcb, 0x7b, 0xfe, 0x06, 0xfc, 0x0f, + 0x79, 0xae, 0x5f, 0xc0, 0xa4, 0x77, 0x38, 0xd3, 0x83, 0xc9, 0xfb, 0xe7, 0x96, 0x9c, 0xa5, 0x6c, + 0xce, 0xf8, 0xcc, 0xfc, 0xfc, 0x44, 0x09, 0xbb, 0x1e, 0x7b, 0x10, 0xd3, 0xfe, 0x90, 0xe4, 0x6c, + 0xce, 0xf8, 0xdc, 0x08, 0xec, 0xf7, 0x5b, 0xc9, 0x27, 0xb7, 0x10, 0x83, 0x94, 0xf5, 0x19, 0x56, + 0xeb, 0x49, 0xc6, 0xed, 0x50, 0xff, 0x49, 0xbc, 0xfd, 0x17, 0x00, 0x00, 0xff, 0xff, 0x85, 0xe5, + 0x0d, 0x44, 0x35, 0x04, 0x00, 0x00, +} diff --git 
a/shepard/shepard.proto b/shepard/shepard.proto
new file mode 100644
index 0000000..82e1628
--- /dev/null
+++ b/shepard/shepard.proto
@@ -0,0 +1,108 @@
+syntax = "proto3";
+
+package shepard;
+
+service sheepmq {
+	// Add the given items to the queue
+	rpc AddItem(stream Item) returns (stream Response) {}
+
+	// Get the given items per the info provided
+	rpc GetItem(GetInfo) returns (stream Item) {}
+
+	// Delete the given items if possible
+	rpc DelItem(DelInfo) returns (Response) {}
+
+	// Error the given items if possible
+	rpc ErrItem(ErrInfo) returns (Response) {}
+}
+
+message Response {
+	// whether or not the operation was successful
+	bool success = 1;
+
+	// the amount of items affected
+	uint64 count = 2;
+
+	// error message on failure
+	string msg = 3;
+}
+
+message Item {
+	// the id for this item
+	uint64 id = 1;
+
+	// the arbitrary data for this item
+	bytes data = 2;
+
+	// the queue for this item
+	string queue = 3;
+
+	// error queue to cycle items to
+	string errorQueue = 4;
+
+	// error queue TTL in nanoseconds
+	sfixed64 errorTTL = 5;
+
+	// the Unix time (in seconds) in which this item was created
+	sfixed64 ctime = 6;
+
+	// the Unix time (in seconds) in which this item was errored last
+	sfixed64 etime = 7;
+
+	// the amount of times this item was errored
+	uint32 ecount = 8;
+
+	// arbitrary statistical sizes to record for this item
+	map<string, int64> stats = 9;
+
+	// future proof?
+	reserved 10 to 15;
+}
+
+message GetInfo {
+	string queue = 1;
+
+	// the amount of items to try and pull
+	uint64 count = 2;
+
+	TimeLease timeoutLease = 3;
+
+	PidLease pidLease = 4;
+
+	HeartbeatLease heartLease = 5;
+
+	// future proof?
+	reserved 6 to 10;
+}
+
+message TimeLease {
+	// TTL in nanoseconds to hold the lease
+	sfixed64 ttl = 1;
+}
+
+message PidLease {
+	uint32 pid = 1;
+}
+
+message HeartbeatLease {
+	string id = 1;
+	sfixed64 ttl = 2;
+}
+
+message DelInfo {
+	string queue = 1;
+
+	repeated uint64 ids = 2;
+
+	// future proof?
+	reserved 3 to 10;
+}
+
+message ErrInfo {
+	string queue = 1;
+
+	repeated uint64 ids = 2;
+
+	// future proof?
+	reserved 3 to 10;
+}
diff --git a/update_grpc.sh b/update_grpc.sh
new file mode 100755
index 0000000..279c185
--- /dev/null
+++ b/update_grpc.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -xe
+
+# ensure old generated files are removed
+find "$(dirname "${0}")" -type f -name '*.pb.go' -exec rm -f {} +
+
+# update grpc files
+protoc -I shepard --go_out=plugins=grpc:shepard shepard/*.proto