288 lines
6.4 KiB
Go
Raw Normal View History

2018-04-19 20:18:42 +03:00
package rendezvous
import (
"context"
"fmt"
"math/rand"
2018-04-19 20:18:42 +03:00
"time"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
2018-04-24 18:43:43 +03:00
var (
DiscoverAsyncInterval = 2 * time.Minute
)
2018-04-19 20:18:42 +03:00
type Rendezvous interface {
2018-04-20 13:07:01 +03:00
Register(ctx context.Context, ns string, ttl int) error
2018-04-19 20:18:42 +03:00
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
}
type Registration struct {
Peer pstore.PeerInfo
Ns string
Ttl int
2018-04-19 20:18:42 +03:00
}
func NewRendezvousClient(host host.Host, rp peer.ID) Rendezvous {
return &client{
host: host,
rp: rp,
}
}
type client struct {
host host.Host
rp peer.ID
}
2018-04-20 13:07:01 +03:00
func (cli *client) Register(ctx context.Context, ns string, ttl int) error {
2018-04-19 20:18:42 +03:00
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
2018-04-23 11:54:25 +03:00
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
2018-04-19 20:18:42 +03:00
w := ggio.NewDelimitedWriter(s)
req := newRegisterMessage(ns, pstore.PeerInfo{ID: cli.host.ID(), Addrs: cli.host.Addrs()}, ttl)
2018-04-20 13:07:01 +03:00
err = w.WriteMsg(req)
2018-04-19 20:18:42 +03:00
if err != nil {
return err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return err
}
if res.GetType() != pb.Message_REGISTER_RESPONSE {
return fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
status := res.GetRegisterResponse().GetStatus()
if status != pb.Message_OK {
return RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()}
2018-04-19 20:18:42 +03:00
}
return nil
}
2018-04-23 14:50:46 +03:00
func Register(ctx context.Context, rz Rendezvous, ns string, ttl int) error {
if ttl < 120 {
return fmt.Errorf("registration TTL is too short")
}
err := rz.Register(ctx, ns, ttl)
2018-04-19 22:29:52 +03:00
if err != nil {
return err
}
2018-04-23 14:50:46 +03:00
go registerRefresh(ctx, rz, ns, ttl)
2018-04-19 22:29:52 +03:00
return nil
}
2018-04-23 14:50:46 +03:00
func registerRefresh(ctx context.Context, rz Rendezvous, ns string, ttl int) {
var refresh time.Duration
errcount := 0
2018-04-19 22:29:52 +03:00
for {
if errcount > 0 {
// do randomized exponential backoff, up to ~4 hours
if errcount > 7 {
errcount = 7
}
backoff := 2 << uint(errcount)
refresh = 5*time.Minute + time.Duration(rand.Intn(backoff*60000))*time.Millisecond
} else {
refresh = time.Duration(ttl-30) * time.Second
}
2018-04-19 22:29:52 +03:00
select {
2018-04-23 14:50:46 +03:00
case <-time.After(refresh):
2018-04-19 22:29:52 +03:00
case <-ctx.Done():
return
}
2018-04-20 13:07:01 +03:00
2018-04-23 14:50:46 +03:00
err := rz.Register(ctx, ns, ttl)
2018-04-20 13:07:01 +03:00
if err != nil {
log.Errorf("Error registering [%s]: %s", ns, err.Error())
errcount++
} else {
errcount = 0
2018-04-20 13:07:01 +03:00
}
2018-04-19 22:29:52 +03:00
}
}
2018-04-19 20:18:42 +03:00
func (cli *client) Unregister(ctx context.Context, ns string) error {
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
w := ggio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, cli.host.ID())
return w.WriteMsg(req)
}
func (cli *client) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) {
2018-04-19 20:18:42 +03:00
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
2018-04-19 22:29:52 +03:00
return nil, nil, err
2018-04-19 20:18:42 +03:00
}
defer s.Close()
2018-04-23 11:54:25 +03:00
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
2018-04-19 20:18:42 +03:00
w := ggio.NewDelimitedWriter(s)
2018-04-20 13:07:01 +03:00
return discoverQuery(ns, limit, cookie, r, w)
}
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) {
2018-04-20 13:07:01 +03:00
2018-04-19 22:29:52 +03:00
req := newDiscoverMessage(ns, limit, cookie)
err := w.WriteMsg(req)
2018-04-19 20:18:42 +03:00
if err != nil {
2018-04-19 22:29:52 +03:00
return nil, nil, err
2018-04-19 20:18:42 +03:00
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
2018-04-19 22:29:52 +03:00
return nil, nil, err
2018-04-19 20:18:42 +03:00
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
2018-04-19 22:29:52 +03:00
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String())
2018-04-19 20:18:42 +03:00
}
status := res.GetDiscoverResponse().GetStatus()
if status != pb.Message_OK {
return nil, nil, RendezvousError{Status: status, Text: res.GetDiscoverResponse().GetStatusText()}
}
2018-04-19 20:18:42 +03:00
regs := res.GetDiscoverResponse().GetRegistrations()
result := make([]Registration, 0, len(regs))
2018-04-19 20:18:42 +03:00
for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer())
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
}
result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())})
2018-04-19 20:18:42 +03:00
}
return result, res.GetDiscoverResponse().GetCookie(), nil
2018-04-19 20:18:42 +03:00
}
func (cli *client) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
2018-04-19 20:18:42 +03:00
s, err := cli.host.NewStream(ctx, cli.rp, RendezvousProto)
if err != nil {
return nil, err
}
ch := make(chan Registration)
2018-04-20 13:07:01 +03:00
go discoverAsync(ctx, ns, s, ch)
2018-04-19 20:18:42 +03:00
return ch, nil
}
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) {
2018-04-19 20:18:42 +03:00
defer s.Close()
defer close(ch)
2018-04-23 11:54:25 +03:00
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
2018-04-20 13:07:01 +03:00
w := ggio.NewDelimitedWriter(s)
const batch = 200
2018-04-19 20:18:42 +03:00
2018-04-19 22:29:52 +03:00
var (
cookie []byte
regs []Registration
2018-04-19 22:29:52 +03:00
err error
)
2018-04-20 13:07:01 +03:00
2018-04-19 20:18:42 +03:00
for {
regs, cookie, err = discoverQuery(ns, batch, cookie, r, w)
2018-04-19 20:18:42 +03:00
if err != nil {
// TODO robust error recovery
// - handle closed streams with backoff + new stream, preserving the cookie
// - handle E_INVALID_COOKIE errors in that case to restart the discovery
2018-04-20 12:01:35 +03:00
log.Errorf("Error in discovery [%s]: %s", ns, err.Error())
2018-04-19 20:18:42 +03:00
return
}
for _, reg := range regs {
2018-04-19 20:18:42 +03:00
select {
case ch <- reg:
2018-04-19 20:18:42 +03:00
case <-ctx.Done():
return
}
}
if len(regs) < batch {
// TODO adaptive backoff for heavily loaded rendezvous points
2018-04-19 20:18:42 +03:00
select {
2018-04-24 18:43:43 +03:00
case <-time.After(DiscoverAsyncInterval):
2018-04-19 20:18:42 +03:00
case <-ctx.Done():
return
}
}
}
}
func DiscoverPeers(ctx context.Context, rz Rendezvous, ns string, limit int, cookie []byte) ([]pstore.PeerInfo, []byte, error) {
regs, cookie, err := rz.Discover(ctx, ns, limit, cookie)
if err != nil {
return nil, nil, err
}
pinfos := make([]pstore.PeerInfo, len(regs))
for i, reg := range regs {
pinfos[i] = reg.Peer
}
return pinfos, cookie, nil
}
func DiscoverPeersAsync(ctx context.Context, rz Rendezvous, ns string) (<-chan pstore.PeerInfo, error) {
rch, err := rz.DiscoverAsync(ctx, ns)
if err != nil {
return nil, err
}
ch := make(chan pstore.PeerInfo)
go discoverPeersAsync(ctx, rch, ch)
return ch, nil
}
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pstore.PeerInfo) {
defer close(ch)
for {
select {
case reg, ok := <-rch:
if !ok {
return
}
select {
case ch <- reg.Peer:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}