This commit is contained in:
William Bergeron-Drouin 2024-04-16 18:38:11 +00:00 committed by GitHub
commit bec5815755
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 728 additions and 167 deletions

View File

@ -1,88 +1,170 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v5.26.1
// source: example.proto
/*
Package groupcache_test is a generated protocol buffer package.
It is generated from these files:
example.proto
It has these top-level messages:
User
*/
package groupcache_test
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type User struct {
Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
Age int64 `protobuf:"varint,3,opt,name=age" json:"age,omitempty"`
IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper" json:"is_super,omitempty"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
Age int64 `protobuf:"varint,3,opt,name=age,proto3" json:"age,omitempty"`
IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper,proto3" json:"is_super,omitempty"`
}
func (m *User) Reset() { *m = User{} }
func (m *User) String() string { return proto.CompactTextString(m) }
func (*User) ProtoMessage() {}
func (*User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (x *User) Reset() {
*x = User{}
if protoimpl.UnsafeEnabled {
mi := &file_example_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *User) GetId() string {
if m != nil {
return m.Id
func (x *User) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*User) ProtoMessage() {}
func (x *User) ProtoReflect() protoreflect.Message {
mi := &file_example_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use User.ProtoReflect.Descriptor instead.
func (*User) Descriptor() ([]byte, []int) {
return file_example_proto_rawDescGZIP(), []int{0}
}
func (x *User) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (m *User) GetName() string {
if m != nil {
return m.Name
func (x *User) GetName() string {
if x != nil {
return x.Name
}
return ""
}
func (m *User) GetAge() int64 {
if m != nil {
return m.Age
func (x *User) GetAge() int64 {
if x != nil {
return x.Age
}
return 0
}
func (m *User) GetIsSuper() bool {
if m != nil {
return m.IsSuper
func (x *User) GetIsSuper() bool {
if x != nil {
return x.IsSuper
}
return false
}
func init() {
proto.RegisterType((*User)(nil), "groupcachepb.User")
var File_example_proto protoreflect.FileDescriptor
var file_example_proto_rawDesc = []byte{
0x0a, 0x0d, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x0c, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x22, 0x57, 0x0a,
0x04, 0x55, 0x73, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x61, 0x67, 0x65,
0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x69,
0x73, 0x5f, 0x73, 0x75, 0x70, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x69,
0x73, 0x53, 0x75, 0x70, 0x65, 0x72, 0x42, 0x14, 0x5a, 0x12, 0x2e, 0x2f, 0x3b, 0x67, 0x72, 0x6f,
0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
func init() { proto.RegisterFile("example.proto", fileDescriptor0) }
var (
file_example_proto_rawDescOnce sync.Once
file_example_proto_rawDescData = file_example_proto_rawDesc
)
var fileDescriptor0 = []byte{
// 148 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xad, 0x48, 0xcc,
0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0xca, 0x2f, 0x2d,
0x48, 0x4e, 0x4c, 0xce, 0x48, 0x2d, 0x48, 0x52, 0x0a, 0xe7, 0x62, 0x09, 0x2d, 0x4e, 0x2d, 0x12,
0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xca, 0x4c, 0x11,
0x12, 0xe2, 0x62, 0xc9, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x02, 0x8b, 0x80, 0xd9, 0x42, 0x02, 0x5c,
0xcc, 0x89, 0xe9, 0xa9, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x20, 0xa6, 0x90, 0x24, 0x17,
0x47, 0x66, 0x71, 0x7c, 0x71, 0x69, 0x41, 0x6a, 0x91, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10,
0x7b, 0x66, 0x71, 0x30, 0x88, 0xeb, 0x24, 0x18, 0xc5, 0x8f, 0xb0, 0x28, 0xbe, 0x24, 0xb5, 0xb8,
0x24, 0x89, 0x0d, 0xec, 0x00, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2e, 0x5f, 0x1a,
0x91, 0x00, 0x00, 0x00,
func file_example_proto_rawDescGZIP() []byte {
file_example_proto_rawDescOnce.Do(func() {
file_example_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_proto_rawDescData)
})
return file_example_proto_rawDescData
}
var file_example_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_example_proto_goTypes = []interface{}{
(*User)(nil), // 0: groupcachepb.User
}
var file_example_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_example_proto_init() }
func file_example_proto_init() {
if File_example_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_example_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*User); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_example_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_example_proto_goTypes,
DependencyIndexes: file_example_proto_depIdxs,
MessageInfos: file_example_proto_msgTypes,
}.Build()
File_example_proto = out.File
file_example_proto_rawDesc = nil
file_example_proto_goTypes = nil
file_example_proto_depIdxs = nil
}

View File

@ -272,21 +272,78 @@ func (g *Group) Set(ctx context.Context, key string, value []byte, expire time.T
}
_, err := g.setGroup.Do(key, func() (interface{}, error) {
wg := sync.WaitGroup{}
errs := make(chan error)
// If remote peer owns this key
owner, ok := g.peers.PickPeer(key)
if ok {
if err := g.setFromPeer(ctx, owner, key, value, expire); err != nil {
if err := g.setFromPeer(ctx, owner, key, value, expire, false); err != nil {
return nil, err
}
// TODO(thrawn01): Not sure if this is useful outside of tests...
// maybe we should ALWAYS update the local cache?
if hotCache {
g.localSet(key, value, expire, &g.hotCache)
if !hotCache {
return nil, nil
}
return nil, nil
g.localSet(key, value, expire, &g.hotCache)
for _, peer := range g.peers.GetAll() {
if peer == owner {
// Avoid setting to owner a second time
continue
}
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.setFromPeer(ctx, peer, key, value, expire, true)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
var err error
for e := range errs {
if e != nil {
err = errors.Join(err, e)
}
}
return nil, err
}
// We own this key
g.localSet(key, value, expire, &g.mainCache)
if hotCache {
// Also set to the hot cache of all peers
for _, peer := range g.peers.GetAll() {
wg.Add(1)
go func(peer ProtoGetter) {
errs <- g.setFromPeer(ctx, peer, key, value, expire, true)
wg.Done()
}(peer)
}
go func() {
wg.Wait()
close(errs)
}()
var err error
for e := range errs {
if e != nil {
err = errors.Join(err, e)
}
}
return nil, err
}
return nil, nil
})
return err
@ -329,11 +386,11 @@ func (g *Group) Remove(ctx context.Context, key string) error {
close(errs)
}()
// TODO(thrawn01): Should we report all errors? Reporting context
// cancelled error for each peer doesn't make much sense.
var err error
for e := range errs {
err = e
if e != nil {
err = errors.Join(err, e)
}
}
return nil, err
@ -473,7 +530,7 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (
return value, nil
}
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time) error {
func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v []byte, e time.Time, hotCache bool) error {
var expire int64
if !e.IsZero() {
expire = e.UnixNano()
@ -484,6 +541,11 @@ func (g *Group) setFromPeer(ctx context.Context, peer ProtoGetter, k string, v [
Key: &k,
Value: v,
}
if hotCache {
req.HotCache = &hotCache
}
return peer.Set(ctx, req)
}

View File

@ -1,6 +1,6 @@
syntax = "proto3";
option go_package = "groupcache_test";
option go_package = "./;groupcache_test";
package groupcachepb;

View File

@ -1,155 +1,354 @@
//
//Copyright 2012 Google Inc.
//
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//
//http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v5.26.1
// source: groupcache.proto
/*
Package groupcachepb is a generated protocol buffer package.
package __
It is generated from these files:
groupcache.proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
It has these top-level messages:
GetRequest
GetResponse
SetRequest
*/
package groupcachepb
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type GetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
XXX_unrecognized []byte `json:"-"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"` // not actually required/guaranteed to be UTF-8
}
func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {}
func (*GetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (x *GetRequest) Reset() {
*x = GetRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *GetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
func (x *GetRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetRequest) ProtoMessage() {}
func (x *GetRequest) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead.
func (*GetRequest) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{0}
}
func (x *GetRequest) GetGroup() string {
if x != nil && x.Group != nil {
return *x.Group
}
return ""
}
func (m *GetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
func (x *GetRequest) GetKey() string {
if x != nil && x.Key != nil {
return *x.Key
}
return ""
}
type GetResponse struct {
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
}
func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {}
func (*GetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (x *GetResponse) Reset() {
*x = GetResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *GetResponse) GetValue() []byte {
if m != nil {
return m.Value
func (x *GetResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetResponse) ProtoMessage() {}
func (x *GetResponse) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead.
func (*GetResponse) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{1}
}
func (x *GetResponse) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
func (m *GetResponse) GetMinuteQps() float64 {
if m != nil && m.MinuteQps != nil {
return *m.MinuteQps
func (x *GetResponse) GetMinuteQps() float64 {
if x != nil && x.MinuteQps != nil {
return *x.MinuteQps
}
return 0
}
func (m *GetResponse) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
func (x *GetResponse) GetExpire() int64 {
if x != nil && x.Expire != nil {
return *x.Expire
}
return 0
}
type SetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
Expire *int64 `protobuf:"varint,4,opt,name=expire" json:"expire,omitempty"`
HotCache *bool `protobuf:"varint,5,opt,name=hotCache" json:"hotCache,omitempty"`
}
func (m *SetRequest) Reset() { *m = SetRequest{} }
func (m *SetRequest) String() string { return proto.CompactTextString(m) }
func (*SetRequest) ProtoMessage() {}
func (*SetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (x *SetRequest) Reset() {
*x = SetRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_groupcache_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (m *SetRequest) GetGroup() string {
if m != nil && m.Group != nil {
return *m.Group
func (x *SetRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SetRequest) ProtoMessage() {}
func (x *SetRequest) ProtoReflect() protoreflect.Message {
mi := &file_groupcache_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SetRequest.ProtoReflect.Descriptor instead.
func (*SetRequest) Descriptor() ([]byte, []int) {
return file_groupcache_proto_rawDescGZIP(), []int{2}
}
func (x *SetRequest) GetGroup() string {
if x != nil && x.Group != nil {
return *x.Group
}
return ""
}
func (m *SetRequest) GetKey() string {
if m != nil && m.Key != nil {
return *m.Key
func (x *SetRequest) GetKey() string {
if x != nil && x.Key != nil {
return *x.Key
}
return ""
}
func (m *SetRequest) GetValue() []byte {
if m != nil {
return m.Value
func (x *SetRequest) GetValue() []byte {
if x != nil {
return x.Value
}
return nil
}
func (m *SetRequest) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
func (x *SetRequest) GetExpire() int64 {
if x != nil && x.Expire != nil {
return *x.Expire
}
return 0
}
func init() {
proto.RegisterType((*GetRequest)(nil), "groupcachepb.GetRequest")
proto.RegisterType((*GetResponse)(nil), "groupcachepb.GetResponse")
proto.RegisterType((*SetRequest)(nil), "groupcachepb.SetRequest")
func (x *SetRequest) GetHotCache() bool {
if x != nil && x.HotCache != nil {
return *x.HotCache
}
return false
}
func init() { proto.RegisterFile("groupcache.proto", fileDescriptor0) }
var File_groupcache_proto protoreflect.FileDescriptor
var fileDescriptor0 = []byte{
// 215 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x31, 0x4b, 0xc5, 0x30,
0x18, 0x34, 0x8d, 0x0a, 0xfd, 0xec, 0x50, 0x82, 0x48, 0x14, 0x84, 0x90, 0x29, 0x53, 0x07, 0x71,
0x74, 0x73, 0x28, 0xb8, 0x19, 0x37, 0x17, 0x69, 0xcb, 0x87, 0x16, 0xb5, 0x49, 0x9b, 0x44, 0x7c,
0xff, 0xfe, 0x91, 0xe6, 0x41, 0x3a, 0xbc, 0xe5, 0x6d, 0xb9, 0x3b, 0x2e, 0x77, 0xdf, 0x41, 0xfd,
0xb9, 0x98, 0x60, 0x87, 0x6e, 0xf8, 0xc2, 0xc6, 0x2e, 0xc6, 0x1b, 0x56, 0x65, 0xc6, 0xf6, 0xf2,
0x11, 0xa0, 0x45, 0xaf, 0x71, 0x0e, 0xe8, 0x3c, 0xbb, 0x86, 0x8b, 0x55, 0xe5, 0x44, 0x14, 0xaa,
0xd4, 0x09, 0xb0, 0x1a, 0xe8, 0x37, 0xee, 0x78, 0xb1, 0x72, 0xf1, 0x29, 0xdf, 0xe1, 0x6a, 0x75,
0x39, 0x6b, 0x26, 0x87, 0xd1, 0xf6, 0xd7, 0xfd, 0x04, 0xe4, 0x44, 0x10, 0x55, 0xe9, 0x04, 0xd8,
0x3d, 0xc0, 0xef, 0x38, 0x05, 0x8f, 0x1f, 0xb3, 0x75, 0xbc, 0x10, 0x44, 0x11, 0x5d, 0x26, 0xe6,
0xd5, 0x3a, 0x76, 0x03, 0x97, 0xf8, 0x6f, 0xc7, 0x05, 0x39, 0x15, 0x44, 0x51, 0x7d, 0x40, 0xb2,
0x07, 0x78, 0x3b, 0xb9, 0x51, 0xae, 0x40, 0xb7, 0x15, 0x72, 0xc6, 0xf9, 0x36, 0xe3, 0xe1, 0x05,
0xa0, 0x8d, 0x1f, 0x3d, 0xc7, 0x15, 0xd8, 0x13, 0xd0, 0x16, 0x3d, 0xe3, 0xcd, 0x76, 0x99, 0x26,
0xcf, 0x72, 0x77, 0x7b, 0x44, 0x49, 0xa7, 0xcb, 0xb3, 0x7d, 0x00, 0x00, 0x00, 0xff, 0xff, 0x02,
0x10, 0x64, 0xec, 0x62, 0x01, 0x00, 0x00,
var file_groupcache_proto_rawDesc = []byte{
0x0a, 0x10, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x0c, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62,
0x22, 0x34, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14,
0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x02, 0x28, 0x09, 0x52, 0x05, 0x67,
0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x02, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x5a, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6d,
0x69, 0x6e, 0x75, 0x74, 0x65, 0x5f, 0x71, 0x70, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52,
0x09, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x51, 0x70, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x78,
0x70, 0x69, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x65, 0x78, 0x70, 0x69,
0x72, 0x65, 0x22, 0x7e, 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x02, 0x28, 0x09, 0x52,
0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20,
0x02, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16,
0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06,
0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x74, 0x43, 0x61, 0x63,
0x68, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x68, 0x6f, 0x74, 0x43, 0x61, 0x63,
0x68, 0x65, 0x32, 0x4a, 0x0a, 0x0a, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x43, 0x61, 0x63, 0x68, 0x65,
0x12, 0x3c, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x18, 0x2e, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63,
0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x19, 0x2e, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62,
0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x04,
0x5a, 0x02, 0x2e, 0x2f,
}
var (
file_groupcache_proto_rawDescOnce sync.Once
file_groupcache_proto_rawDescData = file_groupcache_proto_rawDesc
)
func file_groupcache_proto_rawDescGZIP() []byte {
file_groupcache_proto_rawDescOnce.Do(func() {
file_groupcache_proto_rawDescData = protoimpl.X.CompressGZIP(file_groupcache_proto_rawDescData)
})
return file_groupcache_proto_rawDescData
}
var file_groupcache_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_groupcache_proto_goTypes = []interface{}{
(*GetRequest)(nil), // 0: groupcachepb.GetRequest
(*GetResponse)(nil), // 1: groupcachepb.GetResponse
(*SetRequest)(nil), // 2: groupcachepb.SetRequest
}
var file_groupcache_proto_depIdxs = []int32{
0, // 0: groupcachepb.GroupCache.Get:input_type -> groupcachepb.GetRequest
1, // 1: groupcachepb.GroupCache.Get:output_type -> groupcachepb.GetResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_groupcache_proto_init() }
func file_groupcache_proto_init() {
if File_groupcache_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_groupcache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_groupcache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*GetResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_groupcache_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SetRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_groupcache_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_groupcache_proto_goTypes,
DependencyIndexes: file_groupcache_proto_depIdxs,
MessageInfos: file_groupcache_proto_msgTypes,
}.Build()
File_groupcache_proto = out.File
file_groupcache_proto_rawDesc = nil
file_groupcache_proto_goTypes = nil
file_groupcache_proto_depIdxs = nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
syntax = "proto2";
package groupcachepb;
option go_package = "./";
message GetRequest {
required string group = 1;
@ -34,6 +35,7 @@ message SetRequest {
required string key = 2;
optional bytes value = 3;
optional int64 expire = 4;
optional bool hotCache = 5;
}
service GroupCache {

View File

@ -217,7 +217,12 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
expire = time.Unix(*out.Expire/int64(time.Second), *out.Expire%int64(time.Second))
}
group.localSet(*out.Key, out.Value, expire, &group.mainCache)
c := &group.mainCache
if out.HotCache != nil && *out.HotCache {
c = &group.hotCache
}
group.localSet(*out.Key, out.Value, expire, c)
return
}

View File

@ -95,7 +95,9 @@ func TestHTTPPool(t *testing.T) {
wg.Wait()
// Use a dummy self address so that we don't handle gets in-process.
p := NewHTTPPool("should-be-ignored")
p, mux := newTestHTTPPool("should-be-ignored")
defer mux.Close()
p.Set(addrToURL(childAddr)...)
// Dummy getter function. Gets should go to children only.
@ -219,7 +221,8 @@ func testKeys(n int) (keys []string) {
func beChildForTestHTTPPool(t *testing.T) {
addrs := strings.Split(*peerAddrs, ",")
p := NewHTTPPool("http://" + addrs[*peerIndex])
p, mux := newTestHTTPPool("http://" + addrs[*peerIndex])
defer mux.Close()
p.Set(addrToURL(addrs)...)
getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error {
@ -286,3 +289,45 @@ func awaitAddrReady(t *testing.T, addr string, wg *sync.WaitGroup) {
time.Sleep(delay)
}
}
type serveMux struct {
mux *http.ServeMux
handlers map[string]http.Handler
}
func newTestHTTPPool(self string) (*HTTPPool, *serveMux) {
httpPoolMade, portPicker = false, nil // Testing only
p := NewHTTPPoolOpts(self, nil)
sm := &serveMux{
mux: http.NewServeMux(),
handlers: make(map[string]http.Handler),
}
sm.handlers[p.opts.BasePath] = p
return p, sm
}
func (s *serveMux) Handle(pattern string, handler http.Handler) {
s.handlers[pattern] = handler
s.mux.Handle(pattern, handler)
}
func (s *serveMux) Close() {
for pattern := range s.handlers {
delete(s.handlers, pattern)
}
}
func (s *serveMux) RemoveHandle(pattern string) {
delete(s.handlers, pattern)
}
func (s *serveMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, ok := s.handlers[r.URL.Path]; ok {
s.mux.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
}
}

166
integration_test.go Normal file
View File

@ -0,0 +1,166 @@
package groupcache
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestManualSet(t *testing.T) {
if *peerChild {
beChildForIntegrationTest(t)
os.Exit(0)
}
const (
nChild = 4
nGets = 100
)
var serverHits int
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello")
serverHits++
}))
defer ts.Close()
var childAddr []string
for i := 0; i < nChild; i++ {
childAddr = append(childAddr, pickFreeAddr(t))
}
var cmds []*exec.Cmd
var wg sync.WaitGroup
for i := 0; i < nChild; i++ {
cmd := exec.Command(os.Args[0],
"--test.run=TestManualSet",
"--test_peer_child",
"--test_peer_addrs="+strings.Join(childAddr, ","),
"--test_peer_index="+strconv.Itoa(i),
"--test_server_addr="+ts.URL,
)
cmds = append(cmds, cmd)
cmd.Stdout = os.Stdout
wg.Add(1)
if err := cmd.Start(); err != nil {
t.Fatal("failed to start child process: ", err)
}
go awaitAddrReady(t, childAddr[i], &wg)
}
defer func() {
for i := 0; i < nChild; i++ {
if cmds[i].Process != nil {
cmds[i].Process.Kill()
}
}
}()
wg.Wait()
// Use a dummy self address so that we don't handle gets in-process.
p, mux := newTestHTTPPool("should-be-ignored")
defer mux.Close()
p.Set(addrToURL(childAddr)...)
// Dummy getter function. Gets should go to children only.
// The only time this process will handle a get is when the
// children can't be contacted for some reason.
getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error {
return errors.New("parent getter called; something's wrong")
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
assertLocalGets := func(key, expectedValue string) {
for _, addr := range childAddr {
resp, err := http.Get("http://" + addr + "/local/" + key)
assert.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
bytes, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, expectedValue, string(bytes))
}
}
g := NewGroup("integrationTest", 1<<20, getter)
var got string
err := g.Get(ctx, "key-0", StringSink(&got))
assert.NoError(t, err)
assert.Equal(t, "got:key-0", got)
// Since nodes have hot caches, we assert that the localGets are returning the right data
assertLocalGets("key-0", "got:key-0")
// Manually set the value in the cache
overwrite := "manual-set"
err = g.Set(ctx, "key-0", []byte(overwrite), time.Time{}, true)
assert.NoError(t, err)
err = g.Get(ctx, "key-0", StringSink(&got))
assert.NoError(t, err)
assert.Equal(t, overwrite, got)
assertLocalGets("key-0", overwrite)
}
type overwriteHttpPool struct {
g *Group
p *HTTPPool
}
// ServeHTTP implements http.Handler.
func (o overwriteHttpPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/local/") {
fmt.Printf("peer %d group.Get(%s)\n", *peerIndex, r.URL.Path)
key := strings.TrimPrefix(r.URL.Path, "/local/")
// Custom logic here
// For example, you can write the key to the response
var got string
err := o.g.Get(r.Context(), key, StringSink(&got))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, _ = w.Write([]byte(got))
return
}
// Call the original handler
o.p.ServeHTTP(w, r)
}
var _ http.Handler = (*overwriteHttpPool)(nil)
func beChildForIntegrationTest(t *testing.T) {
addrs := strings.Split(*peerAddrs, ",")
p, mux := newTestHTTPPool("http://" + addrs[*peerIndex])
defer mux.Close()
hp := overwriteHttpPool{
p: p,
}
hp.p.Set(addrToURL(addrs)...)
getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error {
return dest.SetString("got:"+key, time.Time{})
})
hp.g = NewGroup("integrationTest", 1<<20, getter)
log.Printf("Listening on %s\n", addrs[*peerIndex])
log.Fatal(http.ListenAndServe(addrs[*peerIndex], hp))
}

View File

@ -13,4 +13,4 @@ protoc -I=$PROTO_DIR \
protoc -I=$PROTO_DIR \
--go_out=. \
$PROTO_DIR/example.proto
$PROTO_DIR/example.proto && mv ./example.pb.go ./example_pb_test.go