diff --git a/CHANGELOG b/CHANGELOG index a9234a0..b631f1e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -12,12 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [2.3.0] - 2022-01-06 ### Added -* Added Group.Set() to allow users to explicity set values in the cache. +* Added Group.Set() to allow users to explicitly set values in the cache. ## [2.2.1] - 2021-01-13 ### Changes * Now uses the much faster fnv1 -* Now md5 hashs the keys to help distribute hosts more evenly in some +* Now md5 hashes the keys to help distribute hosts more evenly in some cases. ## [2.2.0] - 2019-07-09 diff --git a/README.md b/README.md index 2e82a16..b2ed5b2 100644 --- a/README.md +++ b/README.md @@ -156,4 +156,7 @@ func ExampleUsage() { ``` ### Note -The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package where it is uitilized by any groups created. The `pool` returned is only a pointer to the internallly registered pool so the caller can update the peers in the pool as needed. +The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` +creates a new pool internally within the `groupcache` package where it is utilized +by any groups created. The `pool` returned is only a pointer to the internally +registered pool so the caller can update the peers in the pool as needed. diff --git a/example_test.go b/example_test.go index 9fd306e..3771177 100644 --- a/example_test.go +++ b/example_test.go @@ -35,7 +35,6 @@ func ExampleUsage() { // Create a new group cache with a max cache size of 3MB group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc( func(ctx context.Context, id string, dest groupcache.Sink) error { - // In a real scenario we might fetch the value from a database. /*if user, err := fetchUserFromMongo(ctx, id); err != nil { return err @@ -58,7 +57,7 @@ func ExampleUsage() { var user User - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil { diff --git a/groupcache.go b/groupcache.go index 3d1ebec..da0fe59 100644 --- a/groupcache.go +++ b/groupcache.go @@ -298,7 +298,6 @@ func (g *Group) Remove(ctx context.Context, key string) error { g.peersOnce.Do(g.initPeers) _, err := g.removeGroup.Do(key, func() (interface{}, error) { - // Remove from key owner first owner, ok := g.peers.PickPeer(key) if ok { @@ -341,6 +340,41 @@ func (g *Group) Remove(ctx context.Context, key string) error { return err } +// Clear purges our cache then forwards the clear request to all peers. +func (g *Group) Clear(ctx context.Context) error { + g.peersOnce.Do(g.initPeers) + + _, err := g.removeGroup.Do("", func() (interface{}, error) { + // Clear our cache first + g.localClear() + wg := sync.WaitGroup{} + errs := make(chan error) + + // Asynchronously clear all caches of peers + for _, peer := range g.peers.GetAll() { + wg.Add(1) + go func(peer ProtoGetter) { + errs <- g.clearFromPeer(ctx, peer) + wg.Done() + }(peer) + } + go func() { + wg.Wait() + close(errs) + }() + + // TODO(thrawn01): Should we report all errors? Reporting context + // cancelled error for each peer doesn't make much sense. + var err error + for e := range errs { + err = e + } + + return nil, err + }) + return err +} + // load loads key either by invoking the getter locally or by sending it to another machine. func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) @@ -490,6 +524,13 @@ func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string return peer.Remove(ctx, req) } +func (g *Group) clearFromPeer(ctx context.Context, peer ProtoGetter) error { + req := &pb.GetRequest{ + Group: &g.name, + } + return peer.Clear(ctx, req) +} + func (g *Group) lookupCache(key string) (value ByteView, ok bool) { if g.cacheBytes <= 0 { return @@ -531,6 +572,19 @@ func (g *Group) localRemove(key string) { }) } +func (g *Group) localClear() { + // Clear our local cache + if g.cacheBytes <= 0 { + return + } + + // Ensure no requests are in flight + g.loadGroup.Lock(func() { + g.hotCache.clear() + g.mainCache.clear() + }) +} + func (g *Group) populateCache(key string, value ByteView, cache *cache) { if g.cacheBytes <= 0 { return @@ -651,6 +705,15 @@ func (c *cache) remove(key string) { c.lru.Remove(key) } +func (c *cache) clear() { + c.mu.Lock() + defer c.mu.Unlock() + if c.lru == nil { + return + } + c.lru.Clear() +} + func (c *cache) removeOldest() { c.mu.Lock() defer c.mu.Unlock() diff --git a/groupcache_test.go b/groupcache_test.go index 4a667ed..658c916 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -249,6 +249,51 @@ func TestCacheEviction(t *testing.T) { } } +func TestCachePurging(t *testing.T) { + once.Do(testSetup) + testKey1 := "TestCachePurging-key1" + getTestKey1 := func() { + var res string + for i := 0; i < 10; i++ { + if err := stringGroup.Get(dummyCtx, testKey1, StringSink(&res)); err != nil { + t.Fatal(err) + } + } + } + fills := countFills(getTestKey1) + if fills != 1 { + t.Fatalf("expected 1 cache fill; got %d", fills) + } + + testKey2 := "TestCachePurging-key2" + getTestKey2 := func() { + var res string + for i := 0; i < 10; i++ { + if err := stringGroup.Get(dummyCtx, testKey2, StringSink(&res)); err != nil { + t.Fatal(err) + } + } + } + fills = countFills(getTestKey2) + if fills != 1 { + t.Fatalf("expected 1 cache fill; got %d", fills) + } + + g := stringGroup.(*Group) + // Clear the cache + g.Clear(dummyCtx) + + // Test that the keys are gone. + fills = countFills(getTestKey1) + if fills != 1 { + t.Fatalf("expected 1 cache fill after cache purging; got %d", fills) + } + fills = countFills(getTestKey2) + if fills != 1 { + t.Fatalf("expected 1 cache fill after cache purging; got %d", fills) + } +} + type fakePeer struct { hits int fail bool @@ -279,6 +324,14 @@ func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error { return nil } +func (p *fakePeer) Clear(_ context.Context, in *pb.GetRequest) error { + p.hits++ + if p.fail { + return errors.New("simulated error from peer") + } + return nil +} + func (p *fakePeer) GetURL() string { return "fakePeer" } diff --git a/http.go b/http.go index 9a5a6d2..3f421ca 100644 --- a/http.go +++ b/http.go @@ -164,12 +164,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2) - if len(parts) != 2 { + lenParts := len(parts) + + if (lenParts != 2) || ((lenParts == 1) && (r.Method != http.MethodDelete)) { http.Error(w, "bad request", http.StatusBadRequest) return } groupName := parts[0] - key := parts[1] // Fetch the value for this group/key. group := GetGroup(groupName) @@ -186,6 +187,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { group.Stats.ServerRequests.Add(1) + if (lenParts == 1) && (r.Method == http.MethodDelete) { + group.localRemove("") + return + } + + key := parts[1] + // Delete the key and return 200 if r.Method == http.MethodDelete { group.localRemove(key) @@ -268,12 +276,21 @@ type request interface { } func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io.Reader, out *http.Response) error { - u := fmt.Sprintf( - "%v%v/%v", - h.baseURL, - url.PathEscape(in.GetGroup()), - url.PathEscape(in.GetKey()), - ) + var u string + if key := in.GetKey(); key != "" { + u = fmt.Sprintf( + "%v%v/%v", + h.baseURL, + url.PathEscape(in.GetGroup()), + url.PathEscape(key), + ) + } else { + u = fmt.Sprintf( + "%v%v", + h.baseURL, + url.PathEscape(in.GetGroup()), + ) + } req, err := http.NewRequestWithContext(ctx, m, u, b) if err != nil { return err @@ -353,3 +370,20 @@ func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error { } return nil } + +func (h *httpGetter) Clear(ctx context.Context, in *pb.GetRequest) error { + var res http.Response + if err := h.makeRequest(ctx, http.MethodDelete, in, nil, &res); err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("while reading body response: %v", res.Status) + } + return fmt.Errorf("server returned status %d: %s", res.StatusCode, body) + } + return nil +} diff --git a/peers.go b/peers.go index 39fd76a..3aa60bc 100644 --- a/peers.go +++ b/peers.go @@ -29,6 +29,7 @@ type ProtoGetter interface { Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error Remove(context context.Context, in *pb.GetRequest) error Set(context context.Context, in *pb.SetRequest) error + Clear(context context.Context, in *pb.GetRequest) error // GetURL returns the peer URL GetURL() string } @@ -50,9 +51,7 @@ type NoPeers struct{} func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return } func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} } -var ( - portPicker func(groupName string) PeerPicker -) +var portPicker func(groupName string) PeerPicker // RegisterPeerPicker registers the peer initialization function. // It is called once, when the first group is created.