/* * Copyright 2018 Dgraph Labs, Inc. and Contributors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package badger import ( "bytes" "context" "math" "sync" "sync/atomic" "time" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" humanize "github.com/dustin/go-humanize" "github.com/golang/protobuf/proto" ) const pageSize = 4 << 20 // 4MB // Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up // key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key // ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted // order, use Iterator. type Stream struct { // Prefix to only iterate over certain range of keys. If set to nil (default), Stream would // iterate over the entire DB. Prefix []byte // Number of goroutines to use for iterating over key ranges. Defaults to 16. NumGo int // Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can // be used to help differentiate them from other activities. Default is "Badger.Stream". LogPrefix string // ChooseKey is invoked each time a new key is encountered. Note that this is not called // on every version of the value, only the first encountered version (i.e. the highest version // of the value a key has). ChooseKey can be left nil to select all keys. // // Note: Calls to ChooseKey are concurrent. ChooseKey func(item *Item) bool // KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It // is upto the caller to iterate over the versions and generate zero, one or more KVs. It // is expected that the user would advance the iterator to go through the versions of the // values. However, the user MUST immediately return from this function on the first encounter // with a mismatching key. See example usage in ToList function. Can be left nil to use ToList // function by default. // // Note: Calls to KeyToList are concurrent. KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error) // This is the method where Stream sends the final output. All calls to Send are done by a // single goroutine, i.e. logic within Send method can expect single threaded execution. Send func(*pb.KVList) error readTs uint64 db *DB rangeCh chan keyRange kvChan chan *pb.KVList nextStreamId uint32 } // ToList is a default implementation of KeyToList. It picks up all valid versions of the key, // skipping over deleted or expired keys. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() if item.IsDeletedOrExpired() { break } if !bytes.Equal(key, item.Key()) { // Break out on the first encounter with another key. break } valCopy, err := item.ValueCopy(nil) if err != nil { return nil, err } kv := &pb.KV{ Key: item.KeyCopy(nil), Value: valCopy, UserMeta: []byte{item.UserMeta()}, Version: item.Version(), ExpiresAt: item.ExpiresAt(), } list.Kv = append(list.Kv, kv) if st.db.opt.NumVersionsToKeep == 1 { break } if item.DiscardEarlierVersions() { break } } return list, nil } // keyRange is [start, end), including start, excluding end. Do ensure that the start, // end byte slices are owned by keyRange struct. func (st *Stream) produceRanges(ctx context.Context) { splits := st.db.KeySplits(st.Prefix) // We don't need to create more key ranges than NumGo goroutines. This way, we will have limited // number of "streams" coming out, which then helps limit the memory used by SSWriter. { pickEvery := int(math.Floor(float64(len(splits)) / float64(st.NumGo))) if pickEvery < 1 { pickEvery = 1 } filtered := splits[:0] for i, split := range splits { if (i+1)%pickEvery == 0 { filtered = append(filtered, split) } } splits = filtered } start := y.SafeCopy(nil, st.Prefix) for _, key := range splits { st.rangeCh <- keyRange{left: start, right: y.SafeCopy(nil, []byte(key))} start = y.SafeCopy(nil, []byte(key)) } // Edge case: prefix is empty and no splits exist. In that case, we should have at least one // keyRange output. st.rangeCh <- keyRange{left: start} close(st.rangeCh) } // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. func (st *Stream) produceKVs(ctx context.Context) error { var size int var txn *Txn if st.readTs > 0 { txn = st.db.NewTransactionAt(st.readTs, false) } else { txn = st.db.NewTransaction(false) } defer txn.Discard() iterate := func(kr keyRange) error { iterOpts := DefaultIteratorOptions iterOpts.AllVersions = true iterOpts.Prefix = st.Prefix iterOpts.PrefetchValues = false itr := txn.NewIterator(iterOpts) defer itr.Close() // This unique stream id is used to identify all the keys from this iteration. streamId := atomic.AddUint32(&st.nextStreamId, 1) outList := new(pb.KVList) var prevKey []byte for itr.Seek(kr.left); itr.Valid(); { // it.Valid would only return true for keys with the provided Prefix in iterOpts. item := itr.Item() if bytes.Equal(item.Key(), prevKey) { itr.Next() continue } prevKey = append(prevKey[:0], item.Key()...) // Check if we reached the end of the key range. if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 { break } // Check if we should pick this key. if st.ChooseKey != nil && !st.ChooseKey(item) { continue } // Now convert to key value. list, err := st.KeyToList(item.KeyCopy(nil), itr) if err != nil { return err } if list == nil || len(list.Kv) == 0 { continue } outList.Kv = append(outList.Kv, list.Kv...) size += proto.Size(list) if size >= pageSize { for _, kv := range outList.Kv { kv.StreamId = streamId } select { case st.kvChan <- outList: case <-ctx.Done(): return ctx.Err() } outList = new(pb.KVList) size = 0 } } if len(outList.Kv) > 0 { for _, kv := range outList.Kv { kv.StreamId = streamId } // TODO: Think of a way to indicate that a stream is over. select { case st.kvChan <- outList: case <-ctx.Done(): return ctx.Err() } } return nil } for { select { case kr, ok := <-st.rangeCh: if !ok { // Done with the keys. return nil } if err := iterate(kr); err != nil { return err } case <-ctx.Done(): return ctx.Err() } } } func (st *Stream) streamKVs(ctx context.Context) error { var count int var bytesSent uint64 t := time.NewTicker(time.Second) defer t.Stop() now := time.Now() slurp := func(batch *pb.KVList) error { loop: for { select { case kvs, ok := <-st.kvChan: if !ok { break loop } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) default: break loop } } sz := uint64(proto.Size(batch)) bytesSent += sz count += len(batch.Kv) t := time.Now() if err := st.Send(batch); err != nil { return err } st.db.opt.Infof("%s Created batch of size: %s in %s.\n", st.LogPrefix, humanize.Bytes(sz), time.Since(t)) return nil } outer: for { var batch *pb.KVList select { case <-ctx.Done(): return ctx.Err() case <-t.C: dur := time.Since(now) durSec := uint64(dur.Seconds()) if durSec == 0 { continue } speed := bytesSent / durSec st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec\n", st.LogPrefix, y.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed)) case kvs, ok := <-st.kvChan: if !ok { break outer } y.AssertTrue(kvs != nil) batch = kvs if err := slurp(batch); err != nil { return err } } } st.db.opt.Infof("%s Sent %d keys\n", st.LogPrefix, count) return nil } // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also // spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. // kvChan should only have a small capacity to ensure that we don't buffer up too much data if // sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each // KVList. To get 128MB buffer, we can set the channel size to 32. st.kvChan = make(chan *pb.KVList, 32) if st.KeyToList == nil { st.KeyToList = st.ToList } // Picks up ranges from Badger, and sends them to rangeCh. go st.produceRanges(ctx) errCh := make(chan error, 1) // Stores error by consumeKeys. var wg sync.WaitGroup for i := 0; i < st.NumGo; i++ { wg.Add(1) go func() { defer wg.Done() // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan. if err := st.produceKVs(ctx); err != nil { select { case errCh <- err: default: } } }() } // Pick up key-values from kvChan and send to stream. kvErr := make(chan error, 1) go func() { // Picks up KV lists from kvChan, and sends them to Output. kvErr <- st.streamKVs(ctx) }() wg.Wait() // Wait for produceKVs to be over. close(st.kvChan) // Now we can close kvChan. select { case err := <-errCh: // Check error from produceKVs. return err default: } // Wait for key streaming to be over. err := <-kvErr return err } func (db *DB) newStream() *Stream { return &Stream{db: db, NumGo: 16, LogPrefix: "Badger.Stream"} } // NewStream creates a new Stream. func (db *DB) NewStream() *Stream { if db.opt.managedTxns { panic("This API can not be called in managed mode.") } return db.newStream() } // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB. func (db *DB) NewStreamAt(readTs uint64) *Stream { if !db.opt.managedTxns { panic("This API can only be called in managed mode.") } stream := db.newStream() stream.readTs = readTs return stream }