419 lines
11 KiB
Go
Raw Normal View History

2023-03-27 09:51:55 -04:00
package rendezvous
import (
"context"
"fmt"
"math/rand"
"time"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
pb "github.com/berty/go-libp2p-rendezvous/pb"
)
var (
DiscoverAsyncInterval = 2 * time.Minute
)
type RendezvousPoint interface {
Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
DiscoverSubscribe(ctx context.Context, ns string, serviceTypes []RendezvousSyncClient) (<-chan peer.AddrInfo, error)
}
type Registration struct {
Peer peer.AddrInfo
Ns string
Ttl int
}
type RendezvousClient interface {
Register(ctx context.Context, ns string, ttl int) (time.Duration, error)
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
}
func NewRendezvousPoint(host host.Host, p peer.ID, opts ...RendezvousPointOption) RendezvousPoint {
cfg := defaultRendezvousPointConfig
cfg.apply(opts...)
return &rendezvousPoint{
addrFactory: cfg.AddrsFactory,
host: host,
p: p,
}
}
type rendezvousPoint struct {
addrFactory AddrsFactory
host host.Host
p peer.ID
}
func NewRendezvousClient(host host.Host, rp peer.ID, sync ...RendezvousSyncClient) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp), sync...)
}
func NewRendezvousClientWithPoint(rp RendezvousPoint, syncClientList ...RendezvousSyncClient) RendezvousClient {
return &rendezvousClient{rp: rp, syncClients: syncClientList}
}
type rendezvousClient struct {
rp RendezvousPoint
syncClients []RendezvousSyncClient
}
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return 0, err
}
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
addrs := rp.addrFactory(rp.host.Addrs())
if len(addrs) == 0 {
return 0, fmt.Errorf("no addrs available to advertise: %s", ns)
}
log.Debugf("advertising on `%s` with: %v", ns, addrs)
req := newRegisterMessage(ns, peer.AddrInfo{ID: rp.host.ID(), Addrs: addrs}, ttl)
err = w.WriteMsg(req)
if err != nil {
return 0, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return 0, err
}
if res.GetType() != pb.Message_REGISTER_RESPONSE {
return 0, fmt.Errorf("unexpected response: %s", res.GetType().String())
}
response := res.GetRegisterResponse()
status := response.GetStatus()
if status != pb.Message_OK {
return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()}
}
return time.Duration(response.Ttl) * time.Second, nil
}
func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
if ttl < 120 {
return 0, fmt.Errorf("registration TTL is too short")
}
returnedTTL, err := rc.rp.Register(ctx, ns, ttl)
if err != nil {
return 0, err
}
go registerRefresh(ctx, rc.rp, ns, ttl)
return returnedTTL, nil
}
func registerRefresh(ctx context.Context, rz RendezvousPoint, ns string, ttl int) {
var refresh time.Duration
errcount := 0
for {
if errcount > 0 {
// do randomized exponential backoff, up to ~4 hours
if errcount > 7 {
errcount = 7
}
backoff := 2 << uint(errcount)
refresh = 5*time.Minute + time.Duration(rand.Intn(backoff*60000))*time.Millisecond
} else {
refresh = time.Duration(ttl-30) * time.Second
}
select {
case <-time.After(refresh):
case <-ctx.Done():
return
}
_, err := rz.Register(ctx, ns, ttl)
if err != nil {
log.Errorf("Error registering [%s]: %s", ns, err.Error())
errcount++
} else {
errcount = 0
}
}
}
func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return err
}
defer s.Close()
w := ggio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, rp.host.ID())
return w.WriteMsg(req)
}
func (rc *rendezvousClient) Unregister(ctx context.Context, ns string) error {
return rc.rp.Unregister(ctx, ns)
}
func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, nil, err
}
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
return discoverQuery(ns, limit, cookie, r, w)
}
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) {
req := newDiscoverMessage(ns, limit, cookie)
err := w.WriteMsg(req)
if err != nil {
return nil, nil, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return nil, nil, err
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverResponse().GetStatus()
if status != pb.Message_OK {
return nil, nil, RendezvousError{Status: status, Text: res.GetDiscoverResponse().GetStatusText()}
}
regs := res.GetDiscoverResponse().GetRegistrations()
result := make([]Registration, 0, len(regs))
for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer())
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
}
result = append(result, Registration{Peer: pi, Ns: reg.GetNs(), Ttl: int(reg.GetTtl())})
}
return result, res.GetDiscoverResponse().GetCookie(), nil
}
func (rp *rendezvousPoint) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) {
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, err
}
ch := make(chan Registration)
go discoverAsync(ctx, ns, s, ch)
return ch, nil
}
func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Registration) {
defer s.Reset()
defer close(ch)
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
const batch = 200
var (
cookie []byte
regs []Registration
err error
)
for {
regs, cookie, err = discoverQuery(ns, batch, cookie, r, w)
if err != nil {
// TODO robust error recovery
// - handle closed streams with backoff + new stream, preserving the cookie
// - handle E_INVALID_COOKIE errors in that case to restart the discovery
log.Errorf("Error in discovery [%s]: %s", ns, err.Error())
return
}
for _, reg := range regs {
select {
case ch <- reg:
case <-ctx.Done():
return
}
}
if len(regs) < batch {
// TODO adaptive backoff for heavily loaded rendezvous points
select {
case <-time.After(DiscoverAsyncInterval):
case <-ctx.Done():
return
}
}
}
}
func (rc *rendezvousClient) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error) {
regs, cookie, err := rc.rp.Discover(ctx, ns, limit, cookie)
if err != nil {
return nil, nil, err
}
pinfos := make([]peer.AddrInfo, len(regs))
for i, reg := range regs {
pinfos[i] = reg.Peer
}
return pinfos, cookie, nil
}
func (rc *rendezvousClient) DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) {
rch, err := rc.rp.DiscoverAsync(ctx, ns)
if err != nil {
return nil, err
}
ch := make(chan peer.AddrInfo)
go discoverPeersAsync(ctx, rch, ch)
return ch, nil
}
func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan peer.AddrInfo) {
defer close(ch)
for {
select {
case reg, ok := <-rch:
if !ok {
return
}
select {
case ch <- reg.Peer:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
func (rc *rendezvousClient) DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) {
return rc.rp.DiscoverSubscribe(ctx, ns, rc.syncClients)
}
func subscribeServiceTypes(serviceTypeClients []RendezvousSyncClient) []string {
serviceTypes := []string(nil)
for _, serviceType := range serviceTypeClients {
serviceTypes = append(serviceTypes, serviceType.GetServiceType())
}
return serviceTypes
}
func (rp *rendezvousPoint) DiscoverSubscribe(ctx context.Context, ns string, serviceTypeClients []RendezvousSyncClient) (<-chan peer.AddrInfo, error) {
serviceTypes := subscribeServiceTypes(serviceTypeClients)
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
subType, subDetails, err := discoverSubscribeQuery(ns, serviceTypes, r, w)
if err != nil {
return nil, fmt.Errorf("discover subscribe error: %w", err)
}
subClient := RendezvousSyncClient(nil)
for _, subClient = range serviceTypeClients {
if subClient.GetServiceType() == subType {
break
}
}
if subClient == nil {
return nil, fmt.Errorf("unrecognized client type")
}
regCh, err := subClient.Subscribe(ctx, subDetails)
if err != nil {
return nil, fmt.Errorf("unable to subscribe to updates: %w", err)
}
ch := make(chan peer.AddrInfo)
go func() {
defer close(ch)
for {
select {
case result, ok := <-regCh:
if !ok {
return
}
ch <- result.Peer
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
func discoverSubscribeQuery(ns string, serviceTypes []string, r ggio.Reader, w ggio.Writer) (subType string, subDetails string, err error) {
req := &pb.Message{
Type: pb.Message_DISCOVER_SUBSCRIBE,
DiscoverSubscribe: newDiscoverSubscribeMessage(ns, serviceTypes),
}
err = w.WriteMsg(req)
if err != nil {
return "", "", fmt.Errorf("write err: %w", err)
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return "", "", fmt.Errorf("read err: %w", err)
}
if res.GetType() != pb.Message_DISCOVER_SUBSCRIBE_RESPONSE {
return "", "", fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverSubscribeResponse().GetStatus()
if status != pb.Message_OK {
return "", "", RendezvousError{Status: status, Text: res.GetDiscoverSubscribeResponse().GetStatusText()}
}
subType = res.GetDiscoverSubscribeResponse().GetSubscriptionType()
subDetails = res.GetDiscoverSubscribeResponse().GetSubscriptionDetails()
return subType, subDetails, nil
}