fixed compile error from previous commit + code refactoring

This commit is contained in:
Adin Schmahmann 2019-06-05 13:44:44 -04:00
parent 9052b531cc
commit 25d0082f05
5 changed files with 62 additions and 46 deletions

View File

@ -66,7 +66,7 @@ type rendezvousClient struct {
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) { func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil { if err != nil {
return err return 0, err
} }
defer s.Close() defer s.Close()
@ -134,7 +134,7 @@ func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int
return return
} }
err := rz.Register(ctx, ns, ttl) _, err := rz.Register(ctx, ns, ttl)
if err != nil { if err != nil {
log.Errorf("Error registering [%s]: %s", ns, err.Error()) log.Errorf("Error registering [%s]: %s", ns, err.Error())
errcount++ errcount++

View File

@ -31,10 +31,13 @@ func TestClientRegistrationAndDiscovery(t *testing.T) {
clients := getRendezvousClients(t, hosts) clients := getRendezvousClients(t, hosts)
err = clients[0].Register(ctx, "foo1", DefaultTTL) recordTTL, err := clients[0].Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi, cookie, err := clients[0].Discover(ctx, "foo1", 0, nil) pi, cookie, err := clients[0].Discover(ctx, "foo1", 0, nil)
if err != nil { if err != nil {
@ -46,10 +49,13 @@ func TestClientRegistrationAndDiscovery(t *testing.T) {
checkPeerInfo(t, pi[0], hosts[1]) checkPeerInfo(t, pi[0], hosts[1])
for i, client := range clients[1:] { for i, client := range clients[1:] {
err = client.Register(ctx, "foo1", DefaultTTL) recordTTL, err = client.Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie) pi, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie)
if err != nil { if err != nil {
@ -98,10 +104,13 @@ func TestClientRegistrationAndDiscoveryAsync(t *testing.T) {
} }
for i, client := range clients[0:] { for i, client := range clients[0:] {
err = client.Register(ctx, "foo1", DefaultTTL) recordTTL, err := client.Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if recordTTL != DefaultTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
pi := <-ch pi := <-ch
checkPeerInfo(t, pi, hosts[1+i]) checkPeerInfo(t, pi, hosts[1+i])

View File

@ -11,7 +11,7 @@ import (
"time" "time"
) )
type rendezvousDiscoveryClient struct { type rendezvousDiscovery struct {
rp RendezvousPoint rp RendezvousPoint
peerCache sync.Map //is a map[string]discoveredPeerCache peerCache sync.Map //is a map[string]discoveredPeerCache
rng *rand.Rand rng *rand.Rand
@ -19,17 +19,22 @@ type rendezvousDiscoveryClient struct {
} }
type discoveredPeerCache struct { type discoveredPeerCache struct {
cachedRegs map[peer.ID]*Registration cachedRecs map[peer.ID]*record
cookie []byte cookie []byte
mux sync.Mutex mux sync.Mutex
} }
func NewRendezvousDiscoveryClient(host host.Host, rendezvousPeer peer.ID) discovery.Discovery { type record struct {
rp := NewRendezvousPoint(host, rendezvousPeer) peer peer.AddrInfo
return &rendezvousDiscoveryClient{rp, sync.Map{}, rand.New(rand.NewSource(rand.Int63())), sync.Mutex{}} expire int64
} }
func (c *rendezvousDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { func NewRendezvousDiscovery(host host.Host, rendezvousPeer peer.ID) discovery.Discovery {
rp := NewRendezvousPoint(host, rendezvousPeer)
return &rendezvousDiscovery{rp, sync.Map{}, rand.New(rand.NewSource(rand.Int63())), sync.Mutex{}}
}
func (c *rendezvousDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
// Get options // Get options
var options discovery.Options var options discovery.Options
err := options.Apply(opts...) err := options.Apply(opts...)
@ -53,7 +58,7 @@ func (c *rendezvousDiscoveryClient) Advertise(ctx context.Context, ns string, op
} }
} }
func (c *rendezvousDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
// Get options // Get options
var options discovery.Options var options discovery.Options
err := options.Apply(opts...) err := options.Apply(opts...)
@ -74,54 +79,49 @@ func (c *rendezvousDiscoveryClient) FindPeers(ctx context.Context, ns string, op
cache = genericCache.(*discoveredPeerCache) cache = genericCache.(*discoveredPeerCache)
cache.mux.Lock() cache.mux.Lock()
defer cache.mux.Unlock()
// Remove all expired entries from cache // Remove all expired entries from cache
currentTime := int(time.Now().Unix()) currentTime := time.Now().Unix()
newCacheSize := len(cache.cachedRegs) newCacheSize := len(cache.cachedRecs)
for p := range cache.cachedRegs { for p := range cache.cachedRecs {
reg := cache.cachedRegs[p] rec := cache.cachedRecs[p]
if reg.Ttl < currentTime { if rec.expire < currentTime {
newCacheSize-- newCacheSize--
delete(cache.cachedRegs, p) delete(cache.cachedRecs, p)
} }
} }
cookie := cache.cookie cookie := cache.cookie
cache.mux.Unlock()
// Discover new records if we don't have enough // Discover new records if we don't have enough
var discoveryErr error
if newCacheSize < limit { if newCacheSize < limit {
if discoveryRecords, newCookie, err := c.rp.Discover(ctx, ns, limit, cookie); err == nil { // TODO: Should we return error even if we have valid cached results?
cache.mux.Lock() var regs []Registration
if cache.cachedRegs == nil { var newCookie []byte
cache.cachedRegs = make(map[peer.ID]*Registration) if regs, newCookie, err = c.rp.Discover(ctx, ns, limit, cookie); err == nil {
if cache.cachedRecs == nil {
cache.cachedRecs = make(map[peer.ID]*record)
} }
for i := range discoveryRecords { for _, reg := range regs {
rec := &discoveryRecords[i] rec := &record{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime}
rec.Ttl += currentTime cache.cachedRecs[rec.peer.ID] = rec
cache.cachedRegs[rec.Peer.ID] = rec
} }
cache.cookie = newCookie cache.cookie = newCookie
cache.mux.Unlock()
} else {
// TODO: Should we return error even if we have valid cached results?
discoveryErr = err
} }
} }
// Randomize and fill channel with available records // Randomize and fill channel with available records
cache.mux.Lock() count := len(cache.cachedRecs)
sendQuantity := len(cache.cachedRegs) if limit < count {
if limit < sendQuantity { count = limit
sendQuantity = limit
} }
chPeer := make(chan peer.AddrInfo, sendQuantity) chPeer := make(chan peer.AddrInfo, count)
c.rngMux.Lock() c.rngMux.Lock()
perm := c.rng.Perm(len(cache.cachedRegs))[0:sendQuantity] perm := c.rng.Perm(len(cache.cachedRecs))[0:count]
c.rngMux.Unlock() c.rngMux.Unlock()
permSet := make(map[int]int) permSet := make(map[int]int)
@ -129,11 +129,11 @@ func (c *rendezvousDiscoveryClient) FindPeers(ctx context.Context, ns string, op
permSet[v] = i permSet[v] = i
} }
sendLst := make([]*peer.AddrInfo, sendQuantity) sendLst := make([]*peer.AddrInfo, count)
iter := 0 iter := 0
for k := range cache.cachedRegs { for k := range cache.cachedRecs {
if sendIndex, ok := permSet[iter]; ok { if sendIndex, ok := permSet[iter]; ok {
sendLst[sendIndex] = &cache.cachedRegs[k].Peer sendLst[sendIndex] = &cache.cachedRecs[k].peer
} }
iter++ iter++
} }
@ -142,7 +142,6 @@ func (c *rendezvousDiscoveryClient) FindPeers(ctx context.Context, ns string, op
chPeer <- *send chPeer <- *send
} }
cache.mux.Unlock()
close(chPeer) close(chPeer)
return chPeer, discoveryErr return chPeer, err
} }

View File

@ -17,7 +17,7 @@ func getRendezvousDiscovery(hosts []host.Host) []discovery.Discovery {
for i, h := range hosts[1:] { for i, h := range hosts[1:] {
rp := NewRendezvousPoint(h, rendezvousPeer) rp := NewRendezvousPoint(h, rendezvousPeer)
rng := rand.New(rand.NewSource(int64(i))) rng := rand.New(rand.NewSource(int64(i)))
clients[i] = &rendezvousDiscoveryClient{rp: rp, peerCache: sync.Map{}, rng: rng} clients[i] = &rendezvousDiscovery{rp: rp, peerCache: sync.Map{}, rng: rng}
} }
return clients return clients
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"testing" "testing"
"time"
db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite" db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite"
pb "github.com/libp2p/go-libp2p-rendezvous/pb" pb "github.com/libp2p/go-libp2p-rendezvous/pb"
@ -77,10 +78,14 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) {
clients := getRendezvousPoints(t, hosts) clients := getRendezvousPoints(t, hosts)
err = clients[0].Register(ctx, "foo1", 60) const registerTTL = 60
recordTTL, err := clients[0].Register(ctx, "foo1", registerTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if recordTTL != registerTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
rrs, cookie, err := clients[0].Discover(ctx, "foo1", 10, nil) rrs, cookie, err := clients[0].Discover(ctx, "foo1", 10, nil)
if err != nil { if err != nil {
@ -92,10 +97,13 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) {
checkHostRegistration(t, rrs[0], hosts[1]) checkHostRegistration(t, rrs[0], hosts[1])
for i, client := range clients[1:] { for i, client := range clients[1:] {
err = client.Register(ctx, "foo1", 60) recordTTL, err = client.Register(ctx, "foo1", registerTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if recordTTL != registerTTL*time.Second {
t.Fatalf("Expected record TTL to be %d seconds", DefaultTTL)
}
rrs, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie) rrs, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie)
if err != nil { if err != nil {