Sinks can not accept an expire time

This commit is contained in:
Derrick J. Wippler 2019-04-09 17:36:03 -05:00
parent 5b532d6fd5
commit ce8fe99325
10 changed files with 219 additions and 65 deletions

View File

@ -21,6 +21,7 @@ import (
"errors"
"io"
"strings"
"time"
)
// A ByteView holds an immutable view of bytes.
@ -33,6 +34,12 @@ type ByteView struct {
// If b is non-nil, b is used, else s is used.
b []byte
s string
e time.Time
}
// Returns the expire time associated with this view
func (v ByteView) Expire() time.Time {
return v.e
}
// Len returns the view's length.

View File

@ -30,6 +30,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"
pb "github.com/golang/groupcache/groupcachepb"
"github.com/golang/groupcache/lru"
@ -310,7 +311,16 @@ func (g *Group) getFromPeer(ctx Context, peer ProtoGetter, key string) (ByteView
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
var expire time.Time
if res.Expire != nil && *res.Expire != 0 {
expire = time.Unix(*res.Expire/int64(time.Second), *res.Expire%int64(time.Second))
if time.Now().After(expire) {
return ByteView{}, errors.New("peer returned expired value")
}
}
value := ByteView{b: res.Value, e: expire}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
@ -418,7 +428,7 @@ func (c *cache) add(key string, value ByteView) {
},
}
}
c.lru.Add(key, value)
c.lru.Add(key, value, value.Expire())
c.nbytes += int64(len(key)) + int64(value.Len())
}

View File

@ -36,8 +36,8 @@ import (
)
var (
once sync.Once
stringGroup, protoGroup Getter
once sync.Once
stringGroup, protoGroup, expireGroup Getter
stringc = make(chan string)
@ -52,6 +52,7 @@ var (
const (
stringGroupName = "string-group"
protoGroupName = "proto-group"
expireGroupName = "expire-group"
testMessageType = "google3/net/groupcache/go/test_proto.TestMessage"
fromChan = "from-chan"
cacheSize = 1 << 20
@ -63,7 +64,7 @@ func testSetup() {
key = <-stringc
}
cacheFills.Add(1)
return dest.SetString("ECHO:" + key)
return dest.SetString("ECHO:"+key, time.Time{})
}))
protoGroup = NewGroup(protoGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
@ -74,7 +75,12 @@ func testSetup() {
return dest.SetProto(&testpb.TestMessage{
Name: proto.String("ECHO:" + key),
City: proto.String("SOME-CITY"),
})
}, time.Time{})
}))
expireGroup = NewGroup(expireGroupName, cacheSize, GetterFunc(func(_ Context, key string, dest Sink) error {
cacheFills.Add(1)
return dest.SetString("ECHO:"+key, time.Now().Add(time.Millisecond*100))
}))
}
@ -185,6 +191,24 @@ func TestCaching(t *testing.T) {
}
}
func TestCachingExpire(t *testing.T) {
once.Do(testSetup)
fills := countFills(func() {
for i := 0; i < 3; i++ {
var s string
if err := expireGroup.Get(dummyCtx, "TestCachingExpire-key", StringSink(&s)); err != nil {
t.Fatal(err)
}
if i == 1 {
time.Sleep(time.Millisecond * 150)
}
}
})
if fills != 2 {
t.Errorf("expected 2 cache fill; got %d", fills)
}
}
func TestCacheEviction(t *testing.T) {
once.Do(testSetup)
testKey := "TestCacheEviction-key"
@ -261,7 +285,7 @@ func TestPeers(t *testing.T) {
localHits := 0
getter := func(_ Context, key string, dest Sink) error {
localHits++
return dest.SetString("got:" + key)
return dest.SetString("got:"+key, time.Time{})
}
testGroup := newGroup("TestPeers-group", cacheSize, GetterFunc(getter), peerList)
run := func(name string, n int, wantSummary string) {
@ -345,7 +369,7 @@ func TestAllocatingByteSliceTarget(t *testing.T) {
sink := AllocatingByteSliceSink(&dst)
inBytes := []byte("some bytes")
sink.SetBytes(inBytes)
sink.SetBytes(inBytes, time.Time{})
if want := "some bytes"; string(dst) != want {
t.Errorf("SetBytes resulted in %q; want %q", dst, want)
}
@ -388,7 +412,7 @@ func TestNoDedup(t *testing.T) {
const testkey = "testkey"
const testval = "testval"
g := newGroup("testgroup", 1024, GetterFunc(func(_ Context, key string, dest Sink) error {
return dest.SetString(testval)
return dest.SetString(testval, time.Time{})
}), nil)
orderedGroup := &orderedFlightGroup{

View File

@ -1,27 +1,43 @@
// Code generated by protoc-gen-go.
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: groupcache.proto
// DO NOT EDIT!
/*
Package groupcachepb is a generated protocol buffer package.
It is generated from these files:
groupcache.proto
It has these top-level messages:
GetRequest
GetResponse
*/
package groupcachepb
import proto "github.com/golang/protobuf/proto"
import json "encoding/json"
import fmt "fmt"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type GetRequest struct {
Group *string `protobuf:"bytes,1,req,name=group" json:"group,omitempty"`
Key *string `protobuf:"bytes,2,req,name=key" json:"key,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {}
func (m *GetRequest) Reset() { *m = GetRequest{} }
func (m *GetRequest) String() string { return proto.CompactTextString(m) }
func (*GetRequest) ProtoMessage() {}
func (*GetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *GetRequest) GetGroup() string {
if m != nil && m.Group != nil {
@ -39,13 +55,15 @@ func (m *GetRequest) GetKey() string {
type GetResponse struct {
Value []byte `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps" json:"minute_qps,omitempty"`
MinuteQps *float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps" json:"minute_qps,omitempty"`
Expire *int64 `protobuf:"varint,3,opt,name=expire" json:"expire,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {}
func (m *GetResponse) Reset() { *m = GetResponse{} }
func (m *GetResponse) String() string { return proto.CompactTextString(m) }
func (*GetResponse) ProtoMessage() {}
func (*GetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *GetResponse) GetValue() []byte {
if m != nil {
@ -61,5 +79,33 @@ func (m *GetResponse) GetMinuteQps() float64 {
return 0
}
func init() {
func (m *GetResponse) GetExpire() int64 {
if m != nil && m.Expire != nil {
return *m.Expire
}
return 0
}
func init() {
proto.RegisterType((*GetRequest)(nil), "groupcachepb.GetRequest")
proto.RegisterType((*GetResponse)(nil), "groupcachepb.GetResponse")
}
func init() { proto.RegisterFile("groupcache.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 197 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x48, 0x2f, 0xca, 0x2f,
0x2d, 0x48, 0x4e, 0x4c, 0xce, 0x48, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x41, 0x88,
0x14, 0x24, 0x29, 0x99, 0x70, 0x71, 0xb9, 0xa7, 0x96, 0x04, 0xa5, 0x16, 0x96, 0xa6, 0x16, 0x97,
0x08, 0x89, 0x70, 0xb1, 0x82, 0x65, 0x25, 0x18, 0x15, 0x98, 0x34, 0x38, 0x83, 0x20, 0x1c, 0x21,
0x01, 0x2e, 0xe6, 0xec, 0xd4, 0x4a, 0x09, 0x26, 0xb0, 0x18, 0x88, 0xa9, 0x14, 0xc5, 0xc5, 0x0d,
0xd6, 0x55, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x0a, 0xd2, 0x56, 0x96, 0x98, 0x53, 0x9a, 0x2a, 0xc1,
0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0xe1, 0x08, 0xc9, 0x72, 0x71, 0xe5, 0x66, 0xe6, 0x95, 0x96,
0xa4, 0xc6, 0x17, 0x16, 0x14, 0x4b, 0x30, 0x29, 0x30, 0x6a, 0x30, 0x06, 0x71, 0x42, 0x44, 0x02,
0x0b, 0x8a, 0x85, 0xc4, 0xb8, 0xd8, 0x52, 0x2b, 0x0a, 0x32, 0x8b, 0x52, 0x25, 0x98, 0x15, 0x18,
0x35, 0x98, 0x83, 0xa0, 0x3c, 0x23, 0x2f, 0x2e, 0x2e, 0x77, 0x90, 0xb5, 0xce, 0x20, 0x17, 0x0a,
0xd9, 0x70, 0x31, 0xbb, 0xa7, 0x96, 0x08, 0x49, 0xe8, 0x21, 0xbb, 0x5a, 0x0f, 0xe1, 0x64, 0x29,
0x49, 0x2c, 0x32, 0x10, 0x67, 0x29, 0x31, 0x00, 0x02, 0x00, 0x00, 0xff, 0xff, 0xd4, 0x73, 0xe1,
0xb8, 0xfe, 0x00, 0x00, 0x00,
}

View File

@ -26,6 +26,7 @@ message GetRequest {
message GetResponse {
optional bytes value = 1;
optional double minute_qps = 2;
optional int64 expire = 3;
}
service GroupCache {

18
http.go
View File

@ -163,15 +163,27 @@ func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
group.Stats.ServerRequests.Add(1)
var value []byte
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
var b []byte
value := AllocatingByteSliceSink(&b)
err := group.Get(ctx, key, value)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
view, err := value.view()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var expireNano int64
if !view.e.IsZero() {
expireNano = view.Expire().UnixNano()
}
// Write the value to the response body as a proto message.
body, err := proto.Marshal(&pb.GetResponse{Value: value})
body, err := proto.Marshal(&pb.GetResponse{Value: b, Expire: &expireNano})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@ -117,7 +117,7 @@ func beChildForTestHTTPPool() {
p.Set(addrToURL(addrs)...)
getter := GetterFunc(func(ctx Context, key string, dest Sink) error {
dest.SetString(strconv.Itoa(*peerIndex) + ":" + key)
dest.SetString(strconv.Itoa(*peerIndex)+":"+key, time.Time{})
return nil
})
NewGroup("httpPoolTest", 1<<20, getter)

View File

@ -17,7 +17,10 @@ limitations under the License.
// Package lru implements an LRU cache.
package lru
import "container/list"
import (
"container/list"
"time"
)
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
@ -37,8 +40,9 @@ type Cache struct {
type Key interface{}
type entry struct {
key Key
value interface{}
key Key
value interface{}
expire time.Time
}
// New creates a new Cache.
@ -53,7 +57,7 @@ func New(maxEntries int) *Cache {
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
func (c *Cache) Add(key Key, value interface{}, expire time.Time) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
@ -63,7 +67,7 @@ func (c *Cache) Add(key Key, value interface{}) {
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
ele := c.ll.PushFront(&entry{key, value, expire})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
@ -76,8 +80,15 @@ func (c *Cache) Get(key Key) (value interface{}, ok bool) {
return
}
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*entry)
// If the entry has expired, remove it from the cache
if !entry.expire.IsZero() && entry.expire.Before(time.Now()) {
c.removeElement(ele)
return nil, false
}
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
return entry.value, true
}
return
}

View File

@ -19,6 +19,7 @@ package lru
import (
"fmt"
"testing"
"time"
)
type simpleStruct struct {
@ -48,7 +49,7 @@ var getTests = []struct {
func TestGet(t *testing.T) {
for _, tt := range getTests {
lru := New(0)
lru.Add(tt.keyToAdd, 1234)
lru.Add(tt.keyToAdd, 1234, time.Time{})
val, ok := lru.Get(tt.keyToGet)
if ok != tt.expectedOk {
t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok)
@ -60,7 +61,7 @@ func TestGet(t *testing.T) {
func TestRemove(t *testing.T) {
lru := New(0)
lru.Add("myKey", 1234)
lru.Add("myKey", 1234, time.Time{})
if val, ok := lru.Get("myKey"); !ok {
t.Fatal("TestRemove returned no match")
} else if val != 1234 {
@ -82,7 +83,7 @@ func TestEvict(t *testing.T) {
lru := New(20)
lru.OnEvicted = onEvictedFun
for i := 0; i < 22; i++ {
lru.Add(fmt.Sprintf("myKey%d", i), 1234)
lru.Add(fmt.Sprintf("myKey%d", i), 1234, time.Time{})
}
if len(evictedKeys) != 2 {
@ -95,3 +96,28 @@ func TestEvict(t *testing.T) {
t.Fatalf("got %v in second evicted key; want %s", evictedKeys[1], "myKey1")
}
}
func TestExpire(t *testing.T) {
var tests = []struct {
name string
key interface{}
expectedOk bool
expire time.Duration
wait time.Duration
}{
{"not-expired", "myKey", true, time.Second * 1, time.Duration(0)},
{"expired", "expiredKey", false, time.Millisecond * 100, time.Millisecond * 150},
}
for _, tt := range tests {
lru := New(0)
lru.Add(tt.key, 1234, time.Now().Add(tt.expire))
time.Sleep(tt.wait)
val, ok := lru.Get(tt.key)
if ok != tt.expectedOk {
t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok)
} else if ok && val != 1234 {
t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val)
}
}
}

View File

@ -18,25 +18,32 @@ package groupcache
import (
"errors"
"time"
"github.com/golang/protobuf/proto"
)
var _ Sink = &stringSink{}
var _ Sink = &allocBytesSink{}
var _ Sink = &protoSink{}
var _ Sink = &truncBytesSink{}
var _ Sink = &byteViewSink{}
// A Sink receives data from a Get call.
//
// Implementation of Getter must call exactly one of the Set methods
// on success.
type Sink interface {
// SetString sets the value to s.
SetString(s string) error
SetString(s string, e time.Time) error
// SetBytes sets the value to the contents of v.
// The caller retains ownership of v.
SetBytes(v []byte) error
SetBytes(v []byte, e time.Time) error
// SetProto sets the value to the encoded version of m.
// The caller retains ownership of m.
SetProto(m proto.Message) error
SetProto(m proto.Message, e time.Time) error
// view returns a frozen view of the bytes for caching.
view() (ByteView, error)
@ -60,9 +67,9 @@ func setSinkView(s Sink, v ByteView) error {
return vs.setView(v)
}
if v.b != nil {
return s.SetBytes(v.b)
return s.SetBytes(v.b, v.Expire())
}
return s.SetString(v.s)
return s.SetString(v.s, v.Expire())
}
// StringSink returns a Sink that populates the provided string pointer.
@ -81,24 +88,26 @@ func (s *stringSink) view() (ByteView, error) {
return s.v, nil
}
func (s *stringSink) SetString(v string) error {
func (s *stringSink) SetString(v string, e time.Time) error {
s.v.b = nil
s.v.s = v
*s.sp = v
s.v.e = e
return nil
}
func (s *stringSink) SetBytes(v []byte) error {
return s.SetString(string(v))
func (s *stringSink) SetBytes(v []byte, e time.Time) error {
return s.SetString(string(v), e)
}
func (s *stringSink) SetProto(m proto.Message) error {
func (s *stringSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
s.v.b = b
*s.sp = string(b)
s.v.e = e
return nil
}
@ -132,22 +141,22 @@ func (s *byteViewSink) view() (ByteView, error) {
return *s.dst, nil
}
func (s *byteViewSink) SetProto(m proto.Message) error {
func (s *byteViewSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
*s.dst = ByteView{b: b}
*s.dst = ByteView{b: b, e: e}
return nil
}
func (s *byteViewSink) SetBytes(b []byte) error {
*s.dst = ByteView{b: cloneBytes(b)}
func (s *byteViewSink) SetBytes(b []byte, e time.Time) error {
*s.dst = ByteView{b: cloneBytes(b), e: e}
return nil
}
func (s *byteViewSink) SetString(v string) error {
*s.dst = ByteView{s: v}
func (s *byteViewSink) SetString(v string, e time.Time) error {
*s.dst = ByteView{s: v, e: e}
return nil
}
@ -161,6 +170,7 @@ func ProtoSink(m proto.Message) Sink {
type protoSink struct {
dst proto.Message // authoritative value
typ string
ttl time.Duration
v ByteView // encoded
}
@ -169,17 +179,18 @@ func (s *protoSink) view() (ByteView, error) {
return s.v, nil
}
func (s *protoSink) SetBytes(b []byte) error {
func (s *protoSink) SetBytes(b []byte, e time.Time) error {
err := proto.Unmarshal(b, s.dst)
if err != nil {
return err
}
s.v.b = cloneBytes(b)
s.v.s = ""
s.v.e = e
return nil
}
func (s *protoSink) SetString(v string) error {
func (s *protoSink) SetString(v string, e time.Time) error {
b := []byte(v)
err := proto.Unmarshal(b, s.dst)
if err != nil {
@ -187,10 +198,11 @@ func (s *protoSink) SetString(v string) error {
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *protoSink) SetProto(m proto.Message) error {
func (s *protoSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
@ -205,6 +217,7 @@ func (s *protoSink) SetProto(m proto.Message) error {
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
@ -234,35 +247,37 @@ func (s *allocBytesSink) setView(v ByteView) error {
return nil
}
func (s *allocBytesSink) SetProto(m proto.Message) error {
func (s *allocBytesSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
return s.setBytesOwned(b)
return s.setBytesOwned(b, e)
}
func (s *allocBytesSink) SetBytes(b []byte) error {
return s.setBytesOwned(cloneBytes(b))
func (s *allocBytesSink) SetBytes(b []byte, e time.Time) error {
return s.setBytesOwned(cloneBytes(b), e)
}
func (s *allocBytesSink) setBytesOwned(b []byte) error {
func (s *allocBytesSink) setBytesOwned(b []byte, e time.Time) error {
if s.dst == nil {
return errors.New("nil AllocatingByteSliceSink *[]byte dst")
}
*s.dst = cloneBytes(b) // another copy, protecting the read-only s.v.b view
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *allocBytesSink) SetString(v string) error {
func (s *allocBytesSink) SetString(v string, e time.Time) error {
if s.dst == nil {
return errors.New("nil AllocatingByteSliceSink *[]byte dst")
}
*s.dst = []byte(v)
s.v.b = nil
s.v.s = v
s.v.e = e
return nil
}
@ -283,19 +298,19 @@ func (s *truncBytesSink) view() (ByteView, error) {
return s.v, nil
}
func (s *truncBytesSink) SetProto(m proto.Message) error {
func (s *truncBytesSink) SetProto(m proto.Message, e time.Time) error {
b, err := proto.Marshal(m)
if err != nil {
return err
}
return s.setBytesOwned(b)
return s.setBytesOwned(b, e)
}
func (s *truncBytesSink) SetBytes(b []byte) error {
return s.setBytesOwned(cloneBytes(b))
func (s *truncBytesSink) SetBytes(b []byte, e time.Time) error {
return s.setBytesOwned(cloneBytes(b), e)
}
func (s *truncBytesSink) setBytesOwned(b []byte) error {
func (s *truncBytesSink) setBytesOwned(b []byte, e time.Time) error {
if s.dst == nil {
return errors.New("nil TruncatingByteSliceSink *[]byte dst")
}
@ -305,10 +320,11 @@ func (s *truncBytesSink) setBytesOwned(b []byte) error {
}
s.v.b = b
s.v.s = ""
s.v.e = e
return nil
}
func (s *truncBytesSink) SetString(v string) error {
func (s *truncBytesSink) SetString(v string, e time.Time) error {
if s.dst == nil {
return errors.New("nil TruncatingByteSliceSink *[]byte dst")
}
@ -318,5 +334,6 @@ func (s *truncBytesSink) SetString(v string) error {
}
s.v.b = nil
s.v.s = v
s.v.e = e
return nil
}