From a5eee224aa1667240dd11db5d490bbacbe26845d Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 30 May 2019 16:47:38 -0500 Subject: [PATCH 1/4] http requests now respect context.Context done --- http.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/http.go b/http.go index d45955e..64f989f 100644 --- a/http.go +++ b/http.go @@ -18,6 +18,7 @@ package groupcache import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -223,7 +224,7 @@ var bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } -func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetRequest, out *http.Response) error { +func (h *httpGetter) makeRequest(ctx Context, method string, in *pb.GetRequest, out *http.Response) error { u := fmt.Sprintf( "%v%v/%v", h.baseURL, @@ -234,9 +235,15 @@ func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetReque if err != nil { return err } + + // If user passed a standard context object, use it in the request. + if stdCtx, ok := ctx.(context.Context); ok { + req = req.WithContext(stdCtx) + } + tr := http.DefaultTransport if h.transport != nil { - tr = h.transport(context) + tr = h.transport(ctx) } res, err := tr.RoundTrip(req) if err != nil { From f352930de51ec0666af954946293cfef32a04d97 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 30 May 2019 17:03:06 -0500 Subject: [PATCH 2/4] Now using context.Context instead of groupcache.Context --- groupcache.go | 19 ++++++++++--------- groupcache_test.go | 17 +++++++++-------- http.go | 20 +++++++++----------- http_test.go | 5 +++-- peers.go | 10 +++------- 5 files changed, 34 insertions(+), 37 deletions(-) diff --git a/groupcache.go b/groupcache.go index e555eb3..e70981f 100644 --- a/groupcache.go +++ b/groupcache.go @@ -25,6 +25,7 @@ limitations under the License. package groupcache import ( + "context" "errors" "math/rand" "strconv" @@ -45,13 +46,13 @@ type Getter interface { // uniquely describe the loaded data, without an implicit // current time, and without relying on cache expiration // mechanisms. - Get(ctx Context, key string, dest Sink) error + Get(ctx context.Context, key string, dest Sink) error } // A GetterFunc implements Getter with a function. -type GetterFunc func(ctx Context, key string, dest Sink) error +type GetterFunc func(ctx context.Context, key string, dest Sink) error -func (f GetterFunc) Get(ctx Context, key string, dest Sink) error { +func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error { return f(ctx, key, dest) } @@ -210,7 +211,7 @@ func (g *Group) initPeers() { } } -func (g *Group) Get(ctx Context, key string, dest Sink) error { +func (g *Group) Get(ctx context.Context, key string, dest Sink) error { g.peersOnce.Do(g.initPeers) g.Stats.Gets.Add(1) if dest == nil { @@ -240,7 +241,7 @@ func (g *Group) Get(ctx Context, key string, dest Sink) error { // Remove clears the key from our cache then forwards the remove // request to all peers. -func (g *Group) Remove(ctx Context, key string) error { +func (g *Group) Remove(ctx context.Context, key string) error { g.peersOnce.Do(g.initPeers) _, err := g.removeGroup.Do(key, func() (interface{}, error) { @@ -288,7 +289,7 @@ func (g *Group) Remove(ctx Context, key string) error { } // load loads key either by invoking the getter locally or by sending it to another machine. -func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { +func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { // Check the cache again because singleflight can only dedup calls @@ -347,7 +348,7 @@ func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPo return } -func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) { +func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) { err := g.getter.Get(ctx, key, dest) if err != nil { return ByteView{}, err @@ -355,7 +356,7 @@ func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) return dest.view() } -func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) { +func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) { req := &pb.GetRequest{ Group: &g.name, Key: &key, @@ -384,7 +385,7 @@ func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView return value, nil } -func (g *Group) removeFromPeer(ctx Context, peer ProtoGetter, key string) error { +func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error { req := &pb.GetRequest{ Group: &g.name, Key: &key, diff --git a/groupcache_test.go b/groupcache_test.go index da24f28..ad26250 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -19,6 +19,7 @@ limitations under the License. package groupcache import ( + "context" "errors" "fmt" "hash/crc32" @@ -41,7 +42,7 @@ var ( stringc = make(chan string) - dummyCtx Context + dummyCtx context.Context // cacheFills is the number of times stringGroup or // protoGroup's Getter have been called. Read using the @@ -59,7 +60,7 @@ const ( ) func testSetup() { - stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { if key == fromChan { key = <-stringc } @@ -67,7 +68,7 @@ func testSetup() { return dest.SetString("ECHO:"+key, time.Time{}) })) - protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { if key == fromChan { key = <-stringc } @@ -78,7 +79,7 @@ func testSetup() { }, time.Time{}) })) - expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { cacheFills.Add(1) return dest.SetString("ECHO:"+key, time.Now().Add(time.Millisecond*100)) })) @@ -254,7 +255,7 @@ type fakePeer struct { fail bool } -func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error { +func (p *fakePeer) Get(_ context.Context, in *pb.GetRequest, out *pb.GetResponse) error { p.hits++ if p.fail { return errors.New("simulated error from peer") @@ -263,7 +264,7 @@ func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error return nil } -func (p *fakePeer) Remove(_ Context, in *pb.GetRequest) error { +func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error { p.hits++ if p.fail { return errors.New("simulated error from peer") @@ -295,7 +296,7 @@ func TestPeers(t *testing.T) { peerList := fakePeers([]ProtoGetter{peer0, peer1, peer2, nil}) const cacheSize = 0 // disabled localHits := 0 - getter := func(_ Context, key string, dest Sink) error { + getter := func(_ context.Context, key string, dest Sink) error { localHits++ return dest.SetString("got:"+key, time.Time{}) } @@ -427,7 +428,7 @@ func (g *orderedFlightGroup) Lock(fn func()) { func TestNoDedup(t *testing.T) { const testkey = "testkey" const testval = "testval" - g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error { + g := newGroup("testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error { return dest.SetString(testval, time.Time{}) }), nil) diff --git a/http.go b/http.go index 64f989f..d84d5ae 100644 --- a/http.go +++ b/http.go @@ -41,12 +41,12 @@ type HTTPPool struct { // Context optionally specifies a context for the server to use when it // receives a request. // If nil, the server uses a nil Context. - Context func(*http.Request) Context + Context func(*http.Request) context.Context // Transport optionally specifies an http.RoundTripper for the client // to use when it makes a request. // If nil, the client uses http.DefaultTransport. - Transport func(Context) http.RoundTripper + Transport func(context.Context) http.RoundTripper // this peer's base URL, e.g. "https://example.net:8000" self string @@ -173,7 +173,7 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "no such group: "+groupName, http.StatusNotFound) return } - var ctx Context + var ctx context.Context if p.Context != nil { ctx = p.Context(r) } @@ -216,7 +216,7 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type httpGetter struct { - transport func(Context) http.RoundTripper + transport func(context.Context) http.RoundTripper baseURL string } @@ -224,7 +224,7 @@ var bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } -func (h *httpGetter) makeRequest(ctx Context, method string, in *pb.GetRequest, out *http.Response) error { +func (h *httpGetter) makeRequest(ctx context.Context, method string, in *pb.GetRequest, out *http.Response) error { u := fmt.Sprintf( "%v%v/%v", h.baseURL, @@ -236,10 +236,8 @@ func (h *httpGetter) makeRequest(ctx Context, method string, in *pb.GetRequest, return err } - // If user passed a standard context object, use it in the request. - if stdCtx, ok := ctx.(context.Context); ok { - req = req.WithContext(stdCtx) - } + // Pass along the context to the RoundTripper + req = req.WithContext(ctx) tr := http.DefaultTransport if h.transport != nil { @@ -253,7 +251,7 @@ func (h *httpGetter) makeRequest(ctx Context, method string, in *pb.GetRequest, return nil } -func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) error { +func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error { var res http.Response if err := h.makeRequest(ctx, http.MethodGet, in, &res); err != nil { return err @@ -276,7 +274,7 @@ func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) er return nil } -func (h *httpGetter) Remove(ctx Context, in *pb.GetRequest) error { +func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error { var res http.Response if err := h.makeRequest(ctx, http.MethodDelete, in, &res); err != nil { return err diff --git a/http_test.go b/http_test.go index 0453999..c2764a3 100644 --- a/http_test.go +++ b/http_test.go @@ -17,6 +17,7 @@ limitations under the License. package groupcache import ( + "context" "errors" "flag" "fmt" @@ -96,7 +97,7 @@ func TestHTTPPool(t *testing.T) { // Dummy getter function. Gets should go to children only. // The only time this process will handle a get is when the // children can't be contacted for some reason. - getter := GetterFunc(func(ctx Context, key string, dest Sink) error { + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { return errors.New("parent getter called; something's wrong") }) g := NewGroup("httpPoolTest", 1<<20, getter) @@ -162,7 +163,7 @@ func beChildForTestHTTPPool(t *testing.T) { p := NewHTTPPool("http://" + addrs[*peerIndex]) p.Set(addrToURL(addrs)...) - getter := GetterFunc(func(ctx Context, key string, dest Sink) error { + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { if _, err := http.Get(*serverAddr); err != nil { t.Logf("HTTP request from getter failed with '%s'", err) } diff --git a/peers.go b/peers.go index 4bf9e62..05847da 100644 --- a/peers.go +++ b/peers.go @@ -19,18 +19,14 @@ limitations under the License. package groupcache import ( + "context" pb "github.com/mailgun/groupcache/groupcachepb" ) -// Context is an opaque value passed through calls to the -// ProtoGetter. It may be nil if your ProtoGetter implementation does -// not require a context. -type Context interface{} - // ProtoGetter is the interface that must be implemented by a peer. type ProtoGetter interface { - Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error - Remove(context Context, in *pb.GetRequest) error + Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error + Remove(context context.Context, in *pb.GetRequest) error } // PeerPicker is the interface that must be implemented to locate From 9a873a72e55944e6451eeef1c502933160dfa479 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Thu, 30 May 2019 17:14:47 -0500 Subject: [PATCH 3/4] Added a CHANGELOG file --- CHANGELOG | 23 +++++++++++++++++++++++ http_test.go | 12 ++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 CHANGELOG diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000..6ebe50f --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,23 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [2.0.0] - 2019-05-30 +### Changes +* Now using golang standard `context.Context` instead of groupcache.Context. +* http requests now respect `context.Context` done. + +## [1.3.0] - 2019-05-23 +### Added +* Added `Remove()` method to `Group` to purge a key from the group. + +## [1.1.0] - 2019-04-10 +### Added +* Sinks can now accept an expire time +* Changed import path to mailgun/groupcache + +## [hash 5b532d6fd5efaf7fa130d4e859a2fde0fc3a9e1b] - 2019-01-29 +### Changes +* Initial import from https://github.com/golang/groupcache diff --git a/http_test.go b/http_test.go index c2764a3..d3b40d4 100644 --- a/http_test.go +++ b/http_test.go @@ -100,11 +100,15 @@ func TestHTTPPool(t *testing.T) { getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { return errors.New("parent getter called; something's wrong") }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + g := NewGroup("httpPoolTest", 1<<20, getter) for _, key := range testKeys(nGets) { var value string - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } if suffix := ":" + key; !strings.HasSuffix(value, suffix) { @@ -123,7 +127,7 @@ func TestHTTPPool(t *testing.T) { // Multiple gets on the same key for i := 0; i < 2; i++ { - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } } @@ -134,12 +138,12 @@ func TestHTTPPool(t *testing.T) { } // Remove the key from the cache and we should see another server hit - if err := g.Remove(nil, key); err != nil { + if err := g.Remove(ctx, key); err != nil { t.Fatal(err) } // Get the key again - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } From 054f2178b4e2c9a620e9e0f46d06dabceb216ec4 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 31 May 2019 17:41:07 -0500 Subject: [PATCH 4/4] Client keep-alives and hotcache changes * Now Associating the transport with peer `httpGetter` so we take advantage of connection reuse. This lowers the impact on DNS and improves preformance for high request volume low latency applications. * Now always populating the hotcache. A more complex algorithm is unnecessary when the LRU cache will ensure the most used values remain in the cache. The evict code ensures the hotcache does not over crowd the maincache. --- .travis.yml | 2 +- CHANGELOG | 11 +++++++++-- groupcache.go | 10 +++------- groupcache_test.go | 10 ++++------ http.go | 48 ++++++++++++++++++++++++++++------------------ 5 files changed, 46 insertions(+), 35 deletions(-) diff --git a/.travis.yml b/.travis.yml index e4fa39d..1db9513 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,9 +9,9 @@ script: - go test ./... go: - - 1.9.x - 1.10.x - 1.11.x + - 1.12.x - master cache: diff --git a/CHANGELOG b/CHANGELOG index 6ebe50f..175ceda 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -6,8 +6,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [2.0.0] - 2019-05-30 ### Changes -* Now using golang standard `context.Context` instead of groupcache.Context. -* http requests now respect `context.Context` done. +* Now using golang standard `context.Context` instead of `groupcache.Context`. +* HTTP requests made by `httpGetter` now respect `context.Context` done. +* Moved `HTTPPool` config `Context` and `Transport` to `HTTPPoolOptions` for consist configuration. +* Now Associating the transport with peer `httpGetter` so we take advantage of + connection reuse. This lowers the impact on DNS and improves performance for + high request volume low latency applications. +* Now always populating the hotcache. A more complex algorithm is unnecessary + when the LRU cache will ensure the most used values remain in the cache. The + evict code ensures the hotcache does not overcrowd the maincache. ## [1.3.0] - 2019-05-23 ### Added diff --git a/groupcache.go b/groupcache.go index e70981f..a45661c 100644 --- a/groupcache.go +++ b/groupcache.go @@ -27,7 +27,6 @@ package groupcache import ( "context" "errors" - "math/rand" "strconv" "sync" "sync/atomic" @@ -376,12 +375,9 @@ func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) ( } value := ByteView{b: res.Value, e: expire} - // TODO(bradfitz): use res.MinuteQps or something smart to - // conditionally populate hotCache. For now just do it some - // percentage of the time. - if rand.Intn(10) == 0 { - g.populateCache(key, value, &g.hotCache) - } + + // Always populate the hot cache + g.populateCache(key, value, &g.hotCache) return value, nil } diff --git a/groupcache_test.go b/groupcache_test.go index ad26250..77402d7 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "hash/crc32" - "math/rand" "reflect" "sync" "testing" @@ -33,7 +32,7 @@ import ( "github.com/golang/protobuf/proto" pb "github.com/mailgun/groupcache/groupcachepb" - testpb "github.com/mailgun/groupcache/testpb" + "github.com/mailgun/groupcache/testpb" ) var ( @@ -289,7 +288,6 @@ func (p fakePeers) GetAll() []ProtoGetter { // tests that peers (virtual, in-process) are hit, and how much. func TestPeers(t *testing.T) { once.Do(testSetup) - rand.Seed(123) peer0 := &fakePeer{} peer1 := &fakePeer{} peer2 := &fakePeer{} @@ -339,9 +337,9 @@ func TestPeers(t *testing.T) { resetCacheSize(1 << 20) run("base", 200, "localHits = 49, peers = 51 49 51") - // Verify cache was hit. All localHits are gone, and some of - // the peer hits (the ones randomly selected to be maybe hot) - run("cached_base", 200, "localHits = 0, peers = 49 47 48") + // Verify cache was hit. All localHits and peers are gone as the hotCache has + // the data we need + run("cached_base", 200, "localHits = 0, peers = 0 0 0") resetCacheSize(0) // With one of the peers being down. diff --git a/http.go b/http.go index d84d5ae..39e0703 100644 --- a/http.go +++ b/http.go @@ -38,16 +38,6 @@ const defaultReplicas = 50 // HTTPPool implements PeerPicker for a pool of HTTP peers. type HTTPPool struct { - // Context optionally specifies a context for the server to use when it - // receives a request. - // If nil, the server uses a nil Context. - Context func(*http.Request) context.Context - - // Transport optionally specifies an http.RoundTripper for the client - // to use when it makes a request. - // If nil, the client uses http.DefaultTransport. - Transport func(context.Context) http.RoundTripper - // this peer's base URL, e.g. "https://example.net:8000" self string @@ -72,6 +62,16 @@ type HTTPPoolOptions struct { // HashFn specifies the hash function of the consistent hash. // If blank, it defaults to crc32.ChecksumIEEE. HashFn consistenthash.Hash + + // Transport optionally specifies an http.RoundTripper for the client + // to use when it makes a request. + // If nil, the client uses http.DefaultTransport. + Transport func(context.Context) http.RoundTripper + + // Context optionally specifies a context for the server to use when it + // receives a request. + // If nil, uses the http.Request.Context() + Context func(*http.Request) context.Context } // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker. @@ -124,7 +124,7 @@ func (p *HTTPPool) Set(peers ...string) { p.peers.Add(peers...) p.httpGetters = make(map[string]*httpGetter, len(peers)) for _, peer := range peers { - p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath} + p.httpGetters[peer] = &httpGetter{getTransport: p.opts.Transport, baseURL: peer + p.opts.BasePath} } } @@ -174,8 +174,10 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } var ctx context.Context - if p.Context != nil { - ctx = p.Context(r) + if p.opts.Context != nil { + ctx = p.opts.Context(r) + } else { + ctx = r.Context() } group.Stats.ServerRequests.Add(1) @@ -216,8 +218,9 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type httpGetter struct { - transport func(context.Context) http.RoundTripper - baseURL string + getTransport func(context.Context) http.RoundTripper + transport http.RoundTripper + baseURL string } var bufferPool = sync.Pool{ @@ -239,11 +242,18 @@ func (h *httpGetter) makeRequest(ctx context.Context, method string, in *pb.GetR // Pass along the context to the RoundTripper req = req.WithContext(ctx) - tr := http.DefaultTransport - if h.transport != nil { - tr = h.transport(ctx) + // Associate the transport with this peer so we take advantage of connection reuse. + if h.transport == nil { + if h.getTransport != nil { + h.transport = h.getTransport(ctx) + } + // Ensure we have a copy of the default transport and not just a reference. + tr := http.DefaultTransport.(*http.Transport) + trCopy := http.Transport(*tr) + h.transport = &trCopy } - res, err := tr.RoundTrip(req) + + res, err := h.transport.RoundTrip(req) if err != nil { return err }