feat(all): adding setter to cache a specific value

This commit is contained in:
censhin 2021-04-29 17:21:31 -04:00
parent 2fc526bccc
commit 7b47233b4b
3 changed files with 82 additions and 0 deletions

View File

@ -182,6 +182,10 @@ type Group struct {
// concurrent callers.
loadGroup flightGroup
// setGroup ensures that each added key is only added
// remotely once regardless of the number of concurrent callers.
setGroup flightGroup
// removeGroup ensures that each removed key is only removed
// remotely once regardless of the number of concurrent callers.
removeGroup flightGroup
@ -253,6 +257,53 @@ func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
return setSinkView(dest, value)
}
func (g *Group) Set(ctx context.Context, key string, value interface{}) error {
g.peersOnce.Do(g.initPeers)
_, err := g.setGroup.Do(key, func() (interface{}, error) {
// Set to key owner first
owner, ok := g.peers.PickPeer(key)
if ok {
if err := g.setFromPeer(ctx, owner, key, value); err != nil {
return nil, err
}
}
// Set to our cache next
g.localSet(key, value)
wg := sync.WaitGroup{}
errs := make(chan error)
// Asynchronously add the key and value to all hot and main caches of peers
for _, peer := range g.peers.GetAll() {
// avoid adding to owner a second time
if peer == owner {
continue
}
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.setFromPeer(ctx, peer, key, value)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
}
return nil, err
})
return err
}
// Remove clears the key from our cache then forwards the remove
// request to all peers.
func (g *Group) Remove(ctx context.Context, key string) error {
@ -425,6 +476,15 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (
return value, nil
}
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, key string, value interface{}) error {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
Value: value,
}
return peer.Set(ctx, req)
}
func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error {
req := &pb.GetRequest{
Group: &g.name,
@ -445,6 +505,17 @@ func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
return
}
func (g *Group) localSet(key string, value interface{}) {
if g.cacheBytes <= 0 {
return
}
g.loadGroup.Lock(func() {
g.hotCache.set(key, value)
g.mainCache.set(key, value)
})
}
func (g *Group) localRemove(key string) {
// Clear key from our local cache
if g.cacheBytes <= 0 {
@ -563,6 +634,15 @@ func (c *cache) get(key string) (value ByteView, ok bool) {
return vi.(ByteView), true
}
func (c *cache) set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
c.lru.Add(key, value, ) // TODO: add duration
}
func (c *cache) remove(key string) {
c.mu.Lock()
defer c.mu.Unlock()

View File

@ -31,6 +31,7 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type GetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value interface{}
XXX_unrecognized []byte `json:"-"`
}

View File

@ -28,6 +28,7 @@ import (
type ProtoGetter interface {
Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
Remove(context context.Context, in *pb.GetRequest) error
Set(context context.Context, in *pb.GetRequest) error
// GetURL returns the peer URL
GetURL() string
}