From e5a72b9bea5ecd8f9c908b804816dc81a0ebc1e2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 21 Apr 2018 18:51:18 +0300 Subject: [PATCH] rendezvous interface should expose full registration information - instead of returning PeerInfos, return Registration objects in discovery - provide utility functions at module level for peer info discovery --- client.go | 81 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/client.go b/client.go index dc629aa..6e31404 100644 --- a/client.go +++ b/client.go @@ -22,8 +22,14 @@ const DefaultTTL = 2 * 3600 // 2hr type Rendezvous interface { Register(ctx context.Context, ns string, ttl int) error Unregister(ctx context.Context, ns string) error - Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) - DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) + Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) + DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) +} + +type Registration struct { + Peer pstore.PeerInfo + Ns string + Ttl int } func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { @@ -111,7 +117,7 @@ func (cli *client) Unregister(ctx context.Context, ns string) error { return w.WriteMsg(req) } -func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { +func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) if err != nil { return nil, nil, err @@ -124,7 +130,7 @@ func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie [] return discoverQuery(ns, limit, cookie, r, w) } -func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]pstore.PeerInfo, []byte, error) { +func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) { req := newDiscoverMessage(ns, limit, cookie) err := w.WriteMsg(req) @@ -148,31 +154,31 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr } regs := res.GetDiscoverResponse().GetRegistrations() - pinfos := make([]pstore.PeerInfo, 0, len(regs)) + result := make([]Registration, 0, len(regs)) for _, reg := range regs { pi, err := pbToPeerInfo(reg.GetPeer()) if err != nil { log.Errorf("Invalid peer info: %s", err.Error()) continue } - pinfos = append(pinfos, pi) + result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())}) } - return pinfos, res.GetDiscoverResponse().GetCookie(), nil + return result, res.GetDiscoverResponse().GetCookie(), nil } -func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) { +func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) if err != nil { return nil, err } - ch := make(chan pstore.PeerInfo) + ch := make(chan Registration) go discoverAsync(ctx, ns, s, ch) return ch, nil } -func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) { +func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) { defer s.Close() defer close(ch) @@ -183,26 +189,26 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore var ( cookie []byte - pi []pstore.PeerInfo + regs []Registration err error ) for { - pi, cookie, err = discoverQuery(ns, batch, cookie, r, w) + regs, cookie, err = discoverQuery(ns, batch, cookie, r, w) if err != nil { log.Errorf("Error in discovery [%s]: %s", ns, err.Error()) return } - for _, p := range pi { + for _, reg := range regs { select { - case ch <- p: + case ch <- reg: case <-ctx.Done(): return } } - if len(pi) < batch { + if len(regs) < batch { select { case <-time.After(2 * time.Minute): case <-ctx.Done(): @@ -211,3 +217,48 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore } } } + +func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { + regs, cookie, err := rz.Discover(ctx, ns, limit, cookie) + if err != nil { + return nil, nil, err + } + + pinfos := make([]pstore.PeerInfo, len(regs)) + for i, reg := range regs { + pinfos[i] = reg.Peer + } + + return pinfos, cookie, nil +} + +func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) { + rch, err := rz.DiscoverAsync(ctx, ns) + if err != nil { + return nil, err + } + + ch := make(chan pstore.PeerInfo) + go discoverPeersAsync(ctx, rch, ch) + return ch, nil +} + +func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pstore.PeerInfo) { + defer close(ch) + for { + select { + case reg, ok := <-rch: + if !ok { + return + } + + select { + case ch <- reg.Peer: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } +}