Concurrency issues in memory store
Noticed when reviewing the code in store_memory.go
that it retrieves the cache item, checks whether it has expired, and then attempts to increment it, but there is no mutex or other synchronization protecting that sequence.
So it's possible that we check that the item hasn't expired, but by the time we call IncrementInt64
it has expired and is no longer in the cache, resulting in an error:
item, found := s.Cache.Items()[key]
ms := int64(time.Millisecond)
now := time.Now()

// ** TIME A: HAS NOT EXPIRED YET
if !found || item.Expired() {
	s.Cache.Set(key, int64(1), rate.Period)
	return Context{
		Limit:     rate.Limit,
		Remaining: rate.Limit - 1,
		Reset:     (now.UnixNano()/ms + int64(rate.Period)/ms) / 1000,
		Reached:   false,
	}, nil
}

// ** TIME B: key has expired so this will return an error
count, err := s.Cache.IncrementInt64(key, 1)
if err != nil {
	return ctx, err
}
This is a bit of an edge case, but it does result in unexpected behavior in a high-concurrency environment.
Here's a unit test that spawns a bunch of goroutines and produces the error:
func TestConcurrency(t *testing.T) {
	rate := Rate{Period: time.Nanosecond * 10, Limit: 100000}
	store := NewMemoryStoreWithOptions(StoreOptions{
		Prefix:          "limitertests:memory",
		CleanUpInterval: 1 * time.Nanosecond,
	})
	wg := sync.WaitGroup{}
	limiter := NewLimiter(store, rate)
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			for j := 0; j < 10000; j++ {
				_, err := limiter.Get("boo2")
				assert.NoError(t, err)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}
Error Trace: 2:
Error: No error is expected but got Item limitertests:memory:boo2 not found
Error Trace: 2:
Error: No error is expected but got Item limitertests:memory:boo2 not found
Not sure what the fix should be here; maybe just have the memory store fall back on creating a new item when the increment fails?
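For reference, a minimal sketch of that fallback inside the existing Get (variable names as in the snippet above); note that this only narrows the window rather than closing it, since two goroutines can still both hit the error and both call Set:

	count, err := s.Cache.IncrementInt64(key, 1)
	if err != nil {
		// The key expired between the Expired() check and the increment:
		// fall back to re-creating it instead of returning the error.
		s.Cache.Set(key, int64(1), rate.Period)
		count = 1
	}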
Also noticed this, but it sounds like it's related to store_redis.go
(maybe it was old data from a previous run?):
=== RUN TestGJRMiddlewareWithRaceCondition
--- FAIL: TestGJRMiddlewareWithRaceCondition (0.03s)
Error Trace: middleware_gjr_test.go:86
Error: Not equal: 5 (expected)
!= 4 (actual)
=== RUN TestHTTPMiddleware
Hmm, also: I'm occasionally (maybe 10% of the time) seeing this panic:
=== RUN TestConcurrency
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x5e304]
goroutine 555 [running]:
github.com/ulule/limiter.(*MemoryStore).Get(0xc82000bba0, 0xc820866920, 0x18, 0x0, 0x0, 0xa, 0x186a0, 0x0, 0x0, 0x0, ...)
/Users/ddaniels/dev/src/github.com/ulule/limiter/store_memory.go:35 +0x24c
github.com/ulule/limiter.(*Limiter).Get(0xc820017b60, 0x4742b8, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter.go:35 +0x9f
github.com/ulule/limiter.TestConcurrency.func1(0xc820017b60, 0xc820020750, 0xc8200be4f0, 0x210)
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter_test.go:96 +0x50
created by github.com/ulule/limiter.TestConcurrency
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter_test.go:100 +0x200
goroutine 1 [chan receive]:
testing.RunTests(0x52c3a0, 0x681860, 0x9, 0x9, 0x1)
/Users/ddaniels/.gvm/gos/go1.5.2/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc820473f08, 0xc82000a560)
/Users/ddaniels/.gvm/gos/go1.5.2/src/testing/testing.go:494 +0x70
main.main()
github.com/ulule/limiter/_test/_testmain.go:70 +0x116
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/Users/ddaniels/.gvm/gos/go1.5.2/src/runtime/asm_amd64.s:1721 +0x1
goroutine 8 [semacquire]:
sync.runtime_Semacquire(0xc8200be4fc)
/Users/ddaniels/.gvm/gos/go1.5.2/src/runtime/sema.go:43 +0x26
sync.(*WaitGroup).Wait(0xc8200be4f0)
/Users/ddaniels/.gvm/gos/go1.5.2/src/sync/waitgroup.go:126 +0xb4
github.com/ulule/limiter.TestConcurrency(0xc820020750)
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter_test.go:102 +0x223
testing.tRunner(0xc820020750, 0x681890)
/Users/ddaniels/.gvm/gos/go1.5.2/src/testing/testing.go:456 +0x98
created by testing.RunTests
/Users/ddaniels/.gvm/gos/go1.5.2/src/testing/testing.go:561 +0x86d
goroutine 9 [select]:
github.com/patrickmn/go-cache.(*janitor).Run(0xc8200be4e0, 0xc820013e40)
/Users/ddaniels/dev/src/github.com/patrickmn/go-cache/cache.go:1037 +0x15b
created by github.com/patrickmn/go-cache.runJanitor
/Users/ddaniels/dev/src/github.com/patrickmn/go-cache/cache.go:1056 +0x81
goroutine 10 [semacquire]:
sync.runtime_Semacquire(0xc820013e54)
/Users/ddaniels/.gvm/gos/go1.5.2/src/runtime/sema.go:43 +0x26
sync.(*Mutex).Lock(0xc820013e50)
/Users/ddaniels/.gvm/gos/go1.5.2/src/sync/mutex.go:82 +0x1c4
sync.(*RWMutex).Lock(0xc820013e50)
/Users/ddaniels/.gvm/gos/go1.5.2/src/sync/rwmutex.go:82 +0x30
github.com/patrickmn/go-cache.(*cache).Set(0xc820013e40, 0xc8207d04c0, 0x18, 0x3455a0, 0xc820add060, 0xa)
/Users/ddaniels/dev/src/github.com/patrickmn/go-cache/cache.go:60 +0x154
github.com/ulule/limiter.(*MemoryStore).newItem(0xc82000bba0, 0xece4cf24e, 0x143188d5133cdabc, 0x684f80, 0xc8207d04c0, 0x18, 0x0, 0x0, 0xa, 0x186a0, ...)
/Users/ddaniels/dev/src/github.com/ulule/limiter/store_memory.go:63 +0xb1
github.com/ulule/limiter.(*MemoryStore).Get(0xc82000bba0, 0xc8207d04c0, 0x18, 0x0, 0x0, 0xa, 0x186a0, 0x0, 0x0, 0x0, ...)
/Users/ddaniels/dev/src/github.com/ulule/limiter/store_memory.go:39 +0x756
github.com/ulule/limiter.(*Limiter).Get(0xc820017b60, 0x4742b8, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter.go:35 +0x9f
github.com/ulule/limiter.TestConcurrency.func1(0xc820017b60, 0xc820020750, 0xc8200be4f0, 0x0)
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter_test.go:96 +0x50
created by github.com/ulule/limiter.TestConcurrency
/Users/ddaniels/dev/src/github.com/ulule/limiter/limiter_test.go:100 +0x200
I think the solution to make this correct is to modify go-cache
to provide a GetAndSet(key string, setFn func)
method that would let us atomically check the cache for a missing key and supply a default value, all under the cache's own lock.
I have a PR up with go-cache to add GetAndSet().
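To make the intended semantics concrete, here is a rough sketch of what GetAndSet() would guarantee. The atomicCache and cacheItem types below are purely illustrative (they are not go-cache's internals and not necessarily what the PR does); the point is that the lookup and the write happen under a single lock, so no other goroutine can expire or replace the key in between:

type atomicCache struct {
	mu    sync.Mutex
	items map[string]cacheItem
}

type cacheItem struct {
	value      interface{}
	expiration time.Time
}

// GetAndSet calls fn with the current value (if any) while holding the lock,
// then stores whatever fn returns, using the TTL fn returns.
func (c *atomicCache) GetAndSet(key string, fn func(v interface{}, expiration time.Time, found bool) (interface{}, time.Duration)) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	item, found := c.items[key]
	if found && time.Now().After(item.expiration) {
		// Treat an expired entry exactly like a cache miss.
		found = false
	}

	newValue, ttl := fn(item.value, item.expiration, found)
	c.items[key] = cacheItem{value: newValue, expiration: time.Now().Add(ttl)}
	return newValue, found
}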
Currently, when the key has expired, multiple goroutines can be inside the limiter at the same time and each of them will call:
s.Cache.Set(key, int64(1), rate.Period)
This is incorrect because those goroutines should be incrementing that value, not re-initializing it, and there is no synchronization around that call.
I think the correct way to implement Get in store_memory.go is:
// Get implements the Store.Get() method using the proposed GetAndSet().
func (s *MemoryStore) Get(key string, rate Rate) (Context, error) {
	key = fmt.Sprintf("%s:%s", s.Prefix, key)

	var now time.Time
	var expire time.Time

	value, _ := s.Cache.GetAndSet(key,
		func(v interface{}, expiration time.Time, found bool) (interface{}, time.Duration) {
			now = time.Now()
			// Cache miss, so default the count to 1 with the default expiration.
			if !found {
				expire = now.Add(rate.Period)
				// Initial value of 1 expires in rate.Period.
				return int64(1), rate.Period
			}
			// Cache hit: increment the count and keep the remaining TTL.
			expire = expiration
			return v.(int64) + 1, expiration.Sub(now)
		})
	count := value.(int64)

	remaining := int64(0)
	if count < rate.Limit {
		remaining = rate.Limit - count
	}

	return Context{
		Limit:     rate.Limit,
		Remaining: remaining,
		Reset:     expire.Unix(),
		Reached:   count > rate.Limit,
	}, nil
}
Alternatively, if they don't accept that PR to go-cache,
we could pull the fork in here, or add a sync.RWMutex
to store_memory.go
to protect access to the cache. (There is a cache cleanup janitor goroutine running that we can't control and that can still cause concurrency issues; maybe we can disable it with a cleanup duration of 0 and run cache cleanup from store_memory.go instead.)
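A rough sketch of that mutex-based alternative, assuming the go-cache instance is created with a cleanup interval of 0 so no janitor goroutine runs, and assuming a new mu sync.Mutex field on MemoryStore (the rest mirrors the snippet at the top of this issue, including treating item.Expiration as UnixNano):

func (s *MemoryStore) Get(key string, rate Rate) (Context, error) {
	key = fmt.Sprintf("%s:%s", s.Prefix, key)

	// Serialize the whole check/increment sequence.
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	item, found := s.Cache.Items()[key]
	if !found || item.Expired() {
		// With no janitor, expired entries are replaced lazily here.
		s.Cache.Set(key, int64(1), rate.Period)
		return Context{
			Limit:     rate.Limit,
			Remaining: rate.Limit - 1,
			Reset:     now.Add(rate.Period).Unix(),
			Reached:   false,
		}, nil
	}

	count, err := s.Cache.IncrementInt64(key, 1)
	if err != nil {
		// The entry can still expire between the check and the increment
		// (time keeps passing even under the lock), but since nothing else
		// can interleave here it is safe to start a fresh window.
		s.Cache.Set(key, int64(1), rate.Period)
		return Context{
			Limit:     rate.Limit,
			Remaining: rate.Limit - 1,
			Reset:     now.Add(rate.Period).Unix(),
			Reached:   false,
		}, nil
	}

	remaining := int64(0)
	if count < rate.Limit {
		remaining = rate.Limit - count
	}

	return Context{
		Limit:     rate.Limit,
		Remaining: remaining,
		Reset:     time.Unix(0, item.Expiration).Unix(),
		Reached:   count > rate.Limit,
	}, nil
}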
Hello,
Thank you for your feedback and sorry for the delayed response :)
A new version of limiter is planned and I'll make sure that these concurrency issues are fixed.
Do you have any metrics on how many goroutines you had to create to trigger this panic?
Thank you.