rendezvous interface should expose full registration information

- instead of returning PeerInfos, return Registration objects in discovery
- provide utility functions at module level for peer info discovery
This commit is contained in:
vyzo 2018-04-21 18:51:18 +03:00
parent aeac2e2a0f
commit e5a72b9bea

View File

@ -22,8 +22,14 @@ const DefaultTTL = 2 * 3600 // 2hr
type Rendezvous interface { type Rendezvous interface {
Register(ctx context.Context, ns string, ttl int) error Register(ctx context.Context, ns string, ttl int) error
Unregister(ctx context.Context, ns string) error Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
}
type Registration struct {
Peer pstore.PeerInfo
Ns string
Ttl int
} }
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous {
@ -111,7 +117,7 @@ func (cli *client) Unregister(ctx context.Context, ns string) error {
return w.WriteMsg(req) return w.WriteMsg(req)
} }
func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -124,7 +130,7 @@ func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []
return discoverQuery(ns, limit, cookie, r, w) return discoverQuery(ns, limit, cookie, r, w)
} }
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]pstore.PeerInfo, []byte, error) { func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) {
req := newDiscoverMessage(ns, limit, cookie) req := newDiscoverMessage(ns, limit, cookie)
err := w.WriteMsg(req) err := w.WriteMsg(req)
@ -148,31 +154,31 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr
} }
regs := res.GetDiscoverResponse().GetRegistrations() regs := res.GetDiscoverResponse().GetRegistrations()
pinfos := make([]pstore.PeerInfo, 0, len(regs)) result := make([]Registration, 0, len(regs))
for _, reg := range regs { for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer()) pi, err := pbToPeerInfo(reg.GetPeer())
if err != nil { if err != nil {
log.Errorf("Invalid peer info: %s", err.Error()) log.Errorf("Invalid peer info: %s", err.Error())
continue continue
} }
pinfos = append(pinfos, pi) result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())})
} }
return pinfos, res.GetDiscoverResponse().GetCookie(), nil return result, res.GetDiscoverResponse().GetCookie(), nil
} }
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) { func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch := make(chan pstore.PeerInfo) ch := make(chan Registration)
go discoverAsync(ctx, ns, s, ch) go discoverAsync(ctx, ns, s, ch)
return ch, nil return ch, nil
} }
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) { func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) {
defer s.Close() defer s.Close()
defer close(ch) defer close(ch)
@ -183,26 +189,26 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore
var ( var (
cookie []byte cookie []byte
pi []pstore.PeerInfo regs []Registration
err error err error
) )
for { for {
pi, cookie, err = discoverQuery(ns, batch, cookie, r, w) regs, cookie, err = discoverQuery(ns, batch, cookie, r, w)
if err != nil { if err != nil {
log.Errorf("Error in discovery [%s]: %s", ns, err.Error()) log.Errorf("Error in discovery [%s]: %s", ns, err.Error())
return return
} }
for _, p := range pi { for _, reg := range regs {
select { select {
case ch <- p: case ch <- reg:
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
if len(pi) < batch { if len(regs) < batch {
select { select {
case <-time.After(2 * time.Minute): case <-time.After(2 * time.Minute):
case <-ctx.Done(): case <-ctx.Done():
@ -211,3 +217,48 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore
} }
} }
} }
func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) {
regs, cookie, err := rz.Discover(ctx, ns, limit, cookie)
if err != nil {
return nil, nil, err
}
pinfos := make([]pstore.PeerInfo, len(regs))
for i, reg := range regs {
pinfos[i] = reg.Peer
}
return pinfos, cookie, nil
}
func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) {
rch, err := rz.DiscoverAsync(ctx, ns)
if err != nil {
return nil, err
}
ch := make(chan pstore.PeerInfo)
go discoverPeersAsync(ctx, rch, ch)
return ch, nil
}
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pstore.PeerInfo) {
defer close(ch)
for {
select {
case reg, ok := <-rch:
if !ok {
return
}
select {
case ch <- reg.Peer:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}