From d995f8a6686adff0b1f2f20c52ef4114981d2006 Mon Sep 17 00:00:00 2001 From: Tipp Moseley Date: Wed, 27 Jan 2016 11:33:35 -0800 Subject: [PATCH] Fixes a concurrency error in groupcache. Previously, multiple Get requests could simultaneously result in a load(). Only requests that enter singleflight Do concurrently would be deduped, so it was possible for populateCache to be called multiple times for the same key. That would result in overcounting nbytes, and eventually leading to a livelock where nbytes > cacheBytes, but there were no entries in the cache. Change-Id: I5b304ce82041c1310c31b662486744e86509cc53 --- groupcache.go | 36 ++++++++++++++++++++- groupcache_test.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/groupcache.go b/groupcache.go index 637b063..40410a0 100644 --- a/groupcache.go +++ b/groupcache.go @@ -100,6 +100,7 @@ func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *G getter: getter, peers: peers, cacheBytes: cacheBytes, + loadGroup: &singleflight.Group{}, } if fn := newGroupHook; fn != nil { fn(g) @@ -163,12 +164,20 @@ type Group struct { // loadGroup ensures that each key is only fetched once // (either locally or remotely), regardless of the number of // concurrent callers. - loadGroup singleflight.Group + loadGroup flightGroup // Stats are statistics on the group. Stats Stats } +// flightGroup is defined as an interface which flightgroup.Group +// satisfies. We define this so that we may test with an alternate +// implementation. +type flightGroup interface { + // Done is called when Do is done. + Do(key string, fn func() (interface{}, error)) (interface{}, error) +} + // Stats are per-group statistics. 
type Stats struct { Gets AtomicInt // any Get request, including from peers @@ -225,6 +234,31 @@ func (g *Group) Get(ctx Context, key string, dest Sink) error { func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { + // Check the cache again because singleflight can only dedup calls + // that overlap concurrently. It's possible for 2 concurrent + // requests to miss the cache, resulting in 2 load() calls. An + // unfortunate goroutine scheduling would result in this callback + // being run twice, serially. If we don't check the cache again, + // cache.nbytes would be incremented below even though there will + // be only one entry for this key. + // + // Consider the following serialized event ordering for two + // goroutines in which this callback gets called twice for the + // same key: + // 1: Get("key") + // 2: Get("key") + // 1: lookupCache("key") + // 2: lookupCache("key") + // 1: load("key") + // 2: load("key") + // 1: loadGroup.Do("key", fn) + // 1: fn() + // 2: loadGroup.Do("key", fn) + // 2: fn() + if value, cacheHit := g.lookupCache(key); cacheHit { + g.Stats.CacheHits.Add(1) + return value, nil + } g.Stats.LoadsDeduped.Add(1) var value ByteView var err error diff --git a/groupcache_test.go b/groupcache_test.go index 0ecd6bd..3a4ecc2 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -363,5 +363,85 @@ func TestAllocatingByteSliceTarget(t *testing.T) { } } +// orderedFlightGroup allows the caller to force the schedule of when +// orig.Do will be called. This is useful to serialize calls such +// that singleflight cannot dedup them. 
+type orderedFlightGroup struct { + mu sync.Mutex + stage1 chan bool + stage2 chan bool + orig flightGroup +} + +func (g *orderedFlightGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) { + <-g.stage1 + <-g.stage2 + g.mu.Lock() + defer g.mu.Unlock() + return g.orig.Do(key, fn) +} + +// TestNoDedup tests invariants on the cache size when singleflight is +// unable to dedup calls. +func TestNoDedup(t *testing.T) { + const testkey = "testkey" + const testval = "testval" + g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error { + return dest.SetString(testval) + }), nil) + + orderedGroup := &orderedFlightGroup{ + stage1: make(chan bool), + stage2: make(chan bool), + orig: g.loadGroup, + } + // Replace loadGroup with our wrapper so we can control when + // loadGroup.Do is entered for each concurrent request. + g.loadGroup = orderedGroup + + // Issue two identical requests concurrently. Since the cache is + // empty, it will miss. Both will enter load(), but we will only + // allow one at a time to enter singleflight.Do, so the callback + // function will be called twice. + resc := make(chan string, 2) + for i := 0; i < 2; i++ { + go func() { + var s string + if err := g.Get(dummyCtx, testkey, StringSink(&s)); err != nil { + resc <- "ERROR:" + err.Error() + return + } + resc <- s + }() + } + + // Ensure both goroutines have entered the Do routine. This implies + // both concurrent requests have checked the cache, found it empty, + // and called load(). 
+ orderedGroup.stage1 <- true + orderedGroup.stage1 <- true + orderedGroup.stage2 <- true + orderedGroup.stage2 <- true + + for i := 0; i < 2; i++ { + if s := <-resc; s != testval { + t.Errorf("result is %s want %s", s, testval) + } + } + + const wantItems = 1 + if g.mainCache.items() != wantItems { + t.Errorf("mainCache has %d items, want %d", g.mainCache.items(), wantItems) + } + + // If the singleflight callback doesn't double-check the cache again + // upon entry, we would increment nbytes twice but the entry would + // only be in the cache once. + const wantBytes = int64(len(testkey) + len(testval)) + if g.mainCache.nbytes != wantBytes { + t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes) + } +} + // TODO(bradfitz): port the Google-internal full integration test into here, // using HTTP requests instead of our RPC system.