feat: support for sync clients

Signed-off-by: Guillaume Louvigny <glouvigny@users.noreply.github.com>
This commit is contained in:
Guillaume Louvigny 2022-08-02 11:01:09 +02:00
parent 09965cd647
commit 10a9ad4472
17 changed files with 2321 additions and 579 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
*.pb.go linguist-generated merge=ours -diff
go.sum linguist-generated text

111
client.go
View File

@ -6,13 +6,12 @@ import (
"math/rand" "math/rand"
"time" "time"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
) )
var ( var (
@ -24,6 +23,7 @@ type RendezvousPoint interface {
Unregister(ctx context.Context, ns string) error Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error) DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
DiscoverSubscribe(ctx context.Context, ns string, serviceTypes []RendezvousSyncClient) (<-chan peer.AddrInfo, error)
} }
type Registration struct { type Registration struct {
@ -37,6 +37,7 @@ type RendezvousClient interface {
Unregister(ctx context.Context, ns string) error Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error) Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
} }
func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint { func NewRendezvousPoint(host host.Host, p peer.ID) RendezvousPoint {
@ -51,16 +52,17 @@ type rendezvousPoint struct {
p peer.ID p peer.ID
} }
func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient { func NewRendezvousClient(host host.Host, rp peer.ID, sync ...RendezvousSyncClient) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp)) return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp), sync...)
} }
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient { func NewRendezvousClientWithPoint(rp RendezvousPoint, syncClientList ...RendezvousSyncClient) RendezvousClient {
return &rendezvousClient{rp: rp} return &rendezvousClient{rp: rp, syncClients: syncClientList}
} }
type rendezvousClient struct { type rendezvousClient struct {
rp RendezvousPoint rp RendezvousPoint
syncClients []RendezvousSyncClient
} }
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) { func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
@ -95,7 +97,7 @@ func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (ti
return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()} return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()}
} }
return time.Duration(*response.Ttl) * time.Second, nil return time.Duration(response.Ttl) * time.Second, nil
} }
func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) { func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
@ -308,3 +310,96 @@ func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pe
} }
} }
} }
func (rc *rendezvousClient) DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) {
return rc.rp.DiscoverSubscribe(ctx, ns, rc.syncClients)
}
func subscribeServiceTypes(serviceTypeClients []RendezvousSyncClient) []string {
serviceTypes := []string(nil)
for _, serviceType := range serviceTypeClients {
serviceTypes = append(serviceTypes, serviceType.GetServiceType())
}
return serviceTypes
}
func (rp *rendezvousPoint) DiscoverSubscribe(ctx context.Context, ns string, serviceTypeClients []RendezvousSyncClient) (<-chan peer.AddrInfo, error) {
serviceTypes := subscribeServiceTypes(serviceTypeClients)
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
subType, subDetails, err := discoverSubscribeQuery(ns, serviceTypes, r, w)
if err != nil {
return nil, fmt.Errorf("discover subscribe error: %w", err)
}
subClient := RendezvousSyncClient(nil)
for _, subClient = range serviceTypeClients {
if subClient.GetServiceType() == subType {
break
}
}
if subClient == nil {
return nil, fmt.Errorf("unrecognized client type")
}
regCh, err := subClient.Subscribe(ctx, subDetails)
if err != nil {
return nil, fmt.Errorf("unable to subscribe to updates: %w", err)
}
ch := make(chan peer.AddrInfo)
go func() {
defer close(ch)
for {
select {
case <-ctx.Done():
return
case result := <-regCh:
ch <- result.Peer
}
}
}()
return ch, nil
}
func discoverSubscribeQuery(ns string, serviceTypes []string, r ggio.Reader, w ggio.Writer) (subType string, subDetails string, err error) {
req := &pb.Message{
Type: pb.Message_DISCOVER_SUBSCRIBE,
DiscoverSubscribe: newDiscoverSubscribeMessage(ns, serviceTypes),
}
err = w.WriteMsg(req)
if err != nil {
return "", "", fmt.Errorf("write err: %w", err)
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return "", "", fmt.Errorf("read err: %w", err)
}
if res.GetType() != pb.Message_DISCOVER_SUBSCRIBE_RESPONSE {
return "", "", fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverSubscribeResponse().GetStatus()
if status != pb.Message_OK {
return "", "", RendezvousError{Status: status, Text: res.GetDiscoverSubscribeResponse().GetStatusText()}
}
subType = res.GetDiscoverSubscribeResponse().GetSubscriptionType()
subDetails = res.GetDiscoverSubscribeResponse().GetSubscriptionDetails()
return subType, subDetails, nil
}

View File

@ -2,11 +2,12 @@ package rendezvous
import ( import (
"context" "context"
"github.com/libp2p/go-libp2p-core/peer"
"testing" "testing"
"time" "time"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-rendezvous/test_utils"
) )
func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient { func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient {
@ -17,6 +18,10 @@ func getRendezvousClients(t *testing.T, hosts []host.Host) []RendezvousClient {
return clients return clients
} }
func checkPeerInfo(t *testing.T, pi peer.AddrInfo, host host.Host) bool {
return test_utils.CheckPeerInfo(t, pi, host, true)
}
func TestClientRegistrationAndDiscovery(t *testing.T) { func TestClientRegistrationAndDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -118,20 +123,3 @@ func TestClientRegistrationAndDiscoveryAsync(t *testing.T) {
DiscoverAsyncInterval = 2 * time.Minute DiscoverAsyncInterval = 2 * time.Minute
} }
func checkPeerInfo(t *testing.T, pi peer.AddrInfo, host host.Host) {
if pi.ID != host.ID() {
t.Fatal("bad registration: peer ID doesn't match host ID")
}
addrs := host.Addrs()
raddrs := pi.Addrs
if len(addrs) != len(raddrs) {
t.Fatal("bad registration: peer address length mismatch")
}
for i, addr := range addrs {
raddr := raddrs[i]
if !addr.Equal(raddr) {
t.Fatal("bad registration: peer address mismatch")
}
}
}

View File

@ -2,12 +2,13 @@ package rendezvous
import ( import (
"context" "context"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"math/rand" "math/rand"
"testing" "testing"
"time" "time"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
) )
func getRendezvousDiscovery(hosts []host.Host) []discovery.Discovery { func getRendezvousDiscovery(hosts []host.Host) []discovery.Discovery {

2
generate.go Normal file
View File

@ -0,0 +1,2 @@
//go:generate protoc --proto_path=pb/ --gofast_opt="Mrendezvous.proto=.;rendezvous_pb" --gofast_out=./pb ./pb/rendezvous.proto
package rendezvous

2
go.mod
View File

@ -4,6 +4,8 @@ go 1.15
require ( require (
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.1.1
github.com/ipfs/go-log/v2 v2.1.3 github.com/ipfs/go-log/v2 v2.1.3
github.com/libp2p/go-libp2p-blankhost v0.2.0 github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-core v0.10.0 github.com/libp2p/go-libp2p-core v0.10.0

1
go.sum generated
View File

@ -167,6 +167,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=

1959
pb/rendezvous.pb.go generated

File diff suppressed because it is too large Load Diff

View File

@ -9,6 +9,9 @@ message Message {
UNREGISTER = 2; UNREGISTER = 2;
DISCOVER = 3; DISCOVER = 3;
DISCOVER_RESPONSE = 4; DISCOVER_RESPONSE = 4;
DISCOVER_SUBSCRIBE = 100;
DISCOVER_SUBSCRIBE_RESPONSE = 101;
} }
enum ResponseStatus { enum ResponseStatus {
@ -57,10 +60,32 @@ message Message {
string statusText = 4; string statusText = 4;
} }
message DiscoverSubscribe {
repeated string supported_subscription_types = 1;
string ns = 2;
}
message DiscoverSubscribeResponse {
string subscription_type = 1;
string subscription_details = 2;
ResponseStatus status = 3;
string statusText = 4;
}
MessageType type = 1; MessageType type = 1;
Register register = 2; Register register = 2;
RegisterResponse registerResponse = 3; RegisterResponse registerResponse = 3;
Unregister unregister = 4; Unregister unregister = 4;
Discover discover = 5; Discover discover = 5;
DiscoverResponse discoverResponse = 6; DiscoverResponse discoverResponse = 6;
DiscoverSubscribe discoverSubscribe = 100;
DiscoverSubscribeResponse discoverSubscribeResponse = 101;
}
message RegistrationRecord{
string id = 1;
repeated bytes addrs = 2;
string ns = 3;
int64 ttl = 4;
} }

View File

@ -30,16 +30,20 @@ func (e RendezvousError) Error() string {
return fmt.Sprintf("Rendezvous error: %s (%s)", e.Text, pb.Message_ResponseStatus(e.Status).String()) return fmt.Sprintf("Rendezvous error: %s (%s)", e.Text, pb.Message_ResponseStatus(e.Status).String())
} }
func NewRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
return newRegisterMessage(ns, pi, ttl)
}
func newRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message { func newRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
msg := new(pb.Message) msg := new(pb.Message)
msg.Type = pb.Message_REGISTER.Enum() msg.Type = pb.Message_REGISTER
msg.Register = new(pb.Message_Register) msg.Register = new(pb.Message_Register)
if ns != "" { if ns != "" {
msg.Register.Ns = &ns msg.Register.Ns = ns
} }
if ttl > 0 { if ttl > 0 {
ttl64 := int64(ttl) ttl64 := int64(ttl)
msg.Register.Ttl = &ttl64 msg.Register.Ttl = ttl64
} }
msg.Register.Peer = new(pb.Message_PeerInfo) msg.Register.Peer = new(pb.Message_PeerInfo)
msg.Register.Peer.Id = []byte(pi.ID) msg.Register.Peer.Id = []byte(pi.ID)
@ -52,25 +56,29 @@ func newRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
func newUnregisterMessage(ns string, pid peer.ID) *pb.Message { func newUnregisterMessage(ns string, pid peer.ID) *pb.Message {
msg := new(pb.Message) msg := new(pb.Message)
msg.Type = pb.Message_UNREGISTER.Enum() msg.Type = pb.Message_UNREGISTER
msg.Unregister = new(pb.Message_Unregister) msg.Unregister = new(pb.Message_Unregister)
if ns != "" { if ns != "" {
msg.Unregister.Ns = &ns msg.Unregister.Ns = ns
} }
msg.Unregister.Id = []byte(pid) msg.Unregister.Id = []byte(pid)
return msg return msg
} }
func NewDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
return newDiscoverMessage(ns, limit, cookie)
}
func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message { func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
msg := new(pb.Message) msg := new(pb.Message)
msg.Type = pb.Message_DISCOVER.Enum() msg.Type = pb.Message_DISCOVER
msg.Discover = new(pb.Message_Discover) msg.Discover = new(pb.Message_Discover)
if ns != "" { if ns != "" {
msg.Discover.Ns = &ns msg.Discover.Ns = ns
} }
if limit > 0 { if limit > 0 {
limit64 := int64(limit) limit64 := int64(limit)
msg.Discover.Limit = &limit64 msg.Discover.Limit = limit64
} }
if cookie != nil { if cookie != nil {
msg.Discover.Cookie = cookie msg.Discover.Cookie = cookie
@ -103,32 +111,32 @@ func pbToPeerInfo(p *pb.Message_PeerInfo) (peer.AddrInfo, error) {
func newRegisterResponse(ttl int) *pb.Message_RegisterResponse { func newRegisterResponse(ttl int) *pb.Message_RegisterResponse {
ttl64 := int64(ttl) ttl64 := int64(ttl)
r := new(pb.Message_RegisterResponse) r := new(pb.Message_RegisterResponse)
r.Status = pb.Message_OK.Enum() r.Status = pb.Message_OK
r.Ttl = &ttl64 r.Ttl = ttl64
return r return r
} }
func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse { func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse) r := new(pb.Message_RegisterResponse)
r.Status = status.Enum() r.Status = status
r.StatusText = &text r.StatusText = text
return r return r
} }
func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse { func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse) r := new(pb.Message_DiscoverResponse)
r.Status = pb.Message_OK.Enum() r.Status = pb.Message_OK
rregs := make([]*pb.Message_Register, len(regs)) rregs := make([]*pb.Message_Register, len(regs))
for i, reg := range regs { for i, reg := range regs {
rreg := new(pb.Message_Register) rreg := new(pb.Message_Register)
rns := reg.Ns rns := reg.Ns
rreg.Ns = &rns rreg.Ns = rns
rreg.Peer = new(pb.Message_PeerInfo) rreg.Peer = new(pb.Message_PeerInfo)
rreg.Peer.Id = []byte(reg.Id) rreg.Peer.Id = []byte(reg.Id)
rreg.Peer.Addrs = reg.Addrs rreg.Peer.Addrs = reg.Addrs
rttl := int64(reg.Ttl) rttl := int64(reg.Ttl)
rreg.Ttl = &rttl rreg.Ttl = rttl
rregs[i] = rreg rregs[i] = rreg
} }
@ -140,7 +148,31 @@ func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Messag
func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse { func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse) r := new(pb.Message_DiscoverResponse)
r.Status = status.Enum() r.Status = status
r.StatusText = &text r.StatusText = text
return r
}
func newDiscoverSubscribeResponse(subscriptionType string, subscriptionDetails string) *pb.Message_DiscoverSubscribeResponse {
r := new(pb.Message_DiscoverSubscribeResponse)
r.Status = pb.Message_OK
r.SubscriptionDetails = subscriptionDetails
r.SubscriptionType = subscriptionType
return r
}
func newDiscoverSubscribeResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverSubscribeResponse {
r := new(pb.Message_DiscoverSubscribeResponse)
r.Status = status
r.StatusText = text
return r
}
func newDiscoverSubscribeMessage(ns string, supportedSubscriptionTypes []string) *pb.Message_DiscoverSubscribe {
r := new(pb.Message_DiscoverSubscribe)
r.Ns = ns
r.SupportedSubscriptionTypes = supportedSubscriptionTypes
return r return r
} }

50
svc.go
View File

@ -3,14 +3,13 @@ package rendezvous
import ( import (
"fmt" "fmt"
db "github.com/libp2p/go-libp2p-rendezvous/db"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
db "github.com/libp2p/go-libp2p-rendezvous/db"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
) )
const ( const (
@ -26,11 +25,6 @@ type RendezvousService struct {
rzs []RendezvousSync rzs []RendezvousSync
} }
type RendezvousSync interface {
Register(p peer.ID, ns string, addrs [][]byte, ttl int, counter uint64)
Unregister(p peer.ID, ns string)
}
func NewRendezvousService(host host.Host, db db.DB, rzs ...RendezvousSync) *RendezvousService { func NewRendezvousService(host host.Host, db db.DB, rzs ...RendezvousSync) *RendezvousService {
rz := &RendezvousService{DB: db, rzs: rzs} rz := &RendezvousService{DB: db, rzs: rzs}
host.SetStreamHandler(RendezvousProto, rz.handleStream) host.SetStreamHandler(RendezvousProto, rz.handleStream)
@ -59,7 +53,7 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
switch t { switch t {
case pb.Message_REGISTER: case pb.Message_REGISTER:
r := rz.handleRegister(pid, req.GetRegister()) r := rz.handleRegister(pid, req.GetRegister())
res.Type = pb.Message_REGISTER_RESPONSE.Enum() res.Type = pb.Message_REGISTER_RESPONSE
res.RegisterResponse = r res.RegisterResponse = r
err = w.WriteMsg(&res) err = w.WriteMsg(&res)
if err != nil { if err != nil {
@ -75,7 +69,7 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
case pb.Message_DISCOVER: case pb.Message_DISCOVER:
r := rz.handleDiscover(pid, req.GetDiscover()) r := rz.handleDiscover(pid, req.GetDiscover())
res.Type = pb.Message_DISCOVER_RESPONSE.Enum() res.Type = pb.Message_DISCOVER_RESPONSE
res.DiscoverResponse = r res.DiscoverResponse = r
err = w.WriteMsg(&res) err = w.WriteMsg(&res)
if err != nil { if err != nil {
@ -83,6 +77,16 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
return return
} }
case pb.Message_DISCOVER_SUBSCRIBE:
r := rz.handleDiscoverSubscribe(pid, req.GetDiscoverSubscribe())
res.Type = pb.Message_DISCOVER_SUBSCRIBE_RESPONSE
res.DiscoverSubscribeResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}
default: default:
log.Debugf("Unexpected message: %s", t.String()) log.Debugf("Unexpected message: %s", t.String())
return return
@ -231,3 +235,27 @@ func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *
return newDiscoverResponse(regs, cookie) return newDiscoverResponse(regs, cookie)
} }
func (rz *RendezvousService) handleDiscoverSubscribe(_ peer.ID, m *pb.Message_DiscoverSubscribe) *pb.Message_DiscoverSubscribeResponse {
ns := m.GetNs()
for _, s := range rz.rzs {
rzSub, ok := s.(RendezvousSyncSubscribable)
if !ok {
continue
}
for _, supportedSubType := range m.GetSupportedSubscriptionTypes() {
if rzSub.GetServiceType() == supportedSubType {
sub, err := rzSub.Subscribe(ns)
if err != nil {
return newDiscoverSubscribeResponseError(pb.Message_E_INTERNAL_ERROR, "error while subscribing")
}
return newDiscoverSubscribeResponse(supportedSubType, sub)
}
}
}
return newDiscoverSubscribeResponseError(pb.Message_E_INTERNAL_ERROR, "subscription type not found")
}

View File

@ -7,52 +7,18 @@ import (
"testing" "testing"
"time" "time"
db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
testutil "github.com/libp2p/go-libp2p-swarm/testing"
db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
"github.com/libp2p/go-libp2p-rendezvous/test_utils"
) )
func getRendezvousHosts(t *testing.T, ctx context.Context, n int) []host.Host { func getRendezvousHosts(t *testing.T, ctx context.Context, n int) []host.Host {
hosts := getNetHosts(t, ctx, n) return test_utils.GetRendezvousHosts(t, ctx, n)
for i := 1; i < len(hosts); i++ {
connect(t, hosts[0], hosts[i])
}
return hosts
}
func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := testutil.GenSwarm(t)
h := bhost.NewBlankHost(netw)
out = append(out, h)
}
return out
}
func connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func getRendezvousPoints(t *testing.T, hosts []host.Host) []RendezvousPoint {
clients := make([]RendezvousPoint, len(hosts)-1)
for i, host := range hosts[1:] {
clients[i] = NewRendezvousPoint(host, hosts[0].ID())
}
return clients
} }
func makeRendezvousService(ctx context.Context, host host.Host, path string) (*RendezvousService, error) { func makeRendezvousService(ctx context.Context, host host.Host, path string) (*RendezvousService, error) {
@ -64,6 +30,14 @@ func makeRendezvousService(ctx context.Context, host host.Host, path string) (*R
return NewRendezvousService(host, dbi), nil return NewRendezvousService(host, dbi), nil
} }
func getRendezvousPointsTest(t *testing.T, hosts []host.Host) []RendezvousPoint {
clients := make([]RendezvousPoint, len(hosts)-1)
for i, host := range hosts[1:] {
clients[i] = NewRendezvousPoint(host, hosts[0].ID())
}
return clients
}
func TestSVCRegistrationAndDiscovery(t *testing.T) { func TestSVCRegistrationAndDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -76,7 +50,7 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) {
} }
defer svc.DB.Close() defer svc.DB.Close()
clients := getRendezvousPoints(t, hosts) clients := getRendezvousPointsTest(t, hosts)
const registerTTL = 60 const registerTTL = 60
recordTTL, err := clients[0].Register(ctx, "foo1", registerTTL) recordTTL, err := clients[0].Register(ctx, "foo1", registerTTL)
@ -149,6 +123,10 @@ func TestSVCRegistrationAndDiscovery(t *testing.T) {
} }
err = clients[1].Unregister(ctx, "") err = clients[1].Unregister(ctx, "")
if err != nil {
t.Fatal(err)
}
for _, client := range clients[0:] { for _, client := range clients[0:] {
rrs, _, err = client.Discover(ctx, "foo1", 10, nil) rrs, _, err = client.Discover(ctx, "foo1", 10, nil)
if err != nil { if err != nil {

21
sync_iface.go Normal file
View File

@ -0,0 +1,21 @@
package rendezvous
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
)
type RendezvousSync interface {
Register(p peer.ID, ns string, addrs [][]byte, ttl int, counter uint64)
Unregister(p peer.ID, ns string)
}
type RendezvousSyncSubscribable interface {
Subscribe(ns string) (syncDetails string, err error)
GetServiceType() string
}
type RendezvousSyncClient interface {
Subscribe(ctx context.Context, syncDetails string) (<-chan *Registration, error)
GetServiceType() string
}

156
sync_inmem_client.go Normal file
View File

@ -0,0 +1,156 @@
package rendezvous
import (
"context"
"encoding/json"
"fmt"
"sync"
ggio "github.com/gogo/protobuf/io"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
)
type client struct {
ctx context.Context
host host.Host
mu sync.Mutex
streams map[string]inet.Stream
subscriptions map[string]map[string]chan *Registration
}
func NewSyncInMemClient(ctx context.Context, h host.Host) *client {
return &client{
ctx: ctx,
host: h,
streams: map[string]inet.Stream{},
subscriptions: map[string]map[string]chan *Registration{},
}
}
func (c *client) getStreamToPeer(pidStr string) (inet.Stream, error) {
c.mu.Lock()
defer c.mu.Unlock()
if stream, ok := c.streams[pidStr]; ok {
return stream, nil
}
pid, err := peer.Decode(pidStr)
if err != nil {
return nil, fmt.Errorf("unable to decode peer id: %w", err)
}
stream, err := c.host.NewStream(c.ctx, pid, ServiceProto)
if err != nil {
return nil, fmt.Errorf("unable to connect to peer: %w", err)
}
go c.streamListener(stream)
return stream, nil
}
func (c *client) streamListener(s inet.Stream) {
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
record := &pb.RegistrationRecord{}
for {
err := r.ReadMsg(record)
if err != nil {
log.Errorf("unable to decode message: %s", err.Error())
return
}
pid, err := peer.Decode(record.Id)
if err != nil {
log.Warnf("invalid peer id: %s", err.Error())
continue
}
maddrs := make([]multiaddr.Multiaddr, len(record.Addrs))
for i, addrBytes := range record.Addrs {
maddrs[i], err = multiaddr.NewMultiaddrBytes(addrBytes)
if err != nil {
log.Warnf("invalid multiaddr: %s", err.Error())
continue
}
}
c.mu.Lock()
subscriptions, ok := c.subscriptions[record.Ns]
if ok {
for _, subscription := range subscriptions {
subscription <- &Registration{
Peer: peer.AddrInfo{
ID: pid,
Addrs: maddrs,
},
Ns: record.Ns,
Ttl: int(record.Ttl),
}
}
}
c.mu.Unlock()
}
}
func (c *client) Subscribe(ctx context.Context, syncDetails string) (<-chan *Registration, error) {
ctxUUID, err := uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("unable to generate uuid: %w", err)
}
psDetails := &PubSubSubscriptionDetails{}
err = json.Unmarshal([]byte(syncDetails), psDetails)
if err != nil {
return nil, fmt.Errorf("unable to decode json: %w", err)
}
s, err := c.getStreamToPeer(psDetails.PeerID)
if err != nil {
return nil, fmt.Errorf("unable to get stream to peer: %w", err)
}
w := ggio.NewDelimitedWriter(s)
err = w.WriteMsg(&pb.Message{
Type: pb.Message_DISCOVER_SUBSCRIBE,
DiscoverSubscribe: &pb.Message_DiscoverSubscribe{
Ns: psDetails.ChannelName,
}})
if err != nil {
return nil, fmt.Errorf("unable to query server")
}
ch := make(chan *Registration)
c.mu.Lock()
if _, ok := c.subscriptions[psDetails.ChannelName]; !ok {
c.subscriptions[psDetails.ChannelName] = map[string]chan *Registration{}
}
c.subscriptions[psDetails.ChannelName][ctxUUID.String()] = ch
c.mu.Unlock()
go func() {
<-ctx.Done()
c.mu.Lock()
delete(c.subscriptions[psDetails.ChannelName], ctxUUID.String())
c.mu.Unlock()
close(ch)
}()
return ch, nil
}
func (c *client) GetServiceType() string {
return ServiceType
}
var _ RendezvousSyncClient = (*client)(nil)

158
sync_inmem_provider.go Normal file
View File

@ -0,0 +1,158 @@
package rendezvous
import (
"encoding/json"
"fmt"
"sync"
"time"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pb "github.com/libp2p/go-libp2p-rendezvous/pb"
)
const (
ServiceType = "inmem"
ServiceProto = protocol.ID("/rendezvous/sync/inmem/1.0.0")
)
type PubSub struct {
mu sync.RWMutex
host host.Host
topics map[string]*PubSubSubscribers
}
type PubSubSubscribers struct {
mu sync.RWMutex
subscribers map[peer.ID]ggio.Writer
lastAnnouncement *pb.RegistrationRecord
}
type PubSubSubscriptionDetails struct {
PeerID string
ChannelName string
}
func NewSyncInMemProvider(host host.Host) (*PubSub, error) {
ps := &PubSub{
host: host,
topics: map[string]*PubSubSubscribers{},
}
ps.Listen()
return ps, nil
}
func (ps *PubSub) Subscribe(ns string) (syncDetails string, err error) {
details, err := json.Marshal(&PubSubSubscriptionDetails{
PeerID: ps.host.ID().String(),
ChannelName: ns,
})
if err != nil {
return "", fmt.Errorf("unable to marshal subscription details: %w", err)
}
return string(details), nil
}
func (ps *PubSub) GetServiceType() string {
return ServiceType
}
func (ps *PubSub) getOrCreateTopic(ns string) *PubSubSubscribers {
ps.mu.Lock()
defer ps.mu.Unlock()
if subscribers, ok := ps.topics[ns]; ok {
return subscribers
}
ps.topics[ns] = &PubSubSubscribers{
subscribers: map[peer.ID]ggio.Writer{},
lastAnnouncement: nil,
}
return ps.topics[ns]
}
func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds int, counter uint64) {
subscribers := ps.getOrCreateTopic(ns)
dataToSend := &pb.RegistrationRecord{
Id: pid.String(),
Addrs: addrs,
Ns: ns,
Ttl: time.Now().Add(time.Duration(ttlAsSeconds) * time.Second).UnixMilli(),
}
subscribers.mu.Lock()
subscribers.lastAnnouncement = dataToSend
toNotify := subscribers.subscribers
subscribers.mu.Unlock()
for _, stream := range toNotify {
if err := stream.WriteMsg(dataToSend); err != nil {
log.Errorf("unable to notify rendezvous data update: %s", err.Error())
}
}
}
func (ps *PubSub) Unregister(p peer.ID, ns string) {
// TODO: unsupported
}
func (ps *PubSub) Listen() {
ps.host.SetStreamHandler(ServiceProto, ps.handleStream)
}
func (ps *PubSub) handleStream(s inet.Stream) {
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
subscribedTopics := map[string]struct{}{}
for {
var req pb.Message
err := r.ReadMsg(&req)
if err != nil {
for ns := range subscribedTopics {
topic := ps.getOrCreateTopic(ns)
topic.mu.Lock()
delete(topic.subscribers, s.Conn().RemotePeer())
topic.mu.Unlock()
}
return
}
if req.Type != pb.Message_DISCOVER_SUBSCRIBE {
continue
}
topic := ps.getOrCreateTopic(req.DiscoverSubscribe.Ns)
topic.mu.Lock()
if _, ok := topic.subscribers[s.Conn().RemotePeer()]; ok {
topic.mu.Unlock()
continue
}
topic.subscribers[s.Conn().RemotePeer()] = w
subscribedTopics[req.DiscoverSubscribe.Ns] = struct{}{}
lastAnnouncement := topic.lastAnnouncement
topic.mu.Unlock()
if lastAnnouncement != nil {
if err := w.WriteMsg(lastAnnouncement); err != nil {
log.Errorf("unable to write announcement: %s", err.Error())
}
}
}
}
var _ RendezvousSync = (*PubSub)(nil)
var _ RendezvousSyncSubscribable = (*PubSub)(nil)

103
sync_inmem_test.go Normal file
View File

@ -0,0 +1,103 @@
package rendezvous_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/host"
rendezvous "github.com/libp2p/go-libp2p-rendezvous"
db "github.com/libp2p/go-libp2p-rendezvous/db/sqlite"
"github.com/libp2p/go-libp2p-rendezvous/test_utils"
)
func makeRendezvousService(ctx context.Context, host host.Host, path string, rzs ...rendezvous.RendezvousSync) (*rendezvous.RendezvousService, error) {
dbi, err := db.OpenDB(ctx, path)
if err != nil {
return nil, err
}
return rendezvous.NewRendezvousService(host, dbi, rzs...), nil
}
func getRendezvousClients(ctx context.Context, t *testing.T, hosts []host.Host) []rendezvous.RendezvousClient {
t.Helper()
clients := make([]rendezvous.RendezvousClient, len(hosts)-1)
for i, host := range hosts[1:] {
syncClient := rendezvous.NewSyncInMemClient(ctx, host)
clients[i] = rendezvous.NewRendezvousClient(host, hosts[0].ID(), syncClient)
}
return clients
}
func TestFlow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Instantiate server and clients
hosts := test_utils.GetRendezvousHosts(t, ctx, 4)
inmemPubSubSync, err := rendezvous.NewSyncInMemProvider(hosts[0])
if err != nil {
t.Fatal(err)
}
svc, err := makeRendezvousService(ctx, hosts[0], ":memory:", inmemPubSubSync)
if err != nil {
t.Fatal(err)
}
defer svc.DB.Close()
clients := getRendezvousClients(ctx, t, hosts)
regFound := int64(0)
wg := sync.WaitGroup{}
const announcementCount = 5
for _, client := range clients[1:] {
wg.Add(1)
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
ch, err := client.DiscoverSubscribe(ctx, "foo1")
if err != nil {
t.Fatal(err)
}
go func() {
regFoundForPeer := 0
defer cancel()
defer wg.Done()
for p := range ch {
if test_utils.CheckPeerInfo(t, p, hosts[2], false) == true {
regFoundForPeer++
atomic.AddInt64(&regFound, 1)
}
if regFoundForPeer == announcementCount {
go func() {
// this allows more events to be received
time.Sleep(time.Millisecond * 500)
cancel()
}()
}
}
}()
}
for i := 0; i < announcementCount; i++ {
_, err = clients[1].Register(ctx, "foo1", rendezvous.DefaultTTL)
if err != nil {
t.Fatal(err)
}
}
wg.Wait()
if regFound != int64(len(clients[1:]))*announcementCount {
t.Fatalf("expected %d records to be found got %d", int64(len(clients[1:])), regFound)
}
}

67
test_utils/svc.go Normal file
View File

@ -0,0 +1,67 @@
package test_utils
import (
"context"
"testing"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
testutil "github.com/libp2p/go-libp2p-swarm/testing"
)
func GetRendezvousHosts(t *testing.T, ctx context.Context, n int) []host.Host {
hosts := GetNetHosts(t, ctx, n)
for i := 1; i < len(hosts); i++ {
Connect(t, hosts[0], hosts[i])
}
return hosts
}
func GetNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := testutil.GenSwarm(t)
h := bhost.NewBlankHost(netw)
out = append(out, h)
}
return out
}
func Connect(t *testing.T, a, b host.Host) {
pinfo := a.Peerstore().PeerInfo(a.ID())
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func CheckPeerInfo(t *testing.T, pi peer.AddrInfo, host host.Host, fatal bool) bool {
if pi.ID != host.ID() {
if fatal {
t.Fatal("bad registration: peer ID doesn't match host ID")
}
return false
}
addrs := host.Addrs()
raddrs := pi.Addrs
if len(addrs) != len(raddrs) {
if fatal {
t.Fatal("bad registration: peer address length mismatch")
}
return false
}
for i, addr := range addrs {
raddr := raddrs[i]
if !addr.Equal(raddr) {
if fatal {
t.Fatal("bad registration: peer address mismatch")
}
return false
}
}
return true
}