From 7b47233b4b0b03e9437f963a1fb002a01558823e Mon Sep 17 00:00:00 2001 From: censhin Date: Thu, 29 Apr 2021 17:21:31 -0400 Subject: [PATCH] feat(all): adding setter to cache a specific value --- groupcache.go | 80 +++++++++++++++++++++++++++++++++++ groupcachepb/groupcache.pb.go | 1 + peers.go | 1 + 3 files changed, 82 insertions(+) diff --git a/groupcache.go b/groupcache.go index 5eda3cf..143f6a0 100644 --- a/groupcache.go +++ b/groupcache.go @@ -182,6 +182,10 @@ type Group struct { // concurrent callers. loadGroup flightGroup + // setGroup ensures that each added key is only added + // remotely once regardless of the number of concurrent callers. + setGroup flightGroup + // removeGroup ensures that each removed key is only removed // remotely once regardless of the number of concurrent callers. removeGroup flightGroup @@ -253,6 +257,53 @@ func (g *Group) Get(ctx context.Context, key string, dest Sink) error { return setSinkView(dest, value) } +func (g *Group) Set(ctx context.Context, key string, value interface{}) error { + g.peersOnce.Do(g.initPeers) + + _, err := g.setGroup.Do(key, func() (interface{}, error) { + + // Set to key owner first + owner, ok := g.peers.PickPeer(key) + if ok { + if err := g.setFromPeer(ctx, owner, key, value); err != nil { + return nil, err + } + } + // Set to our cache next + g.localSet(key, value) + wg := sync.WaitGroup{} + errs := make(chan error) + + // Asynchronously add the key and value to all hot and main caches of peers + for _, peer := range g.peers.GetAll() { + // avoid adding to owner a second time + if peer == owner { + continue + } + + wg.Add(1) + go func(peer ProtoGetter) { + errs <- g.setFromPeer(ctx, peer, key, value) + wg.Done() + }(peer) + } + go func() { + wg.Wait() + close(errs) + }() + + // TODO(thrawn01): Should we report all errors? Reporting context + // cancelled error for each peer doesn't make much sense. + var err error + for e := range errs { + err = e + } + + return nil, err + }) + return err +} + // Remove clears the key from our cache then forwards the remove // request to all peers. func (g *Group) Remove(ctx context.Context, key string) error { @@ -425,6 +476,15 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) ( return value, nil } +func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, key string, value interface{}) error { + req := &pb.GetRequest{ + Group: &g.name, + Key: &key, + Value: value, + } + return peer.Set(ctx, req) +} + func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error { req := &pb.GetRequest{ Group: &g.name, @@ -445,6 +505,17 @@ func (g *Group) lookupCache(key string) (value ByteView, ok bool) { return } +func (g *Group) localSet(key string, value interface{}) { + if g.cacheBytes <= 0 { + return + } + + g.loadGroup.Lock(func() { + g.hotCache.set(key, value) + g.mainCache.set(key, value) + }) +} + func (g *Group) localRemove(key string) { // Clear key from our local cache if g.cacheBytes <= 0 { @@ -563,6 +634,15 @@ func (c *cache) get(key string) (value ByteView, ok bool) { return vi.(ByteView), true } +func (c *cache) set(key string, value interface{}) { + c.mu.Lock() + defer c.mu.Unlock() + if c.lru == nil { + return + } + c.lru.Add(key, value, ) // TODO: add duration +} + func (c *cache) remove(key string) { c.mu.Lock() defer c.mu.Unlock() diff --git a/groupcachepb/groupcache.pb.go b/groupcachepb/groupcache.pb.go index 85b7c69..f6957e0 100644 --- a/groupcachepb/groupcache.pb.go +++ b/groupcachepb/groupcache.pb.go @@ -31,6 +31,7 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type GetRequest struct { Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"` Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"` + Value interface{} XXX_unrecognized []byte `json:"-"` } diff --git a/peers.go b/peers.go index 18d8659..299d060 100644 --- a/peers.go +++ b/peers.go @@ -28,6 +28,7 @@ import ( type ProtoGetter interface { Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error Remove(context context.Context, in *pb.GetRequest) error + Set(context context.Context, in *pb.GetRequest) error // GetURL returns the peer URL GetURL() string }