From a06134f0daef8adbf3838584c584c2d3a89d29ec Mon Sep 17 00:00:00 2001 From: Dan Pupius Date: Wed, 25 Sep 2013 14:49:30 -0700 Subject: [PATCH] PeerPicker uses ring hash to pick peers --- consistenthash/consistenthash.go | 81 +++++++++++++++++++++++++ consistenthash/consistenthash_test.go | 86 +++++++++++++++++++++++++++ http.go | 23 ++++--- http_test.go | 2 +- 4 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 consistenthash/consistenthash.go create mode 100644 consistenthash/consistenthash_test.go diff --git a/consistenthash/consistenthash.go b/consistenthash/consistenthash.go new file mode 100644 index 0000000..455ffd5 --- /dev/null +++ b/consistenthash/consistenthash.go @@ -0,0 +1,81 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package consistenthash provides an implementation of a ring hash. +package consistenthash + +import ( + "hash/crc32" + "sort" + "strconv" +) + +type Hash func(data []byte) uint32 + +type Map struct { + hash Hash + replicas int + keys []int // Sorted + hashMap map[int]string +} + +func New(replicas int, fn Hash) *Map { + m := &Map{ + replicas: replicas, + hash: fn, + hashMap: make(map[int]string), + } + if m.hash == nil { + m.hash = crc32.ChecksumIEEE + } + return m +} + +// Returns true if there are no items available. +func (m *Map) IsEmpty() bool { + return len(m.keys) == 0 +} + +// Adds some keys to the hash. +func (m *Map) Add(keys ...string) { + for _, key := range keys { + for i := 0; i < m.replicas; i++ { + hash := int(m.hash([]byte(strconv.Itoa(i) + key))) + m.keys = append(m.keys, hash) + m.hashMap[hash] = key + } + } + sort.Ints(m.keys) +} + +// Gets the closest item in the hash to the provided key. +func (m *Map) Get(key string) string { + if m.IsEmpty() { + return "" + } + + hash := int(m.hash([]byte(key))) + + // Linear search for appropriate replica. + for _, v := range m.keys { + if v >= hash { + return m.hashMap[v] + } + } + + // Means we have cycled back to the first replica. + return m.hashMap[m.keys[0]] +} diff --git a/consistenthash/consistenthash_test.go b/consistenthash/consistenthash_test.go new file mode 100644 index 0000000..a1b96db --- /dev/null +++ b/consistenthash/consistenthash_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consistenthash + +import ( + "strconv" + "testing" +) + +func TestHashing(t *testing.T) { + + // Override the hash function to return easier to reason about values. Assumes + // the keys can be converted to an integer. + hash := New(3, func(key []byte) uint32 { + i, err := strconv.Atoi(string(key)) + if err != nil { + panic(err) + } + return uint32(i) + }) + + // Given the above hash function, this will give replicas with "hashes": + // 2, 4, 6, 12, 14, 16, 22, 24, 26 + hash.Add("6", "4", "2") + + testCases := map[string]string{ + "2": "2", + "11": "2", + "23": "4", + "27": "2", + } + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + + // Adds 8, 18, 28 + hash.Add("8") + + // 27 should now map to 8. + testCases["27"] = "8" + + for k, v := range testCases { + if hash.Get(k) != v { + t.Errorf("Asking for %s, should have yielded %s", k, v) + } + } + +} + +func TestConsistency(t *testing.T) { + hash1 := New(1, nil) + hash2 := New(1, nil) + + hash1.Add("Bill", "Bob", "Bonny") + hash2.Add("Bob", "Bonny", "Bill") + + if hash1.Get("Ben") != hash2.Get("Ben") { + t.Errorf("Fetching 'Ben' from both hashes should be the same") + } + + hash2.Add("Becky", "Ben", "Bobby") + + if hash1.Get("Ben") != hash2.Get("Ben") || + hash1.Get("Bob") != hash2.Get("Bob") || + hash1.Get("Bonny") != hash2.Get("Bonny") { + t.Errorf("Direct matches should always return the same entry") + } + +} diff --git a/http.go b/http.go index b131ed9..af329b5 100644 --- a/http.go +++ b/http.go @@ -18,7 +18,6 @@ package groupcache import ( "fmt" - "hash/crc32" "io/ioutil" "net/http" "net/url" @@ -26,13 +25,16 @@ import ( "sync" "code.google.com/p/goprotobuf/proto" - + "github.com/golang/groupcache/consistenthash" pb "github.com/golang/groupcache/groupcachepb" ) // TODO: make this configurable? const defaultBasePath = "/_groupcache/" +// TODO: make this configurable as well. +const defaultReplicas = 3 + // HTTPPool implements PeerPicker for a pool of HTTP peers. type HTTPPool struct { // Context optionally specifies a context for the server to use when it @@ -52,7 +54,7 @@ type HTTPPool struct { self string mu sync.Mutex - peers []string + peers *consistenthash.Map } var httpPoolMade bool @@ -67,7 +69,7 @@ func NewHTTPPool(self string) *HTTPPool { panic("groupcache: NewHTTPPool must be called only once") } httpPoolMade = true - p := &HTTPPool{basePath: defaultBasePath, self: self} + p := &HTTPPool{basePath: defaultBasePath, self: self, peers: consistenthash.New(defaultReplicas, nil)} RegisterPeerPicker(func() PeerPicker { return p }) http.Handle(defaultBasePath, p) return p @@ -79,22 +81,17 @@ func NewHTTPPool(self string) *HTTPPool { func (p *HTTPPool) Set(peers ...string) { p.mu.Lock() defer p.mu.Unlock() - p.peers = append([]string{}, peers...) + p.peers = consistenthash.New(defaultReplicas, nil) + p.peers.Add(peers...) } func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) { - // TODO: make checksum implementation pluggable - h := crc32.Checksum([]byte(key), crc32.IEEETable) p.mu.Lock() defer p.mu.Unlock() - if len(p.peers) == 0 { + if p.peers.IsEmpty() { return nil, false } - n := int(h) - if n < 0 { - n *= -1 - } - if peer := p.peers[n%len(p.peers)]; peer != p.self { + if peer := p.peers.Get(key); peer != p.self { // TODO: pre-build a slice of *httpGetter when Set() // is called to avoid these two allocations. return &httpGetter{p.Transport, peer + p.basePath}, true diff --git a/http_test.go b/http_test.go index 279bcbf..b42edd7 100644 --- a/http_test.go +++ b/http_test.go @@ -84,7 +84,7 @@ func TestHTTPPool(t *testing.T) { // Dummy getter function. Gets should go to children only. // The only time this process will handle a get is when the - // children can't be contacted for seome reason. + // children can't be contacted for some reason. getter := GetterFunc(func(ctx Context, key string, dest Sink) error { return errors.New("parent getter called; something's wrong") })