This commit is contained in:
udhos 2023-12-28 04:11:26 +00:00 committed by GitHub
commit 07fc01e392
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 170 additions and 79 deletions

15
go.mod
View File

@ -1,19 +1,22 @@
module github.com/mailgun/groupcache/v2
go 1.19
go 1.21
require (
github.com/golang/protobuf v1.5.2
github.com/golang/protobuf v1.5.3
github.com/segmentio/fasthash v1.0.3
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/protobuf v1.28.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
golang.org/x/sys v0.14.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

40
go.sum
View File

@ -1,37 +1,45 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -68,21 +68,32 @@ func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error {
return f(ctx, key, dest)
}
var (
mu sync.RWMutex
groups = make(map[string]*Group)
initPeerServerOnce sync.Once
initPeerServer func()
)
// GetGroupWithWorkspace returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroupWithWorkspace(ws *workspace, name string) *Group {
ws.mu.RLock()
g := ws.groups[name]
ws.mu.RUnlock()
return g
}
// GetGroup returns the named group previously created with NewGroup, or
// nil if there's no such group.
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
return GetGroupWithWorkspace(DefaultWorkspace, name)
}
// NewGroupWithWorkspace creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroupWithWorkspace(ws *workspace, name string, cacheBytes int64, getter Getter) *Group {
return newGroup(ws, name, cacheBytes, getter, nil)
}
// NewGroup creates a coordinated group-aware Getter from a Getter.
@ -95,28 +106,34 @@ func GetGroup(name string) *Group {
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
return newGroup(name, cacheBytes, getter, nil)
return newGroup(DefaultWorkspace, name, cacheBytes, getter, nil)
}
// DeregisterGroupWithWorkspace removes group from group pool
func DeregisterGroupWithWorkspace(ws *workspace, name string) {
ws.mu.Lock()
delete(ws.groups, name)
ws.mu.Unlock()
}
// DeregisterGroup removes group from group pool
func DeregisterGroup(name string) {
mu.Lock()
delete(groups, name)
mu.Unlock()
DeregisterGroupWithWorkspace(DefaultWorkspace, name)
}
// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
func newGroup(ws *workspace, name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
if getter == nil {
panic("nil Getter")
}
mu.Lock()
defer mu.Unlock()
initPeerServerOnce.Do(callInitPeerServer)
if _, dup := groups[name]; dup {
ws.mu.Lock()
defer ws.mu.Unlock()
ws.initPeerServerOnce.Do(func() { callInitPeerServer(ws) })
if _, dup := ws.groups[name]; dup {
panic("duplicate registration of group " + name)
}
g := &Group{
ws: ws,
name: name,
getter: getter,
peers: peers,
@ -125,43 +142,53 @@ func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *G
setGroup: &singleflight.Group{},
removeGroup: &singleflight.Group{},
}
if fn := newGroupHook; fn != nil {
if fn := ws.newGroupHook; fn != nil {
fn(g)
}
groups[name] = g
ws.groups[name] = g
return g
}
// newGroupHook, if non-nil, is called right after a new group is created.
var newGroupHook func(*Group)
// RegisterNewGroupHookWithWorkspace registers a hook that is run each time
// a group is created.
func RegisterNewGroupHookWithWorkspace(ws *workspace, fn func(*Group)) {
if ws.newGroupHook != nil {
panic("RegisterNewGroupHook called more than once")
}
ws.newGroupHook = fn
}
// RegisterNewGroupHook registers a hook that is run each time
// a group is created.
func RegisterNewGroupHook(fn func(*Group)) {
if newGroupHook != nil {
panic("RegisterNewGroupHook called more than once")
RegisterNewGroupHookWithWorkspace(DefaultWorkspace, fn)
}
// RegisterServerStartWithWorkspace registers a hook that is run when the first
// group is created.
func RegisterServerStartWithWorkspace(ws *workspace, fn func()) {
if ws.initPeerServer != nil {
panic("RegisterServerStart called more than once")
}
newGroupHook = fn
ws.initPeerServer = fn
}
// RegisterServerStart registers a hook that is run when the first
// group is created.
func RegisterServerStart(fn func()) {
if initPeerServer != nil {
panic("RegisterServerStart called more than once")
}
initPeerServer = fn
RegisterServerStartWithWorkspace(DefaultWorkspace, fn)
}
func callInitPeerServer() {
if initPeerServer != nil {
initPeerServer()
func callInitPeerServer(ws *workspace) {
if ws.initPeerServer != nil {
ws.initPeerServer()
}
}
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
ws *workspace
name string
getter Getter
peersOnce sync.Once
@ -232,7 +259,7 @@ func (g *Group) Name() string {
func (g *Group) initPeers() {
if g.peers == nil {
g.peers = getPeers(g.name)
g.peers = getPeers(g.ws, g.name)
}
}

View File

@ -310,7 +310,7 @@ func TestPeers(t *testing.T) {
localHits++
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestPeers-group", cacheSize, GetterFunc(getter), peerList)
testGroup := newGroup(DefaultWorkspace, "TestPeers-group", cacheSize, GetterFunc(getter), peerList)
run := func(name string, n int, wantSummary string) {
// Reset counters
localHits = 0
@ -438,7 +438,7 @@ func (g *orderedFlightGroup) Lock(fn func()) {
func TestNoDedup(t *testing.T) {
const testkey = "testkey"
const testval = "testval"
g := newGroup("testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error {
g := newGroup(DefaultWorkspace, "testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error {
return dest.SetString(testval, time.Time{})
}), nil)
@ -522,7 +522,7 @@ func TestContextDeadlineOnPeer(t *testing.T) {
getter := func(_ context.Context, key string, dest Sink) error {
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList)
testGroup := newGroup(DefaultWorkspace, "TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
defer cancel()

19
http.go
View File

@ -86,16 +86,14 @@ func NewHTTPPool(self string) *HTTPPool {
return p
}
var httpPoolMade bool
// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// NewHTTPPoolOptsWithWorkspace initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
if httpPoolMade {
func NewHTTPPoolOptsWithWorkspace(ws *workspace, self string, o *HTTPPoolOptions) *HTTPPool {
if ws.httpPoolMade {
panic("groupcache: NewHTTPPool must be called only once")
}
httpPoolMade = true
ws.httpPoolMade = true
p := &HTTPPool{
self: self,
@ -112,10 +110,17 @@ func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
}
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
RegisterPeerPicker(func() PeerPicker { return p })
RegisterPeerPickerWithWorkspace(ws, func() PeerPicker { return p })
return p
}
// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
return NewHTTPPoolOptsWithWorkspace(DefaultWorkspace, self, o)
}
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".

View File

@ -50,19 +50,35 @@ type NoPeers struct{}
func (NoPeers) PickPeer(key string) (peer ProtoGetter, ok bool) { return }
func (NoPeers) GetAll() []ProtoGetter { return []ProtoGetter{} }
var (
portPicker func(groupName string) PeerPicker
)
// RegisterPeerPickerWithWorkspace registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPickerWithWorkspace(ws *workspace, fn func() PeerPicker) {
if ws.portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
ws.portPicker = func(_ string) PeerPicker { return fn() }
}
// RegisterPeerPicker registers the peer initialization function.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPeerPicker(fn func() PeerPicker) {
if portPicker != nil {
RegisterPeerPickerWithWorkspace(DefaultWorkspace, fn)
}
// RegisterPerGroupPeerPickerWithWorkspace registers the peer initialization function,
// which takes the groupName, to be used in choosing a PeerPicker.
// It is called once, when the first group is created.
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPickerWithWorkspace(ws *workspace, fn func(groupName string) PeerPicker) {
if ws.portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = func(_ string) PeerPicker { return fn() }
ws.portPicker = fn
}
// RegisterPerGroupPeerPicker registers the peer initialization function,
@ -71,17 +87,14 @@ func RegisterPeerPicker(fn func() PeerPicker) {
// Either RegisterPeerPicker or RegisterPerGroupPeerPicker should be
// called exactly once, but not both.
func RegisterPerGroupPeerPicker(fn func(groupName string) PeerPicker) {
if portPicker != nil {
panic("RegisterPeerPicker called more than once")
}
portPicker = fn
RegisterPerGroupPeerPickerWithWorkspace(DefaultWorkspace, fn)
}
func getPeers(groupName string) PeerPicker {
if portPicker == nil {
func getPeers(ws *workspace, groupName string) PeerPicker {
if ws.portPicker == nil {
return NoPeers{}
}
pk := portPicker(groupName)
pk := ws.portPicker(groupName)
if pk == nil {
pk = NoPeers{}
}

35
workspace.go Normal file
View File

@ -0,0 +1,35 @@
package groupcache
import "sync"
// workspace holds the "global" state for groupcache.
type workspace struct {
httpPoolMade bool
portPicker func(groupName string) PeerPicker
mu sync.RWMutex
groups map[string]*Group
initPeerServerOnce sync.Once
initPeerServer func()
// newGroupHook, if non-nil, is called right after a new group is created.
newGroupHook func(*Group)
}
// DefaultWorkspace is the default workspace used by non-workspace-aware APIs.
// If your application does not need to recreate groupcache resources,
// you should use the non-workspace-aware APIs.
// This is likely the most common case.
var DefaultWorkspace = NewWorkspace()
// NewWorkspace creates an explicit workspace for workspace-aware APIs.
// If your application needs to recreate groupcache resources at some
// point, you should use the workspace-aware APIs.
// In order to release current groupcache resources, your application
// would drop all references to the workspace.
func NewWorkspace() *workspace {
return &workspace{
groups: make(map[string]*Group),
}
}