two interfaces for client-side: RendezvousPoint and RendezvousClient

RendezvousClient soon to be stateful.
This commit is contained in:
vyzo 2018-04-28 12:05:21 +03:00
parent baf1e4e618
commit c540724f1f
3 changed files with 66 additions and 35 deletions

View File

@ -19,7 +19,7 @@ var (
DiscoverAsyncInterval = 2 * time.Minute DiscoverAsyncInterval = 2 * time.Minute
) )
type Rendezvous interface { type RendezvousPoint interface {
Register(ctx context.Context, ns string, ttl int) error Register(ctx context.Context, ns string, ttl int) error
Unregister(ctx context.Context, ns string) error Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
@ -32,20 +32,39 @@ type Registration struct {
Ttl int Ttl int
} }
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous { type RendezvousClient interface {
return &client{ Register(ctx context.Context, ns string, ttl int) error
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error)
}
func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint {
return &rendezvousPoint{
host: host, host: host,
rp: rp, p: p,
} }
} }
type client struct { type rendezvousPoint struct {
host host.Host host host.Host
rp peer.ID p peer.ID
} }
func (cli *client) Register(ctx context.Context, ns string, ttl int) error { func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp))
}
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient {
return &rendezvousClient{rp: rp}
}
type rendezvousClient struct {
rp RendezvousPoint
}
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) error {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil { if err != nil {
return err return err
} }
@ -54,7 +73,7 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl) req := newRegisterMessage(ns, pstore.PeerInfo{ID: rp.host.ID(), Addrs: rp.host.Addrs()}, ttl)
err = w.WriteMsg(req) err = w.WriteMsg(req)
if err != nil { if err != nil {
return err return err
@ -78,21 +97,21 @@ func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
return nil return nil
} }
func Register(ctx context.Context, rz Rendezvous, ns string, ttl int) error { func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) error {
if ttl < 120 { if ttl < 120 {
return fmt.Errorf("registration TTL is too short") return fmt.Errorf("registration TTL is too short")
} }
err := rz.Register(ctx, ns, ttl) err := rc.rp.Register(ctx, ns, ttl)
if err != nil { if err != nil {
return err return err
} }
go registerRefresh(ctx, rz, ns, ttl) go registerRefresh(ctx, rc.rp, ns, ttl)
return nil return nil
} }
func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) { func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int) {
var refresh time.Duration var refresh time.Duration
errcount := 0 errcount := 0
@ -124,20 +143,24 @@ func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) {
} }
} }
func (cli *client) Unregister(ctx context.Context, ns string) error { func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil { if err != nil {
return err return err
} }
defer s.Close() defer s.Close()
w := ggio.NewDelimitedWriter(s) w := ggio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, cli.host.ID()) req := newUnregisterMessage(ns, rp.host.ID())
return w.WriteMsg(req) return w.WriteMsg(req)
} }
func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) { func (rc *rendezvousClient) Unregister(ctx context.Context, ns string) error {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) return rc.rp.Unregister(ctx, ns)
}
func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -186,8 +209,8 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr
return result, res.GetDiscoverResponse().GetCookie(), nil return result, res.GetDiscoverResponse().GetCookie(), nil
} }
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) { func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto) s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -241,8 +264,8 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist
} }
} }
func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) { func (rc *rendezvousClient) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) {
regs, cookie, err := rz.Discover(ctx, ns, limit, cookie) regs, cookie, err := rc.rp.Discover(ctx, ns, limit, cookie)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -255,8 +278,8 @@ func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, coo
return pinfos, cookie, nil return pinfos, cookie, nil
} }
func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) { func (rc *rendezvousClient) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) {
rch, err := rz.DiscoverAsync(ctx, ns) rch, err := rc.rp.DiscoverAsync(ctx, ns)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -9,6 +9,14 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
) )
func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient {
clients := make([]RendezvousClient, len(hosts)-1)
for i, host := range hosts[1:] {
clients[i] = NewRendezvousClient(host, hosts[0].ID())
}
return clients
}
func TestClientRegistrationAndDiscovery(t *testing.T) { func TestClientRegistrationAndDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -23,12 +31,12 @@ func TestClientRegistrationAndDiscovery(t *testing.T) {
clients := getRendezvousClients(t, hosts) clients := getRendezvousClients(t, hosts)
err = Register(ctx, clients[0], "foo1", DefaultTTL) err = clients[0].Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pi, cookie, err := DiscoverPeers(ctx, clients[0], "foo1", 0, nil) pi, cookie, err := clients[0].Discover(ctx, "foo1", 0, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -38,12 +46,12 @@ func TestClientRegistrationAndDiscovery(t *testing.T) {
checkPeerInfo(t, pi[0], hosts[1]) checkPeerInfo(t, pi[0], hosts[1])
for i, client := range clients[1:] { for i, client := range clients[1:] {
err = Register(ctx, client, "foo1", DefaultTTL) err = client.Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
pi, cookie, err = DiscoverPeers(ctx, clients[0], "foo1", 10, cookie) pi, cookie, err = clients[0].Discover(ctx, "foo1", 10, cookie)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -54,7 +62,7 @@ func TestClientRegistrationAndDiscovery(t *testing.T) {
} }
for _, client := range clients[1:] { for _, client := range clients[1:] {
pi, _, err = DiscoverPeers(ctx, client, "foo1", 10, nil) pi, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -84,13 +92,13 @@ func TestClientRegistrationAndDiscoveryAsync(t *testing.T) {
DiscoverAsyncInterval = 1 * time.Second DiscoverAsyncInterval = 1 * time.Second
ch, err := DiscoverPeersAsync(ctx, clients[0], "foo1") ch, err := clients[0].DiscoverAsync(ctx, "foo1")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
for i, client := range clients[0:] { for i, client := range clients[0:] {
err = Register(ctx, client, "foo1", DefaultTTL) err = client.Register(ctx, "foo1", DefaultTTL)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -46,10 +46,10 @@ func connect(t *testing.T, a, b host.Host) {
} }
} }
func getRendezvousClients(t *testing.T, hosts []host.Host) []Rendezvous { func getRendezvousPoints(t *testing.T, hosts []host.Host) []RendezvousPoint {
clients := make([]Rendezvous, len(hosts)-1) clients := make([]RendezvousPoint, len(hosts)-1)
for i, host := range hosts[1:] { for i, host := range hosts[1:] {
clients[i] = NewRendezvousClient(host, hosts[0].ID()) clients[i] = NewRendezvousPoint(host, hosts[0].ID())
} }
return clients return clients
} }
@ -75,7 +75,7 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) {
} }
defer svc.DB.Close() defer svc.DB.Close()
clients := getRendezvousClients(t, hosts) clients := getRendezvousPoints(t, hosts)
err = clients[0].Register(ctx, "foo1", 60) err = clients[0].Register(ctx, "foo1", 60)
if err != nil { if err != nil {