mirror of
https://github.com/mailgun/groupcache.git
synced 2024-11-16 14:10:04 +00:00
Get() now returns immediately when context is done
* `Get()` now returns immediately when context is done during a groupcache peer conversation. Previously `Get()` would call the `Getter` with a done context.
This commit is contained in:
parent
b9bcb40345
commit
48a0ce2463
@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [2.1.0] - 2019-06-10
|
||||||
|
### Changes
|
||||||
|
* `Get()` now returns immediately when context is done during a groupcache peer
|
||||||
|
conversation. Previously `Get()` would call the `Getter` with a done context.
|
||||||
|
|
||||||
## [2.0.0] - 2019-06-04
|
## [2.0.0] - 2019-06-04
|
||||||
### Changes
|
### Changes
|
||||||
* Now using golang standard `context.Context` instead of `groupcache.Context`.
|
* Now using golang standard `context.Context` instead of `groupcache.Context`.
|
||||||
|
120
README.md
120
README.md
@ -1,5 +1,10 @@
|
|||||||
# groupcache
|
# groupcache
|
||||||
|
|
||||||
|
A modified version of [group cache](https://github.com/golang/groupcache) with
|
||||||
|
support for `context.Context`, [go modules](https://github.com/golang/go/wiki/Modules),
|
||||||
|
and explicit key removal and expiration. See the `CHANGELOG` for a complete list of
|
||||||
|
modifications.
|
||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
groupcache is a caching and cache-filling library, intended as a
|
groupcache is a caching and cache-filling library, intended as a
|
||||||
@ -7,7 +12,33 @@ replacement for memcached in many cases.
|
|||||||
|
|
||||||
For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache
|
For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache
|
||||||
|
|
||||||
## Comparison to memcached
|
|
||||||
|
### Modifications from original library
|
||||||
|
|
||||||
|
* Support for explicit key removal from a group. `Remove()` requests are
|
||||||
|
first sent to the peer who owns the key, then the remove request is
|
||||||
|
forwarded to every peer in the groupcache. NOTE: This is a best case design
|
||||||
|
since it is possible a temporary network disruption could occur resulting
|
||||||
|
in remove requests never making it their peers. In practice this scenario
|
||||||
|
is very rare and the system remains very consistent. In case of an
|
||||||
|
inconsistency placing a expiration time on your values will ensure the
|
||||||
|
cluster eventually becomes consistent again.
|
||||||
|
|
||||||
|
* Support for expired values. `SetBytes()`, `SetProto()` and `SetString()` now
|
||||||
|
accept an optional `time.Time{}` which represents a time in the future when the
|
||||||
|
value will expire. Expiration is handled by the LRU Cache when a `Get()` on a
|
||||||
|
key is requested. This means no network coordination of expired values is needed.
|
||||||
|
However this does require that time on all nodes in the cluster is synchronized
|
||||||
|
for consistent expiration of values.
|
||||||
|
|
||||||
|
* Network methods now accept golang standard `context.Context` instead of
|
||||||
|
`groupcache.Context`.
|
||||||
|
|
||||||
|
* Now always populating the hotcache. A more complex algorithm is unnecessary
|
||||||
|
when the LRU cache will ensure the most used values remain in the cache. The
|
||||||
|
evict code ensures the hotcache never overcrowds the maincache.
|
||||||
|
|
||||||
|
## Comparing Groupcache to memcached
|
||||||
|
|
||||||
### **Like memcached**, groupcache:
|
### **Like memcached**, groupcache:
|
||||||
|
|
||||||
@ -28,16 +59,7 @@ For API docs and examples, see http://godoc.org/github.com/mailgun/groupcache
|
|||||||
the loaded value to all callers.
|
the loaded value to all callers.
|
||||||
|
|
||||||
* does not support versioned values. If key "foo" is value "bar",
|
* does not support versioned values. If key "foo" is value "bar",
|
||||||
key "foo" must always be "bar". There are neither cache expiration
|
key "foo" must always be "bar".
|
||||||
times, nor explicit cache evictions. Thus there is also no CAS,
|
|
||||||
nor Increment/Decrement. This also means that groupcache....
|
|
||||||
|
|
||||||
* ... supports automatic mirroring of super-hot items to multiple
|
|
||||||
processes. This prevents memcached hot spotting where a machine's
|
|
||||||
CPU and/or NIC are overloaded by very popular keys/values.
|
|
||||||
|
|
||||||
* is currently only available for Go. It's very unlikely that I
|
|
||||||
(bradfitz@) will port the code to any other language.
|
|
||||||
|
|
||||||
## Loading process
|
## Loading process
|
||||||
|
|
||||||
@ -58,16 +80,76 @@ In a nutshell, a groupcache lookup of **Get("foo")** looks like:
|
|||||||
the answer. If the RPC fails, just load it locally (still with
|
the answer. If the RPC fails, just load it locally (still with
|
||||||
local dup suppression).
|
local dup suppression).
|
||||||
|
|
||||||
## Users
|
## Example
|
||||||
|
|
||||||
groupcache is in production use by dl.google.com (its original user),
|
```go
|
||||||
parts of Blogger, parts of Google Code, parts of Google Fiber, parts
|
import (
|
||||||
of Google production monitoring systems, etc.
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
## Presentations
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
)
|
||||||
|
|
||||||
See http://talks.golang.org/2013/oscon-dl.slide
|
func ExampleUsage() {
|
||||||
|
// Keep track of peers in our cluster and add our instance to the pool `http://localhost:8080`
|
||||||
|
pool := groupcache.NewHTTPPoolOpts("http://localhost:8080", &groupcache.HTTPPoolOptions{})
|
||||||
|
|
||||||
## Help
|
// Add more peers to the cluster
|
||||||
|
//pool.Set("http://peer1:8080", "http://peer2:8080")
|
||||||
|
|
||||||
|
server := http.Server{
|
||||||
|
Addr: "localhost:8080",
|
||||||
|
Handler: pool,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a HTTP server to listen for peer requests from the groupcache
|
||||||
|
go func() {
|
||||||
|
log.Printf("Serving....\n")
|
||||||
|
if err := server.ListenAndServe(); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer server.Shutdown(context.Background())
|
||||||
|
|
||||||
|
// Create a new group cache with a max cache size of 3MB
|
||||||
|
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
|
||||||
|
func(ctx context.Context, id string, dest groupcache.Sink) error {
|
||||||
|
|
||||||
|
// Returns a protobuf struct `User`
|
||||||
|
if user, err := fetchUserFromMongo(ctx, id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the user in the groupcache to expire after 5 minutes
|
||||||
|
if err := dest.SetProto(&user, time.Now().Add(time.Minute*5)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
var user User
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("-- User --\n")
|
||||||
|
fmt.Printf("Id: %s\n", user.Id)
|
||||||
|
fmt.Printf("Name: %s\n", user.Name)
|
||||||
|
fmt.Printf("Age: %d\n", user.Age)
|
||||||
|
fmt.Printf("IsSuper: %t\n", user.IsSuper)
|
||||||
|
|
||||||
|
// Remove the key from the groupcache
|
||||||
|
if err := group.Remove(ctx, "12345"); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
Use the golang-nuts mailing list for any discussion or questions.
|
|
||||||
|
88
example_pb_test.go
Normal file
88
example_pb_test.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
|
// source: example.proto
|
||||||
|
|
||||||
|
/*
|
||||||
|
Package groupcache_test is a generated protocol buffer package.
|
||||||
|
|
||||||
|
It is generated from these files:
|
||||||
|
example.proto
|
||||||
|
|
||||||
|
It has these top-level messages:
|
||||||
|
User
|
||||||
|
*/
|
||||||
|
package groupcache_test
|
||||||
|
|
||||||
|
import proto "github.com/golang/protobuf/proto"
|
||||||
|
import fmt "fmt"
|
||||||
|
import math "math"
|
||||||
|
|
||||||
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
|
var _ = proto.Marshal
|
||||||
|
var _ = fmt.Errorf
|
||||||
|
var _ = math.Inf
|
||||||
|
|
||||||
|
// This is a compile-time assertion to ensure that this generated file
|
||||||
|
// is compatible with the proto package it is being compiled against.
|
||||||
|
// A compilation error at this line likely means your copy of the
|
||||||
|
// proto package needs to be updated.
|
||||||
|
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
|
||||||
|
|
||||||
|
type User struct {
|
||||||
|
Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
|
||||||
|
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
|
||||||
|
Age int64 `protobuf:"varint,3,opt,name=age" json:"age,omitempty"`
|
||||||
|
IsSuper bool `protobuf:"varint,4,opt,name=is_super,json=isSuper" json:"is_super,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *User) Reset() { *m = User{} }
|
||||||
|
func (m *User) String() string { return proto.CompactTextString(m) }
|
||||||
|
func (*User) ProtoMessage() {}
|
||||||
|
func (*User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||||
|
|
||||||
|
func (m *User) GetId() string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Id
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *User) GetName() string {
|
||||||
|
if m != nil {
|
||||||
|
return m.Name
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *User) GetAge() int64 {
|
||||||
|
if m != nil {
|
||||||
|
return m.Age
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *User) GetIsSuper() bool {
|
||||||
|
if m != nil {
|
||||||
|
return m.IsSuper
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
proto.RegisterType((*User)(nil), "groupcachepb.User")
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() { proto.RegisterFile("example.proto", fileDescriptor0) }
|
||||||
|
|
||||||
|
var fileDescriptor0 = []byte{
|
||||||
|
// 148 bytes of a gzipped FileDescriptorProto
|
||||||
|
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xad, 0x48, 0xcc,
|
||||||
|
0x2d, 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0xca, 0x2f, 0x2d,
|
||||||
|
0x48, 0x4e, 0x4c, 0xce, 0x48, 0x2d, 0x48, 0x52, 0x0a, 0xe7, 0x62, 0x09, 0x2d, 0x4e, 0x2d, 0x12,
|
||||||
|
0xe2, 0xe3, 0x62, 0xca, 0x4c, 0x91, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x62, 0xca, 0x4c, 0x11,
|
||||||
|
0x12, 0xe2, 0x62, 0xc9, 0x4b, 0xcc, 0x4d, 0x95, 0x60, 0x02, 0x8b, 0x80, 0xd9, 0x42, 0x02, 0x5c,
|
||||||
|
0xcc, 0x89, 0xe9, 0xa9, 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x20, 0xa6, 0x90, 0x24, 0x17,
|
||||||
|
0x47, 0x66, 0x71, 0x7c, 0x71, 0x69, 0x41, 0x6a, 0x91, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10,
|
||||||
|
0x7b, 0x66, 0x71, 0x30, 0x88, 0xeb, 0x24, 0x18, 0xc5, 0x8f, 0xb0, 0x28, 0xbe, 0x24, 0xb5, 0xb8,
|
||||||
|
0x24, 0x89, 0x0d, 0xec, 0x00, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x26, 0x2e, 0x5f, 0x1a,
|
||||||
|
0x91, 0x00, 0x00, 0x00,
|
||||||
|
}
|
87
example_test.go
Normal file
87
example_test.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package groupcache_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ExampleUsage() {
|
||||||
|
/*
|
||||||
|
// Keep track of peers in our cluster and add our instance to the pool `http://localhost:8080`
|
||||||
|
pool := groupcache.NewHTTPPoolOpts("http://localhost:8080", &groupcache.HTTPPoolOptions{})
|
||||||
|
|
||||||
|
// Add more peers to the cluster
|
||||||
|
//pool.Set("http://peer1:8080", "http://peer2:8080")
|
||||||
|
|
||||||
|
server := http.Server{
|
||||||
|
Addr: "localhost:8080",
|
||||||
|
Handler: pool,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a HTTP server to listen for peer requests from the groupcache
|
||||||
|
go func() {
|
||||||
|
log.Printf("Serving....\n")
|
||||||
|
if err := server.ListenAndServe(); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
defer server.Shutdown(context.Background())
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Create a new group cache with a max cache size of 3MB
|
||||||
|
group := groupcache.NewGroup("users", 3000000, groupcache.GetterFunc(
|
||||||
|
func(ctx context.Context, id string, dest groupcache.Sink) error {
|
||||||
|
|
||||||
|
// In a real scenario we might fetch the value from a database.
|
||||||
|
/*if user, err := fetchUserFromMongo(ctx, id); err != nil {
|
||||||
|
return err
|
||||||
|
}*/
|
||||||
|
|
||||||
|
user := User{
|
||||||
|
Id: "12345",
|
||||||
|
Name: "John Doe",
|
||||||
|
Age: 40,
|
||||||
|
IsSuper: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the user in the groupcache to expire after 5 minutes
|
||||||
|
if err := dest.SetProto(&user, time.Now().Add(time.Minute*5)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
var user User
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("-- User --\n")
|
||||||
|
fmt.Printf("Id: %s\n", user.Id)
|
||||||
|
fmt.Printf("Name: %s\n", user.Name)
|
||||||
|
fmt.Printf("Age: %d\n", user.Age)
|
||||||
|
fmt.Printf("IsSuper: %t\n", user.IsSuper)
|
||||||
|
|
||||||
|
/*
|
||||||
|
// Remove the key from the groupcache
|
||||||
|
if err := group.Remove(ctx, "12345"); err != nil {
|
||||||
|
fmt.Printf("Remove Err: %s\n", err)
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Output: -- User --
|
||||||
|
// Id: 12345
|
||||||
|
// Name: John Doe
|
||||||
|
// Age: 40
|
||||||
|
// IsSuper: true
|
||||||
|
}
|
@ -326,6 +326,11 @@ func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView
|
|||||||
return value, nil
|
return value, nil
|
||||||
}
|
}
|
||||||
g.Stats.PeerErrors.Add(1)
|
g.Stats.PeerErrors.Add(1)
|
||||||
|
if ctx != nil && ctx.Err() != nil {
|
||||||
|
// Return here without attempting to get locally
|
||||||
|
// since the context is no longer valid
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
// TODO(bradfitz): log the peer's error? keep
|
// TODO(bradfitz): log the peer's error? keep
|
||||||
// log of the past few for /groupcachez? It's
|
// log of the past few for /groupcachez? It's
|
||||||
// probably boring (normal task movement), so not
|
// probably boring (normal task movement), so not
|
||||||
|
@ -491,5 +491,35 @@ func TestGroupStatsAlignment(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(bradfitz): port the Google-internal full integration test into here,
|
type slowPeer struct {
|
||||||
// using HTTP requests instead of our RPC system.
|
fakePeer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *slowPeer) Get(_ context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
out.Value = []byte("got:" + in.GetKey())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestContextDeadlineOnPeer(t *testing.T) {
|
||||||
|
once.Do(testSetup)
|
||||||
|
peer0 := &slowPeer{}
|
||||||
|
peer1 := &slowPeer{}
|
||||||
|
peer2 := &slowPeer{}
|
||||||
|
peerList := fakePeers([]ProtoGetter{peer0, peer1, peer2, nil})
|
||||||
|
getter := func(_ context.Context, key string, dest Sink) error {
|
||||||
|
return dest.SetString("got:"+key, time.Time{})
|
||||||
|
}
|
||||||
|
testGroup := newGroup("TestContextDeadlineOnPeer-group", cacheSize, GetterFunc(getter), peerList)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var got string
|
||||||
|
err := testGroup.Get(ctx, "test-key", StringSink(&got))
|
||||||
|
if err != nil {
|
||||||
|
if err != context.DeadlineExceeded {
|
||||||
|
t.Errorf("expected Get to return context deadline exceeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
12
groupcachepb/example.proto
Normal file
12
groupcachepb/example.proto
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
option go_package = "groupcache_test";
|
||||||
|
|
||||||
|
package groupcachepb;
|
||||||
|
|
||||||
|
message User {
|
||||||
|
string id = 1;
|
||||||
|
string name = 2;
|
||||||
|
int64 age = 3;
|
||||||
|
bool is_super = 4;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user