From 1506c04f1ae3ef4709e1d6359833a1623c43583e Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 20 Apr 2018 13:07:01 +0300 Subject: [PATCH] simplify Rendezvous interface --- client.go | 95 +++++++++++++++++++------------------------------------ 1 file changed, 32 insertions(+), 63 deletions(-) diff --git a/client.go b/client.go index 2f64ebb..97c5f05 100644 --- a/client.go +++ b/client.go @@ -17,12 +17,13 @@ import ( var log = logging.Logger("rendezvous") +const DefaultTTL = 2 * 3600 // 2hr + type Rendezvous interface { - RegisterOnce(ctx context.Context, ns string, ttl int) error - Register(ctx context.Context, ns string, opts ...interface{}) error + Register(ctx context.Context, ns string, ttl int) error Unregister(ctx context.Context, ns string) error - DiscoverOnce(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) - Discover(ctx context.Context, ns string, opts ...interface{}) (<-chan pstore.PeerInfo, error) + Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) + DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) } func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { @@ -37,22 +38,18 @@ type client struct { rp peer.ID } -func (cli *client) RegisterOnce(ctx context.Context, ns string, ttl int) error { +func (cli *client) Register(ctx context.Context, ns string, ttl int) error { s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) if err != nil { return err } defer s.Close() - return cli.registerOnce(ctx, ns, ttl, s) -} - -func (cli *client) registerOnce(ctx context.Context, ns string, ttl int, s inet.Stream) error { r := ggio.NewDelimitedReader(s, 1<<20) w := ggio.NewDelimitedWriter(s) req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) - err := w.WriteMsg(req) + err = w.WriteMsg(req) if err != nil { return err } @@ -75,48 +72,30 @@ func (cli *client) registerOnce(ctx context.Context, ns string, ttl int, s inet. return nil } -func (cli *client) Register(ctx context.Context, ns string, opts ...interface{}) error { - var E func(error) - - for _, opt := range opts { - switch o := opt.(type) { - case func(error): - E = o - - default: - return fmt.Errorf("Unexpected option: %v", opt) - } - } - - s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) +func Register(ctx context.Context, rz Rendezvous, ns string) error { + err := rz.Register(ctx, ns, DefaultTTL) if err != nil { return err } - go cli.doRegister(ctx, ns, E, s) + go registerRefresh(ctx, rz, ns) return nil } -func (cli *client) doRegister(ctx context.Context, ns string, E func(error), s inet.Stream) { - const ttl = 2 * 3600 // 2hr - const refresh = ttl - 30 +func registerRefresh(ctx context.Context, rz Rendezvous, ns string) { + const refresh = DefaultTTL - 30 - defer s.Close() for { - err := cli.registerOnce(ctx, ns, ttl, s) - if err != nil { - log.Errorf("Error registering [%s]: %s", ns, err.Error()) - if E != nil { - go E(err) - } - return - } - select { case <-time.After(refresh * time.Second): case <-ctx.Done(): return } + + err := rz.Register(ctx, ns, DefaultTTL) + if err != nil { + log.Errorf("Error registering [%s]: %s", ns, err.Error()) + } } } @@ -132,20 +111,21 @@ func (cli *client) Unregister(ctx context.Context, ns string) error { return w.WriteMsg(req) } -func (cli *client) DiscoverOnce(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { +func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) if err != nil { return nil, nil, err } defer s.Close() - return cli.discoverOnce(ctx, ns, limit, cookie, s) -} - -func (cli *client) discoverOnce(ctx context.Context, ns string, limit int, cookie []byte, s inet.Stream) ([]pstore.PeerInfo, []byte, error) { r := ggio.NewDelimitedReader(s, 1<<20) w := ggio.NewDelimitedWriter(s) + return discoverQuery(ns, limit, cookie, r, w) +} + +func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]pstore.PeerInfo, []byte, error) { + req := newDiscoverMessage(ns, limit, cookie) err := w.WriteMsg(req) if err != nil { @@ -176,33 +156,24 @@ func (cli *client) discoverOnce(ctx context.Context, ns string, limit int, cooki return pinfos, res.GetDiscoverResponse().GetCookie(), nil } -func (cli *client) Discover(ctx context.Context, ns string, opts ...interface{}) (<-chan pstore.PeerInfo, error) { - var E func(error) - - for _, opt := range opts { - switch o := opt.(type) { - case func(error): - E = o - - default: - return nil, fmt.Errorf("Unexpected option: %v", opt) - } - } - +func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) { s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) if err != nil { return nil, err } ch := make(chan pstore.PeerInfo) - go cli.doDiscover(ctx, ns, E, s, ch) + go discoverAsync(ctx, ns, s, ch) return ch, nil } -func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s inet.Stream, ch chan pstore.PeerInfo) { +func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) { defer s.Close() defer close(ch) + r := ggio.NewDelimitedReader(s, 1<<20) + w := ggio.NewDelimitedWriter(s) + const batch = 100 var ( @@ -210,13 +181,11 @@ func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s i pi []pstore.PeerInfo err error ) + for { - pi, cookie, err = cli.discoverOnce(ctx, ns, batch, cookie, s) + pi, cookie, err = discoverQuery(ns, batch, cookie, r, w) if err != nil { log.Errorf("Error in discovery [%s]: %s", ns, err.Error()) - if E != nil { - go E(err) - } return } @@ -230,7 +199,7 @@ func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s i if len(pi) < batch { select { - case <-time.After(1 * time.Minute): + case <-time.After(2 * time.Minute): case <-ctx.Done(): return }