refactor: obtain the peers from peerstore

This commit is contained in:
Richard Ramos 2021-09-30 10:39:04 -04:00
parent 1b6d7e4055
commit ba3a59f225
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
4 changed files with 39 additions and 16 deletions

View File

@ -37,20 +37,18 @@ type RendezvousClient interface {
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
}
func NewRendezvousPoint(host host.Host, p []peer.ID) RendezvousPoint {
func NewRendezvousPoint(host host.Host) RendezvousPoint {
return &rendezvousPoint{
host: host,
p: p,
}
}
type rendezvousPoint struct {
host host.Host
p []peer.ID
}
func NewRendezvousClient(host host.Host, rp []peer.ID) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp))
func NewRendezvousClient(host host.Host) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host))
}
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient {
@ -61,12 +59,28 @@ type rendezvousClient struct {
rp RendezvousPoint
}
func (r *rendezvousPoint) getRandomPeer() peer.ID {
return r.p[rand.Intn(len(r.p))] // nolint: gosec
func (r *rendezvousPoint) getRandomPeer() (peer.ID, error) {
var peerIDs []peer.ID
for _, peer := range r.host.Peerstore().Peers() {
protocols, err := r.host.Peerstore().SupportsProtocols(peer, string(RendezvousID_v001))
if err != nil {
log.Error("error obtaining the protocols supported by peers", err)
return "", err
}
if len(protocols) > 0 {
peerIDs = append(peerIDs, peer)
}
}
return peerIDs[rand.Intn(len(peerIDs))], nil // nolint: gosec
}
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
s, err := rp.host.NewStream(ctx, rp.getRandomPeer(), RendezvousProto)
randomPeer, err := rp.getRandomPeer()
if err != nil {
return 0, err
}
s, err := rp.host.NewStream(ctx, randomPeer, RendezvousID_v001)
if err != nil {
return 0, err
}
@ -147,7 +161,12 @@ func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int
}
func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int) ([]Registration, error) {
s, err := rp.host.NewStream(ctx, rp.getRandomPeer(), RendezvousProto)
randomPeer, err := rp.getRandomPeer()
if err != nil {
return nil, err
}
s, err := rp.host.NewStream(ctx, randomPeer, RendezvousID_v001)
if err != nil {
return nil, err
}
@ -196,7 +215,12 @@ func discoverQuery(ns string, limit int, r ggio.Reader, w ggio.Writer) ([]Regist
}
func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
s, err := rp.host.NewStream(ctx, rp.getRandomPeer(), RendezvousProto)
randomPeer, err := rp.getRandomPeer()
if err != nil {
return nil, err
}
s, err := rp.host.NewStream(ctx, randomPeer, RendezvousID_v001)
if err != nil {
return nil, err
}

View File

@ -31,8 +31,8 @@ type record struct {
expire int64
}
func NewRendezvousDiscovery(host host.Host, rendezvousPeers []peer.ID) discovery.Discovery {
rp := NewRendezvousPoint(host, rendezvousPeers)
func NewRendezvousDiscovery(host host.Host) discovery.Discovery {
rp := NewRendezvousPoint(host)
return &rendezvousDiscovery{rp: rp, peerCache: make(map[string]*discoveryCache), rng: rand.New(rand.NewSource(rand.Int63()))}
}

View File

@ -15,9 +15,8 @@ import (
var log = logging.Logger("rendezvous")
const (
RendezvousProto = protocol.ID("/vac/waku/rendezvous/0.0.1")
DefaultTTL = 2 * 3600 // 2hr
RendezvousID_v001 = protocol.ID("/vac/waku/rendezvous/0.0.1")
DefaultTTL = 2 * 3600 // 2hr
)
type RendezvousError struct {

2
svc.go
View File

@ -48,7 +48,7 @@ func NewRendezvousService(host host.Host, storage Storage, rzs ...RendezvousSync
}
func (rz *RendezvousService) Start() error {
rz.h.SetStreamHandler(RendezvousProto, rz.handleStream)
rz.h.SetStreamHandler(RendezvousID_v001, rz.handleStream)
if err := rz.startCleaner(); err != nil {
return err