Initial commit

This commit is contained in:
Tony Blyler 2017-06-30 17:40:36 -04:00
parent 20a94545c7
commit f50dc198ff
3 changed files with 131 additions and 0 deletions

2
README.md Normal file
View file

@ -0,0 +1,2 @@
# go-atomic
Useful custom atomic-safe [Go](https://golang.org/) structures.

58
workergroup.go Normal file
View file

@ -0,0 +1,58 @@
package workergroup
import (
"runtime"
"sync/atomic"
"time"
)
const waitTime = time.Nanosecond * 10
// WorkerGroup acts as a sync.WaitGroup but with a max worker count.
type WorkerGroup struct {
maxWorkers uint32
workers uint32
}
// MaxWorkers sets max workers to `workers` or returns current setting if < 0
func (wg *WorkerGroup) MaxWorkers(workers uint32) uint32 {
if workers == 0 {
// update max workers to GOMAXPROCS if set to 0
atomic.CompareAndSwapUint32(&wg.maxWorkers, 0, uint32(runtime.GOMAXPROCS(0)))
return atomic.LoadUint32(&wg.maxWorkers)
}
atomic.StoreUint32(&wg.maxWorkers, workers)
return workers
}
// Add delta to the worker count
func (wg *WorkerGroup) Add(delta uint32) {
// update max workers to GOMAXPROCES if set to 0
atomic.CompareAndSwapUint32(&wg.maxWorkers, 0, uint32(runtime.GOMAXPROCS(0)))
var oldCount uint32
for oldCount = atomic.LoadUint32(&wg.workers); oldCount+delta > atomic.LoadUint32(&wg.maxWorkers); oldCount = atomic.LoadUint32(&wg.workers) {
time.Sleep(waitTime)
}
if atomic.CompareAndSwapUint32(&wg.workers, oldCount, oldCount+delta) {
return
}
wg.Add(delta)
}
// Done decrement the worker counter
func (wg *WorkerGroup) Done() {
atomic.AddUint32(&wg.workers, ^uint32(0))
}
// Wait until worker count is zero
func (wg *WorkerGroup) Wait() {
for atomic.LoadUint32(&wg.workers) != 0 {
time.Sleep(waitTime)
}
}

71
workergroup_test.go Normal file
View file

@ -0,0 +1,71 @@
package workergroup
import (
"runtime"
"sync/atomic"
"testing"
"time"
)
func TestWorkerGroupMaxWorkers(t *testing.T) {
wg := WorkerGroup{}
if wg.MaxWorkers(0) != uint32(runtime.GOMAXPROCS(0)) {
t.Error("Default max workers should be GOMAXPROCS got", wg.MaxWorkers(0))
}
if wg.MaxWorkers(3) != 3 {
t.Error("Failed to set max workers to 3")
}
}
func TestWorkerGroupAdd(t *testing.T) {
wg := WorkerGroup{}
wg.MaxWorkers(3)
wg.Add(1)
wg.Add(1)
wg.Add(1)
waitChan := make(chan time.Time, 1)
start := time.Now()
go func() {
wg.Add(1)
waitChan <- time.Now()
}()
time.Sleep(time.Millisecond * 50)
wg.Done()
stop := <-waitChan
if time.Duration(stop.Sub(start).Nanoseconds()) < (time.Millisecond * 50) {
t.Error("wait channel should have waited at least 50ms, waited", stop.Sub(start))
}
}
func TestWorkerGroupWait(t *testing.T) {
wg := WorkerGroup{}
wg.MaxWorkers(3)
for i := 0; i < 32; i++ {
go func() {
wg.Add(1)
time.Sleep(time.Millisecond)
wg.Done()
}()
}
// for safety
time.Sleep(time.Millisecond)
wg.Wait()
time.Sleep(time.Millisecond)
if atomic.LoadUint32(&wg.workers) != 0 {
t.Error("Failed to wait until workers was actually 0")
}
}