From 48a0ce2463c24e6bdb7e930f458a62876acaedfc Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 10 Jun 2019 13:46:25 -0500 Subject: [PATCH] Get() now returns immediately when context is done * `Get()` now returns immediately when context is done during a groupcache peer conversation. Previously `Get()` would call the `Getter` with a done context. --- CHANGELOG | 5 ++ README.md | 120 +++++++++++++++++++++++++++++++------ example_pb_test.go | 88 +++++++++++++++++++++++++++ example_test.go | 87 +++++++++++++++++++++++++++ groupcache.go | 5 ++ groupcache_test.go | 34 ++++++++++- groupcachepb/example.proto | 12 ++++ proto.sh | 4 ++ 8 files changed, 334 insertions(+), 21 deletions(-) create mode 100644 example_pb_test.go create mode 100644 example_test.go create mode 100644 groupcachepb/example.proto diff --git a/CHANGELOG b/CHANGELOG index ae90f94..409c043 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.1.0] - 2019-06-10 +### Changes +* `Get()` now returns immediately when context is done during a groupcache peer + conversation. Previously `Get()` would call the `Getter` with a done context. + ## [2.0.0] - 2019-06-04 ### Changes * Now using golang standard `context.Context` instead of `groupcache.Context`. diff --git a/README.md b/README.md index 30e8fbc..a1c093f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,10 @@ # groupcache +A modified version of [group cache](https://github.com/golang/groupcache) with +support for `context.Context`, [go modules](https://github.com/golang/go/wiki/Modules), +and explicit key removal and expiration. See the `CHANGELOG` for a complete list of +modifications. + ## Summary groupcache is a caching and cache-filling library, intended as a @@ -7,7 +12,33 @@ replacement for memcached in many cases. For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache -## Comparison to memcached + +### Modifications from original library + +* Support for explicit key removal from a group. `Remove()` requests are + first sent to the peer who owns the key, then the remove request is + forwarded to every peer in the groupcache. NOTE: This is a best case design + since it is possible a temporary network disruption could occur resulting + in remove requests never making it their peers. In practice this scenario + is very rare and the system remains very consistent. In case of an + inconsistency placing a expiration time on your values will ensure the + cluster eventually becomes consistent again. + +* Support for expired values. `SetBytes()`, `SetProto()` and `SetString()` now + accept an optional `time.Time{}` which represents a time in the future when the + value will expire. Expiration is handled by the LRU Cache when a `Get()` on a + key is requested. This means no network coordination of expired values is needed. + However this does require that time on all nodes in the cluster is synchronized + for consistent expiration of values. + +* Network methods now accept golang standard `context.Context` instead of + `groupcache.Context`. + +* Now always populating the hotcache. A more complex algorithm is unnecessary + when the LRU cache will ensure the most used values remain in the cache. The + evict code ensures the hotcache never overcrowds the maincache. + +## Comparing Groupcache to memcached ### **Like memcached**, groupcache: @@ -28,16 +59,7 @@ For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache the loaded value to all callers. * does not support versioned values. If key "foo" is value "bar", - key "foo" must always be "bar". There are neither cache expiration - times, nor explicit cache evictions. Thus there is also no CAS, - nor Increment/Decrement. This also means that groupcache.... - - * ... supports automatic mirroring of super-hot items to multiple - processes. This prevents memcached hot spotting where a machine's - CPU and/or NIC are overloaded by very popular keys/values. - - * is currently only available for Go. It's very unlikely that I - (bradfitz@) will port the code to any other language. + key "foo" must always be "bar". ## Loading process @@ -58,16 +80,76 @@ In a nutshell, a groupcache lookup of **Get("foo")** looks like: the answer. If the RPC fails, just load it locally (still with local dup suppression). -## Users +## Example -groupcache is in production use by dl.google.com (its original user), -parts of Blogger, parts of Google Code, parts of Google Fiber, parts -of Google production monitoring systems, etc. +```go +import ( + "context" + "fmt" + "log" + "time" -## Presentations + "github.com/mailgun/groupcache/v2" +) -See http://talks.golang.org/2013/oscon-dl.slide +func ExampleUsage() { + // Keep track of peers in our cluster and add our instance to the pool `http://localhost:8080` + pool := groupcache.NewHTTPPoolOpts("http://localhost:8080", &groupcache.HTTPPoolOptions{}) -## Help + // Add more peers to the cluster + //pool.Set("http://peer1:8080", "http://peer2:8080") + + server := http.Server{ + Addr: "localhost:8080", + Handler: pool, + } + + // Start a HTTP server to listen for peer requests from the groupcache + go func() { + log.Printf("Serving....\n") + if err := server.ListenAndServe(); err != nil { + log.Fatal(err) + } + }() + defer server.Shutdown(context.Background()) + + // Create a new group cache with a max cache size of 3MB + group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc( + func(ctx context.Context, id string, dest groupcache.Sink) error { + + // Returns a protobuf struct `User` + if user, err := fetchUserFromMongo(ctx, id); err != nil { + return err + } + + // Set the user in the groupcache to expire after 5 minutes + if err := dest.SetProto(&user, time.Now().Add(time.Minute*5)); err != nil { + return err + } + return nil + }, + )) + + var user User + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil { + log.Fatal(err) + } + + fmt.Printf("-- User --\n") + fmt.Printf("Id: %s\n", user.Id) + fmt.Printf("Name: %s\n", user.Name) + fmt.Printf("Age: %d\n", user.Age) + fmt.Printf("IsSuper: %t\n", user.IsSuper) + + // Remove the key from the groupcache + if err := group.Remove(ctx, "12345"); err != nil { + log.Fatal(err) + } +} + +``` -Use the golang-nuts mailing list for any discussion or questions. diff --git a/example_pb_test.go b/example_pb_test.go new file mode 100644 index 0000000..621b82f --- /dev/null +++ b/example_pb_test.go @@ -0,0 +1,88 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: example.proto + +/* +Package groupcache_test is a generated protocol buffer package. + +It is generated from these files: + example.proto + +It has these top-level messages: + User +*/ +package groupcache_test + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type User struct { + Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"` + Age int64 `protobuf:"varint,3,opt,name=age" json:"age,omitempty"` + IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper" json:"is_super,omitempty"` +} + +func (m *User) Reset() { *m = User{} } +func (m *User) String() string { return proto.CompactTextString(m) } +func (*User) ProtoMessage() {} +func (*User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *User) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *User) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *User) GetAge() int64 { + if m != nil { + return m.Age + } + return 0 +} + +func (m *User) GetIsSuper() bool { + if m != nil { + return m.IsSuper + } + return false +} + +func init() { + proto.RegisterType((*User)(nil), "groupcachepb.User") +} + +func init() { proto.RegisterFile("example.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 148 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xad, 0x48, 0xcc, + 0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0xca, 0x2f, 0x2d, + 0x48, 0x4e, 0x4c, 0xce, 0x48, 0x2d, 0x48, 0x52, 0x0a, 0xe7, 0x62, 0x09, 0x2d, 0x4e, 0x2d, 0x12, + 0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xca, 0x4c, 0x11, + 0x12, 0xe2, 0x62, 0xc9, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x02, 0x8b, 0x80, 0xd9, 0x42, 0x02, 0x5c, + 0xcc, 0x89, 0xe9, 0xa9, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x20, 0xa6, 0x90, 0x24, 0x17, + 0x47, 0x66, 0x71, 0x7c, 0x71, 0x69, 0x41, 0x6a, 0x91, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, + 0x7b, 0x66, 0x71, 0x30, 0x88, 0xeb, 0x24, 0x18, 0xc5, 0x8f, 0xb0, 0x28, 0xbe, 0x24, 0xb5, 0xb8, + 0x24, 0x89, 0x0d, 0xec, 0x00, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2e, 0x5f, 0x1a, + 0x91, 0x00, 0x00, 0x00, +} diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..9fd306e --- /dev/null +++ b/example_test.go @@ -0,0 +1,87 @@ +package groupcache_test + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/mailgun/groupcache/v2" +) + +func ExampleUsage() { + /* + // Keep track of peers in our cluster and add our instance to the pool `http://localhost:8080` + pool := groupcache.NewHTTPPoolOpts("http://localhost:8080", &groupcache.HTTPPoolOptions{}) + + // Add more peers to the cluster + //pool.Set("http://peer1:8080", "http://peer2:8080") + + server := http.Server{ + Addr: "localhost:8080", + Handler: pool, + } + + // Start a HTTP server to listen for peer requests from the groupcache + go func() { + log.Printf("Serving....\n") + if err := server.ListenAndServe(); err != nil { + log.Fatal(err) + } + }() + defer server.Shutdown(context.Background()) + */ + + // Create a new group cache with a max cache size of 3MB + group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc( + func(ctx context.Context, id string, dest groupcache.Sink) error { + + // In a real scenario we might fetch the value from a database. + /*if user, err := fetchUserFromMongo(ctx, id); err != nil { + return err + }*/ + + user := User{ + Id: "12345", + Name: "John Doe", + Age: 40, + IsSuper: true, + } + + // Set the user in the groupcache to expire after 5 minutes + if err := dest.SetProto(&user, time.Now().Add(time.Minute*5)); err != nil { + return err + } + return nil + }, + )) + + var user User + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil { + log.Fatal(err) + } + + fmt.Printf("-- User --\n") + fmt.Printf("Id: %s\n", user.Id) + fmt.Printf("Name: %s\n", user.Name) + fmt.Printf("Age: %d\n", user.Age) + fmt.Printf("IsSuper: %t\n", user.IsSuper) + + /* + // Remove the key from the groupcache + if err := group.Remove(ctx, "12345"); err != nil { + fmt.Printf("Remove Err: %s\n", err) + log.Fatal(err) + } + */ + + // Output: -- User -- + // Id: 12345 + // Name: John Doe + // Age: 40 + // IsSuper: true +} diff --git a/groupcache.go b/groupcache.go index 833b7f1..480e504 100644 --- a/groupcache.go +++ b/groupcache.go @@ -326,6 +326,11 @@ func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView return value, nil } g.Stats.PeerErrors.Add(1) + if ctx != nil && ctx.Err() != nil { + // Return here without attempting to get locally + // since the context is no longer valid + return nil, err + } // TODO(bradfitz): log the peer's error? keep // log of the past few for /groupcachez? It's // probably boring (normal task movement), so not diff --git a/groupcache_test.go b/groupcache_test.go index e4ad028..a3d8369 100644 --- a/groupcache_test.go +++ b/groupcache_test.go @@ -491,5 +491,35 @@ func TestGroupStatsAlignment(t *testing.T) { } } -// TODO(bradfitz): port the Google-internal full integration test into here, -// using HTTP requests instead of our RPC system. +type slowPeer struct { + fakePeer +} + +func (p *slowPeer) Get(_ context.Context, in *pb.GetRequest, out *pb.GetResponse) error { + time.Sleep(time.Second) + out.Value = []byte("got:" + in.GetKey()) + return nil +} + +func TestContextDeadlineOnPeer(t *testing.T) { + once.Do(testSetup) + peer0 := &slowPeer{} + peer1 := &slowPeer{} + peer2 := &slowPeer{} + peerList := fakePeers([]ProtoGetter{peer0, peer1, peer2, nil}) + getter := func(_ context.Context, key string, dest Sink) error { + return dest.SetString("got:"+key, time.Time{}) + } + testGroup := newGroup("TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300) + defer cancel() + + var got string + err := testGroup.Get(ctx, "test-key", StringSink(&got)) + if err != nil { + if err != context.DeadlineExceeded { + t.Errorf("expected Get to return context deadline exceeded") + } + } +} diff --git a/groupcachepb/example.proto b/groupcachepb/example.proto new file mode 100644 index 0000000..92dd7d0 --- /dev/null +++ b/groupcachepb/example.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +option go_package = "groupcache_test"; + +package groupcachepb; + +message User { + string id = 1; + string name = 2; + int64 age = 3; + bool is_super = 4; +} \ No newline at end of file diff --git a/proto.sh b/proto.sh index ebdba0a..e710926 100755 --- a/proto.sh +++ b/proto.sh @@ -10,3 +10,7 @@ PROTO_DIR=groupcachepb protoc -I=$PROTO_DIR \ --go_out=$PROTO_DIR \ $PROTO_DIR/groupcache.proto + +protoc -I=$PROTO_DIR \ + --go_out=. \ + $PROTO_DIR/example.proto