Compare commits

...

4 Commits

Author SHA1 Message Date
ct16k
545382fe17
Merge 7f8db4d8ed into cc3d361eb2 2023-06-13 09:23:40 -05:00
Derrick J. Wippler
cc3d361eb2
Improved http error tests (#61) 2023-06-13 09:16:37 -05:00
ct16k
7f8db4d8ed fix: add generation tracking to prevent keys surviving a clear operation
Running a Purge and a Get at the same time could allow an old key to be
pulled back into the cache if the request was initiated by a node that
had been cleared and sent to a node that hadn't. This patch tries to
mitigate this, by adding an additional `generation` field, that keeps
track of the number of purges issued. Requests are fulfilled successfully
only if boths ends of a request are on the same generation.
2023-01-22 10:58:45 +02:00
ct16k
95848327b2 feat: add ability to clear cache
While the LRU package has the ability to purge all items from cache,
this functionality was not available to `ProtoGetter`, making it
imposibile to clear the cache without restarting all peers. This change
adds a `Clear()` method to `ProtoGetter`, that enables clearing the
cache with no downtime.
2022-12-23 21:46:48 +02:00
13 changed files with 592 additions and 164 deletions

View File

@ -14,12 +14,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [2.3.0] - 2022-01-06
### Added
* Added Group.Set() to allow users to explicity set values in the cache.
* Added Group.Set() to allow users to explicitly set values in the cache.
## [2.2.1] - 2021-01-13
### Changes
* Now uses the much faster fnv1
* Now md5 hashs the keys to help distribute hosts more evenly in some
* Now md5 hashes the keys to help distribute hosts more evenly in some
cases.
## [2.2.0] - 2019-07-09

View File

@ -156,4 +156,7 @@ func ExampleUsage() {
```
### Note
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()` creates a new pool internally within the `groupcache` package where it is uitilized by any groups created. The `pool` returned is only a pointer to the internallly registered pool so the caller can update the peers in the pool as needed.
The call to `groupcache.NewHTTPPoolOpts()` is a bit misleading. `NewHTTPPoolOpts()`
creates a new pool internally within the `groupcache` package where it is utilized
by any groups created. The `pool` returned is only a pointer to the internally
registered pool so the caller can update the peers in the pool as needed.

View File

@ -31,17 +31,24 @@ import (
// A ByteView is meant to be used as a value type, not
// a pointer (like a time.Time).
type ByteView struct {
e time.Time
s string
// If b is non-nil, b is used, else s is used.
b []byte
s string
e time.Time
// generation is an optional field, used only in certain operations
g int64
}
// Returns the expire time associated with this view
// Expire returns the expire time associated with this view
func (v ByteView) Expire() time.Time {
return v.e
}
// Generation returns the generation associated with this cache view
func (v ByteView) Generation() int64 {
return v.g
}
// Len returns the view's length.
func (v ByteView) Len() int {
if v.b != nil {

View File

@ -17,8 +17,9 @@ func (e *ErrNotFound) Is(target error) bool {
return ok
}
// ErrRemoteCall is returned from `group.Get()` when an error that is not a `ErrNotFound`
// is returned during a remote HTTP instance call
// ErrRemoteCall is returned from `group.Get()` when a remote GetterFunc returns an
// error. When this happens `group.Get()` does not attempt to retrieve the value
// via our local GetterFunc.
type ErrRemoteCall struct {
Msg string
}

View File

@ -35,7 +35,6 @@ func ExampleUsage() {
// Create a new group cache with a max cache size of 3MB
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
func(ctx context.Context, id string, dest groupcache.Sink) error {
// In a real scenario we might fetch the value from a database.
/*if user, err := fetchUserFromMongo(ctx, id); err != nil {
return err
@ -58,7 +57,7 @@ func ExampleUsage() {
var user User
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {

View File

@ -274,19 +274,20 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T
_, err := g.setGroup.Do(key, func() (interface{}, error) {
// If remote peer owns this key
owner, ok := g.peers.PickPeer(key)
generation := g.mainCache.generation()
if ok {
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
if err := g.setFromPeer(ctx, owner, key, value, expire, generation); err != nil {
return nil, err
}
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localSet(key, value, expire, &g.hotCache)
g.localSet(key, value, expire, generation, &g.hotCache)
}
return nil, nil
}
// We own this key
g.localSet(key, value, expire, &g.mainCache)
g.localSet(key, value, expire, generation, &g.mainCache)
return nil, nil
})
return err
@ -298,7 +299,6 @@ func (g *Group) Remove(ctx context.Context, key string) error {
g.peersOnce.Do(g.initPeers)
_, err := g.removeGroup.Do(key, func() (interface{}, error) {
// Remove from key owner first
owner, ok := g.peers.PickPeer(key)
if ok {
@ -341,6 +341,41 @@ func (g *Group) Remove(ctx context.Context, key string) error {
return err
}
// Clear purges our cache then forwards the clear request to all peers.
func (g *Group) Clear(ctx context.Context) error {
g.peersOnce.Do(g.initPeers)
_, err := g.removeGroup.Do("", func() (interface{}, error) {
// Clear our cache first
g.localClear()
wg := sync.WaitGroup{}
errs := make(chan error)
// Asynchronously clear all caches of peers
for _, peer := range g.peers.GetAll() {
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.clearFromPeer(ctx, peer)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
}
return nil, err
})
return err
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
@ -466,23 +501,29 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (
}
}
value := ByteView{b: res.Value, e: expire}
var generation int64
if res.Generation != nil {
generation = *res.Generation
}
value := ByteView{b: res.Value, e: expire, g: generation}
// Always populate the hot cache
g.populateCache(key, value, &g.hotCache)
return value, nil
}
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, gen int64) error {
var expire int64
if !e.IsZero() {
expire = e.UnixNano()
}
req := &pb.SetRequest{
Expire: &expire,
Group: &g.name,
Key: &k,
Value: v,
Expire: &expire,
Group: &g.name,
Key: &k,
Value: v,
Generation: &gen,
}
return peer.Set(ctx, req)
}
@ -495,6 +536,13 @@ func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string
return peer.Remove(ctx, req)
}
func (g *Group) clearFromPeer(ctx context.Context, peer ProtoGetter) error {
req := &pb.GetRequest{
Group: &g.name,
}
return peer.Clear(ctx, req)
}
func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
if g.cacheBytes <= 0 {
return
@ -507,7 +555,7 @@ func (g *Group) lookupCache(key string) (value ByteView, ok bool) {
return
}
func (g *Group) localSet(key string, value []byte, expire time.Time, cache *cache) {
func (g *Group) localSet(key string, value []byte, expire time.Time, generation int64, cache *cache) {
if g.cacheBytes <= 0 {
return
}
@ -515,6 +563,7 @@ func (g *Group) localSet(key string, value []byte, expire time.Time, cache *cach
bv := ByteView{
b: value,
e: expire,
g: generation,
}
// Ensure no requests are in flight
@ -536,6 +585,19 @@ func (g *Group) localRemove(key string) {
})
}
func (g *Group) localClear() {
// Clear our local cache
if g.cacheBytes <= 0 {
return
}
// Ensure no requests are in flight
g.loadGroup.Lock(func() {
g.hotCache.clear()
g.mainCache.clear()
})
}
func (g *Group) populateCache(key string, value ByteView, cache *cache) {
if g.cacheBytes <= 0 {
return
@ -597,21 +659,23 @@ var NowFunc lru.NowFunc = time.Now
// values.
type cache struct {
mu sync.RWMutex
nbytes int64 // of all keys and values
lru *lru.Cache
nbytes int64 // of all keys and values
nhit, nget int64
nevict int64 // number of evictions
gen int64
}
func (c *cache) stats() CacheStats {
c.mu.RLock()
defer c.mu.RUnlock()
return CacheStats{
Bytes: c.nbytes,
Items: c.itemsLocked(),
Gets: c.nget,
Hits: c.nhit,
Evictions: c.nevict,
Bytes: c.nbytes,
Items: c.itemsLocked(),
Gets: c.nget,
Hits: c.nhit,
Evictions: c.nevict,
Generation: c.gen,
}
}
@ -628,6 +692,16 @@ func (c *cache) add(key string, value ByteView) {
},
}
}
if c.gen != value.g {
if logger != nil {
logger.Error().WithFields(map[string]interface{}{
"got": value.g,
"have": c.generation,
"key": key,
}).Printf("generation mismatch")
}
return
}
c.lru.Add(key, value, value.Expire())
c.nbytes += int64(len(key)) + int64(value.Len())
}
@ -644,7 +718,10 @@ func (c *cache) get(key string) (value ByteView, ok bool) {
return
}
c.nhit++
return vi.(ByteView), true
bv := vi.(ByteView)
bv.g = c.gen
return bv, true
}
func (c *cache) remove(key string) {
@ -656,6 +733,15 @@ func (c *cache) remove(key string) {
c.lru.Remove(key)
}
func (c *cache) clear() {
c.mu.Lock()
defer c.mu.Unlock()
if c.lru == nil {
return
}
c.lru.Clear()
}
func (c *cache) removeOldest() {
c.mu.Lock()
defer c.mu.Unlock()
@ -683,6 +769,12 @@ func (c *cache) itemsLocked() int64 {
return int64(c.lru.Len())
}
func (c *cache) generation() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.gen
}
// An AtomicInt is an int64 to be accessed atomically.
type AtomicInt int64
@ -707,9 +799,10 @@ func (i *AtomicInt) String() string {
// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
Bytes int64
Items int64
Gets int64
Hits int64
Evictions int64
Bytes int64
Items int64
Gets int64
Hits int64
Evictions int64
Generation int64
}

View File

@ -249,6 +249,51 @@ func TestCacheEviction(t *testing.T) {
}
}
func TestCachePurging(t *testing.T) {
once.Do(testSetup)
testKey1 := "TestCachePurging-key1"
getTestKey1 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey1, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills := countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}
testKey2 := "TestCachePurging-key2"
getTestKey2 := func() {
var res string
for i := 0; i < 10; i++ {
if err := stringGroup.Get(dummyCtx, testKey2, StringSink(&res)); err != nil {
t.Fatal(err)
}
}
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill; got %d", fills)
}
g := stringGroup.(*Group)
// Clear the cache
g.Clear(dummyCtx)
// Test that the keys are gone.
fills = countFills(getTestKey1)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
fills = countFills(getTestKey2)
if fills != 1 {
t.Fatalf("expected 1 cache fill after cache purging; got %d", fills)
}
}
type fakePeer struct {
hits int
fail bool
@ -279,6 +324,14 @@ func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error {
return nil
}
func (p *fakePeer) Clear(_ context.Context, in *pb.GetRequest) error {
p.hits++
if p.fail {
return errors.New("simulated error from peer")
}
return nil
}
func (p *fakePeer) GetURL() string {
return "fakePeer"
}

View File

@ -1,155 +1,377 @@
//
//Copyright 2012 Google Inc.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.21.12
// source: groupcache.proto
/*
Package groupcachepb is a generated protocol buffer package.
It is generated from these files:
groupcache.proto
It has these top-level messages:
GetRequest
GetResponse
SetRequest
*/
package groupcachepb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type GetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
XXX_unrecognized []byte `json:"-"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"` // not actually required/guaranteed to be UTF-8
Generation *int64 `protobuf:"varint,3,req,name=generation" json:"generation,omitempty"`
}
func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {}
func (*GetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (x *GetRequest) Reset() {
*x = GetRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *GetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
func (x *GetRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetRequest) ProtoMessage() {}
func (x *GetRequest) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead.
func (*GetRequest) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{0}
}
func (x *GetRequest) GetGroup() string {
if x != nil && x.Group != nil {
return *x.Group
}
return ""
}
func (m *GetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
func (x *GetRequest) GetKey() string {
if x != nil && x.Key != nil {
return *x.Key
}
return ""
}
type GetResponse struct {
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {}
func (*GetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *GetResponse) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *GetResponse) GetMinuteQps() float64 {
if m != nil && m.MinuteQps != nil {
return *m.MinuteQps
func (x *GetRequest) GetGeneration() int64 {
if x != nil && x.Generation != nil {
return *x.Generation
}
return 0
}
func (m *GetResponse) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
type GetResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
Generation *int64 `protobuf:"varint,4,opt,name=generation" json:"generation,omitempty"`
}
func (x *GetResponse) Reset() {
*x = GetResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *GetResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetResponse) ProtoMessage() {}
func (x *GetResponse) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead.
func (*GetResponse) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{1}
}
func (x *GetResponse) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
func (x *GetResponse) GetMinuteQps() float64 {
if x != nil && x.MinuteQps != nil {
return *x.MinuteQps
}
return 0
}
func (x *GetResponse) GetExpire() int64 {
if x != nil && x.Expire != nil {
return *x.Expire
}
return 0
}
func (x *GetResponse) GetGeneration() int64 {
if x != nil && x.Generation != nil {
return *x.Generation
}
return 0
}
type SetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"`
Generation *int64 `protobuf:"varint,5,opt,name=generation" json:"generation,omitempty"`
}
func (m *SetRequest) Reset() { *m = SetRequest{} }
func (m *SetRequest) String() string { return proto.CompactTextString(m) }
func (*SetRequest) ProtoMessage() {}
func (*SetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (x *SetRequest) Reset() {
*x = SetRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *SetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
func (x *SetRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SetRequest) ProtoMessage() {}
func (x *SetRequest) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SetRequest.ProtoReflect.Descriptor instead.
func (*SetRequest) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{2}
}
func (x *SetRequest) GetGroup() string {
if x != nil && x.Group != nil {
return *x.Group
}
return ""
}
func (m *SetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
func (x *SetRequest) GetKey() string {
if x != nil && x.Key != nil {
return *x.Key
}
return ""
}
func (m *SetRequest) GetValue() []byte {
if m != nil {
return m.Value
func (x *SetRequest) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
func (m *SetRequest) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
func (x *SetRequest) GetExpire() int64 {
if x != nil && x.Expire != nil {
return *x.Expire
}
return 0
}
func init() {
proto.RegisterType((*GetRequest)(nil), "groupcachepb.GetRequest")
proto.RegisterType((*GetResponse)(nil), "groupcachepb.GetResponse")
proto.RegisterType((*SetRequest)(nil), "groupcachepb.SetRequest")
func (x *SetRequest) GetGeneration() int64 {
if x != nil && x.Generation != nil {
return *x.Generation
}
return 0
}
func init() { proto.RegisterFile("groupcache.proto", fileDescriptor0) }
var File_groupcache_proto protoreflect.FileDescriptor
var fileDescriptor0 = []byte{
// 215 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x31, 0x4b, 0xc5, 0x30,
0x18, 0x34, 0x8d, 0x0a, 0xfd, 0xec, 0x50, 0x82, 0x48, 0x14, 0x84, 0x90, 0x29, 0x53, 0x07, 0x71,
0x74, 0x73, 0x28, 0xb8, 0x19, 0x37, 0x17, 0x69, 0xcb, 0x87, 0x16, 0xb5, 0x49, 0x9b, 0x44, 0x7c,
0xff, 0xfe, 0x91, 0xe6, 0x41, 0x3a, 0xbc, 0xe5, 0x6d, 0xb9, 0x3b, 0x2e, 0x77, 0xdf, 0x41, 0xfd,
0xb9, 0x98, 0x60, 0x87, 0x6e, 0xf8, 0xc2, 0xc6, 0x2e, 0xc6, 0x1b, 0x56, 0x65, 0xc6, 0xf6, 0xf2,
0x11, 0xa0, 0x45, 0xaf, 0x71, 0x0e, 0xe8, 0x3c, 0xbb, 0x86, 0x8b, 0x55, 0xe5, 0x44, 0x14, 0xaa,
0xd4, 0x09, 0xb0, 0x1a, 0xe8, 0x37, 0xee, 0x78, 0xb1, 0x72, 0xf1, 0x29, 0xdf, 0xe1, 0x6a, 0x75,
0x39, 0x6b, 0x26, 0x87, 0xd1, 0xf6, 0xd7, 0xfd, 0x04, 0xe4, 0x44, 0x10, 0x55, 0xe9, 0x04, 0xd8,
0x3d, 0xc0, 0xef, 0x38, 0x05, 0x8f, 0x1f, 0xb3, 0x75, 0xbc, 0x10, 0x44, 0x11, 0x5d, 0x26, 0xe6,
0xd5, 0x3a, 0x76, 0x03, 0x97, 0xf8, 0x6f, 0xc7, 0x05, 0x39, 0x15, 0x44, 0x51, 0x7d, 0x40, 0xb2,
0x07, 0x78, 0x3b, 0xb9, 0x51, 0xae, 0x40, 0xb7, 0x15, 0x72, 0xc6, 0xf9, 0x36, 0xe3, 0xe1, 0x05,
0xa0, 0x8d, 0x1f, 0x3d, 0xc7, 0x15, 0xd8, 0x13, 0xd0, 0x16, 0x3d, 0xe3, 0xcd, 0x76, 0x99, 0x26,
0xcf, 0x72, 0x77, 0x7b, 0x44, 0x49, 0xa7, 0xcb, 0xb3, 0x7d, 0x00, 0x00, 0x00, 0xff, 0xff, 0x02,
0x10, 0x64, 0xec, 0x62, 0x01, 0x00, 0x00,
var file_groupcache_proto_rawDesc = []byte{
0x0a, 0x10, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x0c, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62,
0x22, 0x54, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14,
0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x02, 0x28, 0x09, 0x52, 0x05, 0x67,
0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x02, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61,
0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x02, 0x28, 0x03, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x7a, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6d,
0x69, 0x6e, 0x75, 0x74, 0x65, 0x5f, 0x71, 0x70, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52,
0x09, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x51, 0x70, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x78,
0x70, 0x69, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x65, 0x78, 0x70, 0x69,
0x72, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x22, 0x82, 0x01, 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x02, 0x28, 0x09,
0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02,
0x20, 0x02, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12,
0x16, 0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52,
0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72,
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x67, 0x65, 0x6e,
0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x32, 0x4a, 0x0a, 0x0a, 0x47, 0x72, 0x6f, 0x75, 0x70,
0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x3c, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x18, 0x2e, 0x67,
0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61,
0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x42, 0x2f, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x6d, 0x61, 0x69, 0x6c, 0x67, 0x75, 0x6e, 0x2f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63,
0x61, 0x63, 0x68, 0x65, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63,
0x68, 0x65, 0x70, 0x62,
}
var (
file_groupcache_proto_rawDescOnce sync.Once
file_groupcache_proto_rawDescData = file_groupcache_proto_rawDesc
)
func file_groupcache_proto_rawDescGZIP() []byte {
file_groupcache_proto_rawDescOnce.Do(func() {
file_groupcache_proto_rawDescData = protoimpl.X.CompressGZIP(file_groupcache_proto_rawDescData)
})
return file_groupcache_proto_rawDescData
}
var file_groupcache_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_groupcache_proto_goTypes = []interface{}{
(*GetRequest)(nil), // 0: groupcachepb.GetRequest
(*GetResponse)(nil), // 1: groupcachepb.GetResponse
(*SetRequest)(nil), // 2: groupcachepb.SetRequest
}
var file_groupcache_proto_depIdxs = []int32{
0, // 0: groupcachepb.GroupCache.Get:input_type -> groupcachepb.GetRequest
1, // 1: groupcachepb.GroupCache.Get:output_type -> groupcachepb.GetResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_groupcache_proto_init() }
func file_groupcache_proto_init() {
if File_groupcache_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_groupcache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_groupcache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_groupcache_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SetRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_groupcache_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_groupcache_proto_goTypes,
DependencyIndexes: file_groupcache_proto_depIdxs,
MessageInfos: file_groupcache_proto_msgTypes,
}.Build()
File_groupcache_proto = out.File
file_groupcache_proto_rawDesc = nil
file_groupcache_proto_goTypes = nil
file_groupcache_proto_depIdxs = nil
}

View File

@ -16,17 +16,21 @@ limitations under the License.
syntax = "proto2";
option go_package = "github.com/mailgun/groupcache/v2/groupcachepb";
package groupcachepb;
message GetRequest {
required string group = 1;
required string key = 2; // not actually required/guaranteed to be UTF-8
required int64 generation = 3;
}
message GetResponse {
optional bytes value = 1;
optional double minute_qps = 2;
optional int64 expire = 3;
optional int64 generation = 4;
}
message SetRequest {
@ -34,6 +38,7 @@ message SetRequest {
required string key = 2;
optional bytes value = 3;
optional int64 expire = 4;
optional int64 generation = 5;
}
service GroupCache {

57
http.go
View File

@ -165,12 +165,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
lenParts := len(parts)
if (lenParts != 2) || ((lenParts == 1) && (r.Method != http.MethodDelete)) {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
// Fetch the value for this group/key.
group := GetGroup(groupName)
@ -187,6 +188,13 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
group.Stats.ServerRequests.Add(1)
if (lenParts == 1) && (r.Method == http.MethodDelete) {
group.localRemove("")
return
}
key := parts[1]
// Delete the key and return 200
if r.Method == http.MethodDelete {
group.localRemove(key)
@ -217,7 +225,12 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
expire = time.Unix(*out.Expire/int64(time.Second), *out.Expire%int64(time.Second))
}
group.localSet(*out.Key, out.Value, expire, &group.mainCache)
var generation int64
if out.Generation != nil {
generation = *out.Generation
}
group.localSet(*out.Key, out.Value, expire, generation, &group.mainCache)
return
}
@ -273,12 +286,21 @@ type request interface {
}
func (h *httpGetter) makeRequest(ctx context.Context, m string, in request, b io.Reader, out *http.Response) error {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
url.PathEscape(in.GetKey()),
)
var u string
if key := in.GetKey(); key != "" {
u = fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
url.PathEscape(key),
)
} else {
u = fmt.Sprintf(
"%v%v",
h.baseURL,
url.PathEscape(in.GetGroup()),
)
}
req, err := http.NewRequestWithContext(ctx, m, u, b)
if err != nil {
return err
@ -368,3 +390,20 @@ func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error {
}
return nil
}
func (h *httpGetter) Clear(ctx context.Context, in *pb.GetRequest) error {
var res http.Response
if err := h.makeRequest(ctx, http.MethodDelete, in, nil, &res); err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("while reading body response: %v", res.Status)
}
return fmt.Errorf("server returned status %d: %s", res.StatusCode, body)
}
return nil
}

View File

@ -187,21 +187,24 @@ func TestHTTPPool(t *testing.T) {
}
// Get a key that does not exist
err := g.Get(ctx, "IDoNotExist", StringSink(&value))
err := g.Get(ctx, "IReturnErrNotFound", StringSink(&value))
errNotFound := &ErrNotFound{}
if !errors.As(err, &errNotFound) {
t.Fatal(errors.New("expected error to be 'ErrNotFound'"))
}
assert.Equal(t, "I do not exist error", errNotFound.Error())
assert.Equal(t, "I am a ErrNotFound error", errNotFound.Error())
// Get a key that is guaranteed to return an error
err = g.Get(ctx, "IAlwaysReturnAnError", StringSink(&value))
// Get a key that is guaranteed to return a remote error.
err = g.Get(ctx, "IReturnErrRemoteCall", StringSink(&value))
errRemoteCall := &ErrRemoteCall{}
if !errors.As(err, &errRemoteCall) {
t.Fatal(errors.New("expected error to be 'ErrRemoteCall'"))
}
assert.Equal(t, "I am a ErrRemoteCall error", errRemoteCall.Error())
assert.Equal(t, "I am a GetterFunc error", errRemoteCall.Error())
// Get a key that is guaranteed to return an internal (500) error
err = g.Get(ctx, "IReturnInternalError", StringSink(&value))
assert.Equal(t, "I am a errors.New() error", err.Error())
}
@ -220,12 +223,16 @@ func beChildForTestHTTPPool(t *testing.T) {
p.Set(addrToURL(addrs)...)
getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error {
if key == "IDoNotExist" {
return &ErrNotFound{Msg: "I do not exist error"}
if key == "IReturnErrNotFound" {
return &ErrNotFound{Msg: "I am a ErrNotFound error"}
}
if key == "IAlwaysReturnAnError" {
return &ErrRemoteCall{Msg: "I am a GetterFunc error"}
if key == "IReturnErrRemoteCall" {
return &ErrRemoteCall{Msg: "I am a ErrRemoteCall error"}
}
if key == "IReturnInternalError" {
return errors.New("I am a errors.New() error")
}
if _, err := http.Get(*serverAddr); err != nil {

View File

@ -26,10 +26,6 @@ type NowFunc func() time.Time
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
@ -41,15 +37,19 @@ type Cache struct {
ll *list.List
cache map[interface{}]*list.Element
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
expire time.Time
key Key
value interface{}
expire time.Time
}
// New creates a new Cache.
@ -80,7 +80,7 @@ func (c *Cache) Add(key Key, value interface{}, expire time.Time) {
eee.value = value
return
}
ele := c.ll.PushFront(&entry{key, value, expire})
ele := c.ll.PushFront(&entry{expire, key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()

View File

@ -29,6 +29,7 @@ type ProtoGetter interface {
Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error
Remove(context context.Context, in *pb.GetRequest) error
Set(context context.Context, in *pb.SetRequest) error
Clear(context context.Context, in *pb.GetRequest) error
// GetURL returns the peer URL
GetURL() string
}
@ -50,9 +51,7 @@ type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} }
var (
portPicker func(groupName string) PeerPicker
)
var portPicker func(groupName string) PeerPicker
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.