From c540724f1f9f5cb838b6d050b7fb0c5bcbdd4547 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 28 Apr 2018 12:05:21 +0300 Subject: [PATCH] two interfaces for client-side: RendezvousPoint and RendezvousClient RendezvousClient soon to be stateful. --- client.go | 71 +++++++++++++++++++++++++++++++++----------------- client_test.go | 22 +++++++++++----- svc_test.go | 8 +++--- 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/client.go b/client.go index bf2daa3..4ed8b87 100644 --- a/client.go +++ b/client.go @@ -19,7 +19,7 @@ var ( DiscoverAsyncInterval = 2 * time.Minute ) -type Rendezvous interface { +type RendezvousPoint interface { Register(ctx context.Context, ns string, ttl int) error Unregister(ctx context.Context, ns string) error Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) @@ -32,20 +32,39 @@ type Registration struct { Ttl int } -func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { - return &client{ +type RendezvousClient interface { + Register(ctx context.Context, ns string, ttl int) error + Unregister(ctx context.Context, ns string) error + Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) + DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) +} + +func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint { + return &rendezvousPoint{ host: host, - rp: rp, + p: p, } } -type client struct { +type rendezvousPoint struct { host host.Host - rp peer.ID + p peer.ID } -func (cli *client) Register(ctx context.Context, ns string, ttl int) error { - s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) +func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient { + return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp)) +} + +func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient { + return &rendezvousClient{rp: rp} +} + +type rendezvousClient struct { + rp RendezvousPoint +} + +func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) error { + s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) if err != nil { return err } @@ -54,7 +73,7 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error { r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) w := ggio.NewDelimitedWriter(s) - req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) + req := newRegisterMessage(ns, pstore.PeerInfo{ID: rp.host.ID(), Addrs: rp.host.Addrs()}, ttl) err = w.WriteMsg(req) if err != nil { return err @@ -78,21 +97,21 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error { return nil } -func Register(ctx context.Context, rz Rendezvous, ns string, ttl int) error { +func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) error { if ttl < 120 { return fmt.Errorf("registration TTL is too short") } - err := rz.Register(ctx, ns, ttl) + err := rc.rp.Register(ctx, ns, ttl) if err != nil { return err } - go registerRefresh(ctx, rz, ns, ttl) + go registerRefresh(ctx, rc.rp, ns, ttl) return nil } -func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) { +func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int) { var refresh time.Duration errcount := 0 @@ -124,20 +143,24 @@ func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) { } } -func (cli *client) Unregister(ctx context.Context, ns string) error { - s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) +func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error { + s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) if err != nil { return err } defer s.Close() w := ggio.NewDelimitedWriter(s) - req := newUnregisterMessage(ns, cli.host.ID()) + req := newUnregisterMessage(ns, rp.host.ID()) return w.WriteMsg(req) } -func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { - s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) +func (rc *rendezvousClient) Unregister(ctx context.Context, ns string) error { + return rc.rp.Unregister(ctx, ns) +} + +func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { + s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) if err != nil { return nil, nil, err } @@ -186,8 +209,8 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr return result, res.GetDiscoverResponse().GetCookie(), nil } -func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { - s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) +func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { + s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto) if err != nil { return nil, err } @@ -241,8 +264,8 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist } } -func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { - regs, cookie, err := rz.Discover(ctx, ns, limit, cookie) +func (rc *rendezvousClient) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { + regs, cookie, err := rc.rp.Discover(ctx, ns, limit, cookie) if err != nil { return nil, nil, err } @@ -255,8 +278,8 @@ func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, coo return pinfos, cookie, nil } -func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) { - rch, err := rz.DiscoverAsync(ctx, ns) +func (rc *rendezvousClient) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) { + rch, err := rc.rp.DiscoverAsync(ctx, ns) if err != nil { return nil, err } diff --git a/client_test.go b/client_test.go index 68d1b29..0c33200 100644 --- a/client_test.go +++ b/client_test.go @@ -9,6 +9,14 @@ import ( pstore "github.com/libp2p/go-libp2p-peerstore" ) +func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient { + clients := make([]RendezvousClient, len(hosts)-1) + for i, host := range hosts[1:] { + clients[i] = NewRendezvousClient(host, hosts[0].ID()) + } + return clients +} + func TestClientRegistrationAndDiscovery(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -23,12 +31,12 @@ func TestClientRegistrationAndDiscovery(t *testing.T) { clients := getRendezvousClients(t, hosts) - err = Register(ctx, clients[0], "foo1", DefaultTTL) + err = clients[0].Register(ctx, "foo1", DefaultTTL) if err != nil { t.Fatal(err) } - pi, cookie, err := DiscoverPeers(ctx, clients[0], "foo1", 0, nil) + pi, cookie, err := clients[0].Discover(ctx, "foo1", 0, nil) if err != nil { t.Fatal(err) } @@ -38,12 +46,12 @@ func TestClientRegistrationAndDiscovery(t *testing.T) { checkPeerInfo(t, pi[0], hosts[1]) for i, client := range clients[1:] { - err = Register(ctx, client, "foo1", DefaultTTL) + err = client.Register(ctx, "foo1", DefaultTTL) if err != nil { t.Fatal(err) } - pi, cookie, err = DiscoverPeers(ctx, clients[0], "foo1", 10, cookie) + pi, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie) if err != nil { t.Fatal(err) } @@ -54,7 +62,7 @@ func TestClientRegistrationAndDiscovery(t *testing.T) { } for _, client := range clients[1:] { - pi, _, err = DiscoverPeers(ctx, client, "foo1", 10, nil) + pi, _, err = client.Discover(ctx, "foo1", 10, nil) if err != nil { t.Fatal(err) } @@ -84,13 +92,13 @@ func TestClientRegistrationAndDiscoveryAsync(t *testing.T) { DiscoverAsyncInterval = 1 * time.Second - ch, err := DiscoverPeersAsync(ctx, clients[0], "foo1") + ch, err := clients[0].DiscoverAsync(ctx, "foo1") if err != nil { t.Fatal(err) } for i, client := range clients[0:] { - err = Register(ctx, client, "foo1", DefaultTTL) + err = client.Register(ctx, "foo1", DefaultTTL) if err != nil { t.Fatal(err) } diff --git a/svc_test.go b/svc_test.go index 71e3f35..7a7ed5c 100644 --- a/svc_test.go +++ b/svc_test.go @@ -46,10 +46,10 @@ func connect(t *testing.T, a, b host.Host) { } } -func getRendezvousClients(t *testing.T, hosts []host.Host) []Rendezvous { - clients := make([]Rendezvous, len(hosts)-1) +func getRendezvousPoints(t *testing.T, hosts []host.Host) []RendezvousPoint { + clients := make([]RendezvousPoint, len(hosts)-1) for i, host := range hosts[1:] { - clients[i] = NewRendezvousClient(host, hosts[0].ID()) + clients[i] = NewRendezvousPoint(host, hosts[0].ID()) } return clients } @@ -75,7 +75,7 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) { } defer svc.DB.Close() - clients := getRendezvousClients(t, hosts) + clients := getRendezvousPoints(t, hosts) err = clients[0].Register(ctx, "foo1", 60) if err != nil {