mirror of
https://github.com/logos-messaging/go-waku-rendezvous.git
synced 2026-01-02 13:13:08 +00:00
simplify Rendezvous interface
This commit is contained in:
parent
e3d343f34a
commit
1506c04f1a
95
client.go
95
client.go
@ -17,12 +17,13 @@ import (
|
||||
|
||||
var log = logging.Logger("rendezvous")
|
||||
|
||||
const DefaultTTL = 2 * 3600 // 2hr
|
||||
|
||||
type Rendezvous interface {
|
||||
RegisterOnce(ctx context.Context, ns string, ttl int) error
|
||||
Register(ctx context.Context, ns string, opts ...interface{}) error
|
||||
Register(ctx context.Context, ns string, ttl int) error
|
||||
Unregister(ctx context.Context, ns string) error
|
||||
DiscoverOnce(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error)
|
||||
Discover(ctx context.Context, ns string, opts ...interface{}) (<-chan pstore.PeerInfo, error)
|
||||
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error)
|
||||
DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error)
|
||||
}
|
||||
|
||||
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous {
|
||||
@ -37,22 +38,18 @@ type client struct {
|
||||
rp peer.ID
|
||||
}
|
||||
|
||||
func (cli *client) RegisterOnce(ctx context.Context, ns string, ttl int) error {
|
||||
func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
|
||||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
return cli.registerOnce(ctx, ns, ttl, s)
|
||||
}
|
||||
|
||||
func (cli *client) registerOnce(ctx context.Context, ns string, ttl int, s inet.Stream) error {
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
|
||||
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl)
|
||||
err := w.WriteMsg(req)
|
||||
err = w.WriteMsg(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -75,48 +72,30 @@ func (cli *client) registerOnce(ctx context.Context, ns string, ttl int, s inet.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *client) Register(ctx context.Context, ns string, opts ...interface{}) error {
|
||||
var E func(error)
|
||||
|
||||
for _, opt := range opts {
|
||||
switch o := opt.(type) {
|
||||
case func(error):
|
||||
E = o
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unexpected option: %v", opt)
|
||||
}
|
||||
}
|
||||
|
||||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
|
||||
func Register(ctx context.Context, rz Rendezvous, ns string) error {
|
||||
err := rz.Register(ctx, ns, DefaultTTL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go cli.doRegister(ctx, ns, E, s)
|
||||
go registerRefresh(ctx, rz, ns)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *client) doRegister(ctx context.Context, ns string, E func(error), s inet.Stream) {
|
||||
const ttl = 2 * 3600 // 2hr
|
||||
const refresh = ttl - 30
|
||||
func registerRefresh(ctx context.Context, rz Rendezvous, ns string) {
|
||||
const refresh = DefaultTTL - 30
|
||||
|
||||
defer s.Close()
|
||||
for {
|
||||
err := cli.registerOnce(ctx, ns, ttl, s)
|
||||
if err != nil {
|
||||
log.Errorf("Error registering [%s]: %s", ns, err.Error())
|
||||
if E != nil {
|
||||
go E(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(refresh * time.Second):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
err := rz.Register(ctx, ns, DefaultTTL)
|
||||
if err != nil {
|
||||
log.Errorf("Error registering [%s]: %s", ns, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -132,20 +111,21 @@ func (cli *client) Unregister(ctx context.Context, ns string) error {
|
||||
return w.WriteMsg(req)
|
||||
}
|
||||
|
||||
func (cli *client) DiscoverOnce(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) {
|
||||
func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) {
|
||||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
return cli.discoverOnce(ctx, ns, limit, cookie, s)
|
||||
}
|
||||
|
||||
func (cli *client) discoverOnce(ctx context.Context, ns string, limit int, cookie []byte, s inet.Stream) ([]pstore.PeerInfo, []byte, error) {
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
|
||||
return discoverQuery(ns, limit, cookie, r, w)
|
||||
}
|
||||
|
||||
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]pstore.PeerInfo, []byte, error) {
|
||||
|
||||
req := newDiscoverMessage(ns, limit, cookie)
|
||||
err := w.WriteMsg(req)
|
||||
if err != nil {
|
||||
@ -176,33 +156,24 @@ func (cli *client) discoverOnce(ctx context.Context, ns string, limit int, cooki
|
||||
return pinfos, res.GetDiscoverResponse().GetCookie(), nil
|
||||
}
|
||||
|
||||
func (cli *client) Discover(ctx context.Context, ns string, opts ...interface{}) (<-chan pstore.PeerInfo, error) {
|
||||
var E func(error)
|
||||
|
||||
for _, opt := range opts {
|
||||
switch o := opt.(type) {
|
||||
case func(error):
|
||||
E = o
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("Unexpected option: %v", opt)
|
||||
}
|
||||
}
|
||||
|
||||
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan pstore.PeerInfo, error) {
|
||||
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan pstore.PeerInfo)
|
||||
go cli.doDiscover(ctx, ns, E, s, ch)
|
||||
go discoverAsync(ctx, ns, s, ch)
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s inet.Stream, ch chan pstore.PeerInfo) {
|
||||
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan pstore.PeerInfo) {
|
||||
defer s.Close()
|
||||
defer close(ch)
|
||||
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
|
||||
const batch = 100
|
||||
|
||||
var (
|
||||
@ -210,13 +181,11 @@ func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s i
|
||||
pi []pstore.PeerInfo
|
||||
err error
|
||||
)
|
||||
|
||||
for {
|
||||
pi, cookie, err = cli.discoverOnce(ctx, ns, batch, cookie, s)
|
||||
pi, cookie, err = discoverQuery(ns, batch, cookie, r, w)
|
||||
if err != nil {
|
||||
log.Errorf("Error in discovery [%s]: %s", ns, err.Error())
|
||||
if E != nil {
|
||||
go E(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -230,7 +199,7 @@ func (cli *client) doDiscover(ctx context.Context, ns string, E func(error), s i
|
||||
|
||||
if len(pi) < batch {
|
||||
select {
|
||||
case <-time.After(1 * time.Minute):
|
||||
case <-time.After(2 * time.Minute):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user