diff --git a/.travis.yml b/.travis.yml index e4fa39d..1db9513 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,9 +9,9 @@ script: - go test ./... go: - - 1.9.x - 1.10.x - 1.11.x + - 1.12.x - master cache: diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000..175ceda --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,30 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [2.0.0] - 2019-05-30 +### Changes +* Now using golang standard `context.Context` instead of `groupcache.Context`. +* HTTP requests made by `httpGetter` now respect `context.Context` done. +* Moved `HTTPPool` config `Context` and `Transport` to `HTTPPoolOptions` for consist configuration. +* Now Associating the transport with peer `httpGetter` so we take advantage of + connection reuse. This lowers the impact on DNS and improves performance for + high request volume low latency applications. +* Now always populating the hotcache. A more complex algorithm is unnecessary + when the LRU cache will ensure the most used values remain in the cache. The + evict code ensures the hotcache does not overcrowd the maincache. + +## [1.3.0] - 2019-05-23 +### Added +* Added `Remove()` method to `Group` to purge a key from the group. + +## [1.1.0] - 2019-04-10 +### Added +* Sinks can now accept an expire time +* Changed import path to mailgun/groupcache + +## [hash 5b532d6fd5efaf7fa130d4e859a2fde0fc3a9e1b] - 2019-01-29 +### Changes +* Initial import from https://github.com/golang/groupcache diff --git a/groupcache.go b/groupcache.go index e555eb3..a45661c 100644 --- a/groupcache.go +++ b/groupcache.go @@ -25,8 +25,8 @@ limitations under the License. package groupcache import ( + "context" "errors" - "math/rand" "strconv" "sync" "sync/atomic" @@ -45,13 +45,13 @@ type Getter interface { // uniquely describe the loaded data, without an implicit // current time, and without relying on cache expiration // mechanisms. - Get(ctx Context, key string, dest Sink) error + Get(ctx context.Context, key string, dest Sink) error } // A GetterFunc implements Getter with a function. -type GetterFunc func(ctx Context, key string, dest Sink) error +type GetterFunc func(ctx context.Context, key string, dest Sink) error -func (f GetterFunc) Get(ctx Context, key string, dest Sink) error { +func (f GetterFunc) Get(ctx context.Context, key string, dest Sink) error { return f(ctx, key, dest) } @@ -210,7 +210,7 @@ func (g *Group) initPeers() { } } -func (g *Group) Get(ctx Context, key string, dest Sink) error { +func (g *Group) Get(ctx context.Context, key string, dest Sink) error { g.peersOnce.Do(g.initPeers) g.Stats.Gets.Add(1) if dest == nil { @@ -240,7 +240,7 @@ func (g *Group) Get(ctx Context, key string, dest Sink) error { // Remove clears the key from our cache then forwards the remove // request to all peers. -func (g *Group) Remove(ctx Context, key string) error { +func (g *Group) Remove(ctx context.Context, key string) error { g.peersOnce.Do(g.initPeers) _, err := g.removeGroup.Do(key, func() (interface{}, error) { @@ -288,7 +288,7 @@ func (g *Group) Remove(ctx Context, key string) error { } // load loads key either by invoking the getter locally or by sending it to another machine. -func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { +func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { // Check the cache again because singleflight can only dedup calls @@ -347,7 +347,7 @@ func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPo return } -func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) { +func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) { err := g.getter.Get(ctx, key, dest) if err != nil { return ByteView{}, err @@ -355,7 +355,7 @@ func (g *Group) getLocally(ctx Context, key string, dest Sink) (ByteView, error) return dest.view() } -func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView, error) { +func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) { req := &pb.GetRequest{ Group: &g.name, Key: &key, @@ -375,16 +375,13 @@ func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView } value := ByteView{b: res.Value, e: expire} - // TODO(bradfitz): use res.MinuteQps or something smart to - // conditionally populate hotCache. For now just do it some - // percentage of the time. - if rand.Intn(10) == 0 { - g.populateCache(key, value, &g.hotCache) - } + + // Always populate the hot cache + g.populateCache(key, value, &g.hotCache) return value, nil } -func (g *Group) removeFromPeer(ctx Context, peer ProtoGetter, key string) error { +func (g *Group) removeFromPeer(ctx context.Context, peer ProtoGetter, key string) error { req := &pb.GetRequest{ Group: &g.name, Key: &key, diff --git a/groupcache_test.go b/groupcache_test.go index da24f28..77402d7 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -19,10 +19,10 @@ limitations under the License. package groupcache import ( + "context" "errors" "fmt" "hash/crc32" - "math/rand" "reflect" "sync" "testing" @@ -32,7 +32,7 @@ import ( "github.com/golang/protobuf/proto" pb "github.com/mailgun/groupcache/groupcachepb" - testpb "github.com/mailgun/groupcache/testpb" + "github.com/mailgun/groupcache/testpb" ) var ( @@ -41,7 +41,7 @@ var ( stringc = make(chan string) - dummyCtx Context + dummyCtx context.Context // cacheFills is the number of times stringGroup or // protoGroup's Getter have been called. Read using the @@ -59,7 +59,7 @@ const ( ) func testSetup() { - stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + stringGroup = NewGroup(stringGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { if key == fromChan { key = <-stringc } @@ -67,7 +67,7 @@ func testSetup() { return dest.SetString("ECHO:"+key, time.Time{}) })) - protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { if key == fromChan { key = <-stringc } @@ -78,7 +78,7 @@ func testSetup() { }, time.Time{}) })) - expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error { + expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ context.Context, key string, dest Sink) error { cacheFills.Add(1) return dest.SetString("ECHO:"+key, time.Now().Add(time.Millisecond*100)) })) @@ -254,7 +254,7 @@ type fakePeer struct { fail bool } -func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error { +func (p *fakePeer) Get(_ context.Context, in *pb.GetRequest, out *pb.GetResponse) error { p.hits++ if p.fail { return errors.New("simulated error from peer") @@ -263,7 +263,7 @@ func (p *fakePeer) Get(_ Context, in *pb.GetRequest, out *pb.GetResponse) error return nil } -func (p *fakePeer) Remove(_ Context, in *pb.GetRequest) error { +func (p *fakePeer) Remove(_ context.Context, in *pb.GetRequest) error { p.hits++ if p.fail { return errors.New("simulated error from peer") @@ -288,14 +288,13 @@ func (p fakePeers) GetAll() []ProtoGetter { // tests that peers (virtual, in-process) are hit, and how much. func TestPeers(t *testing.T) { once.Do(testSetup) - rand.Seed(123) peer0 := &fakePeer{} peer1 := &fakePeer{} peer2 := &fakePeer{} peerList := fakePeers([]ProtoGetter{peer0, peer1, peer2, nil}) const cacheSize = 0 // disabled localHits := 0 - getter := func(_ Context, key string, dest Sink) error { + getter := func(_ context.Context, key string, dest Sink) error { localHits++ return dest.SetString("got:"+key, time.Time{}) } @@ -338,9 +337,9 @@ func TestPeers(t *testing.T) { resetCacheSize(1 << 20) run("base", 200, "localHits = 49, peers = 51 49 51") - // Verify cache was hit. All localHits are gone, and some of - // the peer hits (the ones randomly selected to be maybe hot) - run("cached_base", 200, "localHits = 0, peers = 49 47 48") + // Verify cache was hit. All localHits and peers are gone as the hotCache has + // the data we need + run("cached_base", 200, "localHits = 0, peers = 0 0 0") resetCacheSize(0) // With one of the peers being down. @@ -427,7 +426,7 @@ func (g *orderedFlightGroup) Lock(fn func()) { func TestNoDedup(t *testing.T) { const testkey = "testkey" const testval = "testval" - g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error { + g := newGroup("testgroup", 1024, GetterFunc(func(_ context.Context, key string, dest Sink) error { return dest.SetString(testval, time.Time{}) }), nil) diff --git a/http.go b/http.go index d45955e..39e0703 100644 --- a/http.go +++ b/http.go @@ -18,6 +18,7 @@ package groupcache import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -37,16 +38,6 @@ const defaultReplicas = 50 // HTTPPool implements PeerPicker for a pool of HTTP peers. type HTTPPool struct { - // Context optionally specifies a context for the server to use when it - // receives a request. - // If nil, the server uses a nil Context. - Context func(*http.Request) Context - - // Transport optionally specifies an http.RoundTripper for the client - // to use when it makes a request. - // If nil, the client uses http.DefaultTransport. - Transport func(Context) http.RoundTripper - // this peer's base URL, e.g. "https://example.net:8000" self string @@ -71,6 +62,16 @@ type HTTPPoolOptions struct { // HashFn specifies the hash function of the consistent hash. // If blank, it defaults to crc32.ChecksumIEEE. HashFn consistenthash.Hash + + // Transport optionally specifies an http.RoundTripper for the client + // to use when it makes a request. + // If nil, the client uses http.DefaultTransport. + Transport func(context.Context) http.RoundTripper + + // Context optionally specifies a context for the server to use when it + // receives a request. + // If nil, uses the http.Request.Context() + Context func(*http.Request) context.Context } // NewHTTPPool initializes an HTTP pool of peers, and registers itself as a PeerPicker. @@ -123,7 +124,7 @@ func (p *HTTPPool) Set(peers ...string) { p.peers.Add(peers...) p.httpGetters = make(map[string]*httpGetter, len(peers)) for _, peer := range peers { - p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath} + p.httpGetters[peer] = &httpGetter{getTransport: p.opts.Transport, baseURL: peer + p.opts.BasePath} } } @@ -172,9 +173,11 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "no such group: "+groupName, http.StatusNotFound) return } - var ctx Context - if p.Context != nil { - ctx = p.Context(r) + var ctx context.Context + if p.opts.Context != nil { + ctx = p.opts.Context(r) + } else { + ctx = r.Context() } group.Stats.ServerRequests.Add(1) @@ -215,15 +218,16 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type httpGetter struct { - transport func(Context) http.RoundTripper - baseURL string + getTransport func(context.Context) http.RoundTripper + transport http.RoundTripper + baseURL string } var bufferPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } -func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetRequest, out *http.Response) error { +func (h *httpGetter) makeRequest(ctx context.Context, method string, in *pb.GetRequest, out *http.Response) error { u := fmt.Sprintf( "%v%v/%v", h.baseURL, @@ -234,11 +238,22 @@ func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetReque if err != nil { return err } - tr := http.DefaultTransport - if h.transport != nil { - tr = h.transport(context) + + // Pass along the context to the RoundTripper + req = req.WithContext(ctx) + + // Associate the transport with this peer so we take advantage of connection reuse. + if h.transport == nil { + if h.getTransport != nil { + h.transport = h.getTransport(ctx) + } + // Ensure we have a copy of the default transport and not just a reference. + tr := http.DefaultTransport.(*http.Transport) + trCopy := http.Transport(*tr) + h.transport = &trCopy } - res, err := tr.RoundTrip(req) + + res, err := h.transport.RoundTrip(req) if err != nil { return err } @@ -246,7 +261,7 @@ func (h *httpGetter) makeRequest(context Context, method string, in *pb.GetReque return nil } -func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) error { +func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error { var res http.Response if err := h.makeRequest(ctx, http.MethodGet, in, &res); err != nil { return err @@ -269,7 +284,7 @@ func (h *httpGetter) Get(ctx Context, in *pb.GetRequest, out *pb.GetResponse) er return nil } -func (h *httpGetter) Remove(ctx Context, in *pb.GetRequest) error { +func (h *httpGetter) Remove(ctx context.Context, in *pb.GetRequest) error { var res http.Response if err := h.makeRequest(ctx, http.MethodDelete, in, &res); err != nil { return err diff --git a/http_test.go b/http_test.go index 0453999..d3b40d4 100644 --- a/http_test.go +++ b/http_test.go @@ -17,6 +17,7 @@ limitations under the License. package groupcache import ( + "context" "errors" "flag" "fmt" @@ -96,14 +97,18 @@ func TestHTTPPool(t *testing.T) { // Dummy getter function. Gets should go to children only. // The only time this process will handle a get is when the // children can't be contacted for some reason. - getter := GetterFunc(func(ctx Context, key string, dest Sink) error { + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { return errors.New("parent getter called; something's wrong") }) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + g := NewGroup("httpPoolTest", 1<<20, getter) for _, key := range testKeys(nGets) { var value string - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } if suffix := ":" + key; !strings.HasSuffix(value, suffix) { @@ -122,7 +127,7 @@ func TestHTTPPool(t *testing.T) { // Multiple gets on the same key for i := 0; i < 2; i++ { - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } } @@ -133,12 +138,12 @@ func TestHTTPPool(t *testing.T) { } // Remove the key from the cache and we should see another server hit - if err := g.Remove(nil, key); err != nil { + if err := g.Remove(ctx, key); err != nil { t.Fatal(err) } // Get the key again - if err := g.Get(nil, key, StringSink(&value)); err != nil { + if err := g.Get(ctx, key, StringSink(&value)); err != nil { t.Fatal(err) } @@ -162,7 +167,7 @@ func beChildForTestHTTPPool(t *testing.T) { p := NewHTTPPool("http://" + addrs[*peerIndex]) p.Set(addrToURL(addrs)...) - getter := GetterFunc(func(ctx Context, key string, dest Sink) error { + getter := GetterFunc(func(ctx context.Context, key string, dest Sink) error { if _, err := http.Get(*serverAddr); err != nil { t.Logf("HTTP request from getter failed with '%s'", err) } diff --git a/peers.go b/peers.go index 4bf9e62..05847da 100644 --- a/peers.go +++ b/peers.go @@ -19,18 +19,14 @@ limitations under the License. package groupcache import ( + "context" pb "github.com/mailgun/groupcache/groupcachepb" ) -// Context is an opaque value passed through calls to the -// ProtoGetter. It may be nil if your ProtoGetter implementation does -// not require a context. -type Context interface{} - // ProtoGetter is the interface that must be implemented by a peer. type ProtoGetter interface { - Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error - Remove(context Context, in *pb.GetRequest) error + Get(context context.Context, in *pb.GetRequest, out *pb.GetResponse) error + Remove(context context.Context, in *pb.GetRequest) error } // PeerPicker is the interface that must be implemented to locate