feat: add ability to clear cache

While the LRU package has the ability to purge all items from cache,
this functionality was not available to `ProtoGetter`, making it
imposibile to clear the cache without restarting all peers. This change
adds a `Clear()` method to `ProtoGetter`, that enables clearing the
cache with no downtime.
This commit is contained in:
ct16k 2022-12-17 17:43:36 +02:00
parent bde4250129
commit 95848327b2
7 changed files with 168 additions and 17 deletions

View File

@ -12,12 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [2.3.0] - 2022-01-06
### Added
* Added Group.Set() to allow users to explicity set values in the cache.
* Added Group.Set() to allow users to explicitly set values in the cache.
## [2.2.1] - 2021-01-13
### Changes
* Now uses the much faster fnv1
* Now md5 hashs the keys to help distribute hosts more evenly in some
* Now md5 hashes the keys to help distribute hosts more evenly in some
cases.
## [2.2.0] - 2019-07-09

View File

@ -156,4 +156,7 @@ func ExampleUsage() {
```
### Note
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package where it is uitilized by any groups created. The `pool` returned is only a pointer to the internallly registered pool so the caller can update the peers in the pool as needed.
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()`
creates a new pool internally within the `groupcache` package where it is utilized
by any groups created. The `pool` returned is only a pointer to the internally
registered pool so the caller can update the peers in the pool as needed.

View File

@ -35,7 +35,6 @@ func ExampleUsage() {
// Create a new group cache with a max cache size of 3MB
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
func(ctx context.Context, id string, dest groupcache.Sink) error {
// In a real scenario we might fetch the value from a database.
/*if user, err := fetchUserFromMongo(ctx, id); err != nil {
return err
@ -58,7 +57,7 @@ func ExampleUsage() {
var user User
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {

View File

@ -298,7 +298,6 @@ func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)
_, err := g.removeGroup.Do(key, func() (interface{}, error) {
// Remove from key owner first
owner, ok := g.peers.PickPeer(key)
if ok {
@ -341,6 +340,41 @@ func (g *Group) Remove(ctx context.Context, key string) error {
return err
}
// Clear purges our cache then forwards the clear request to all peers.
func (g *Group) Clear(ctx context.Context) error {
g.peersOnce.Do(g.initPeers)
_, err := g.removeGroup.Do("", func() (interface{}, error) {
// Clear our cache first
g.localClear()
wg := sync.WaitGroup{}
errs := make(chan error)
// Asynchronously clear all caches of peers
for _, peer := range g.peers.GetAll() {
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.clearFromPeer(ctx, peer)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
}
return nil, err
})
return err
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
@ -490,6 +524,13 @@ func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string
return peer.Remove(ctx, req)
}
func (g *Group) clearFromPeer(ctx context.Context, peer ProtoGetter) error {
req := &pb.GetRequest{
Group: &g.name,
}
return peer.Clear(ctx, req)
}
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
if g.cacheBytes <= 0 {
return
@ -531,6 +572,19 @@ func (g *Group) localRemove(key string) {
})
}
func (g *Group) localClear() {
// Clear our local cache
if g.cacheBytes <= 0 {
return
}
// Ensure no requests are in flight
g.loadGroup.Lock(func() {
g.hotCache.clear()
g.mainCache.clear()
})
}
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
if g.cacheBytes <= 0 {
return
@ -651,6 +705,15 @@ func (c *cache) remove(key string) {
c.lru.Remove(key)
}
func (c *cache) clear() {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
c.lru.Clear()
}
func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()

View File

@ -249,6 +249,51 @@ func TestCacheEviction(t *testing.T) {
}
}
func TestCachePurging(t *testing.T) {
once.Do(testSetup)
testKey1 := "TestCachePurging-key1"
getTestKey1 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey1, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills := countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}
testKey2 := "TestCachePurging-key2"
getTestKey2 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey2, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}
g := stringGroup.(*Group)
// Clear the cache
g.Clear(dummyCtx)
// Test that the keys are gone.
fills = countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
}
type fakePeer struct {
hits int
fail bool
@ -279,6 +324,14 @@ func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error {
return nil
}
func (p *fakePeer) Clear(_ context.Context, in *pb.GetRequest) error {
p.hits++
if p.fail {
return errors.New("simulated error from peer")
}
return nil
}
func (p *fakePeer) GetURL() string {
return "fakePeer"
}

50
http.go
View File

@ -164,12 +164,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
lenParts := len(parts)
if (lenParts != 2) || ((lenParts == 1) && (r.Method != http.MethodDelete)) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
// Fetch the value for this group/key.
group := GetGroup(groupName)
@ -186,6 +187,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
group.Stats.ServerRequests.Add(1)
if (lenParts == 1) && (r.Method == http.MethodDelete) {
group.localRemove("")
return
}
key := parts[1]
// Delete the key and return 200
if r.Method == http.MethodDelete {
group.localRemove(key)
@ -268,12 +276,21 @@ type request interface {
}
func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io.Reader, out *http.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
url.PathEscape(in.GetKey()),
)
var u string
if key := in.GetKey(); key != "" {
u = fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
url.PathEscape(key),
)
} else {
u = fmt.Sprintf(
"%v%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
)
}
req, err := http.NewRequestWithContext(ctx, m, u, b)
if err != nil {
return err
@ -353,3 +370,20 @@ func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error {
}
return nil
}
func (h *httpGetter) Clear(ctx context.Context, in *pb.GetRequest) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodDelete, in, nil, &res); err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("while reading body response: %v", res.Status)
}
return fmt.Errorf("server returned status %d: %s", res.StatusCode, body)
}
return nil
}

View File

@ -29,6 +29,7 @@ type ProtoGetter interface {
Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
Remove(context context.Context, in *pb.GetRequest) error
Set(context context.Context, in *pb.SetRequest) error
Clear(context context.Context, in *pb.GetRequest) error
// GetURL returns the peer URL
GetURL() string
}
@ -50,9 +51,7 @@ type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} }
var (
portPicker func(groupName string) PeerPicker
)
var portPicker func(groupName string) PeerPicker
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.