chore: bump go-waku

This commit is contained in:
Richard Ramos 2023-06-07 16:46:50 -04:00 committed by RichΛrd
parent 56cad423f0
commit e53c2c0a6d
69 changed files with 1356 additions and 4893 deletions

View File

@ -1 +1 @@
0.156.1
0.157.0

6
go.mod
View File

@ -82,7 +82,7 @@ require (
github.com/mutecomm/go-sqlcipher/v4 v4.4.2
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/waku-org/go-waku v0.6.1-0.20230526151800-10c2e20910bf
github.com/waku-org/go-waku v0.6.1-0.20230605200314-b0c094b0b663
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
go.uber.org/multierr v1.11.0
@ -116,7 +116,6 @@ require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/benbjohnson/immutable v0.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/berty/go-libp2p-rendezvous v0.4.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect
github.com/btcsuite/btcd v0.22.1 // indirect
@ -233,7 +232,7 @@ require (
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rjeczalik/notify v0.9.3 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686 // indirect
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
@ -249,6 +248,7 @@ require (
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
github.com/urfave/cli/v2 v2.24.4 // indirect
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 // indirect
github.com/waku-org/go-zerokit-rln v0.1.12 // indirect
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 // indirect
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230331223149-f90e66aebb0d // indirect

11
go.sum
View File

@ -398,8 +398,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/berty/go-libp2p-rendezvous v0.4.1 h1:+yXsKocTxfKt+Sl3JkcPbv1J31QKjYoYSyIpMWGb/Wc=
github.com/berty/go-libp2p-rendezvous v0.4.1/go.mod h1:Kc2dtCckvFN44/eCiWXT5YbwVbR7WA5iPhDZIKNkQG0=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
@ -1869,8 +1867,9 @@ github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho=
github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8=
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY=
github.com/rjeczalik/notify v0.9.3/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/robertkrimen/godocdown v0.0.0-20130622164427-0bfa04905481/go.mod h1:C9WhFzY47SzYBIvzFqSvHIR6ROgDo4TtdTuRaOMjF/s=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@ -2111,8 +2110,10 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.6.1-0.20230526151800-10c2e20910bf h1:9+3yMLVo/O7QX7s98gZ+nqLwl7UX/XGKBZ9PHVufkkY=
github.com/waku-org/go-waku v0.6.1-0.20230526151800-10c2e20910bf/go.mod h1:dcGbxOa6+J+RxFP43QDJaiVrsXKGbK8v3suUabWbvOs=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671 h1:iOCDabjZ11Zk0ejdWBR54OEFA/rRZdQgIrX6Rv4U7AM=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671/go.mod h1:/1YwD6sx3xsbrSkVa4++e8AUDcUjC035bgKwDsZo+0Q=
github.com/waku-org/go-waku v0.6.1-0.20230605200314-b0c094b0b663 h1:BBRrzCEy0p+TEdspf+gAvl6uc3FR27oHL0t4BgrLaDk=
github.com/waku-org/go-waku v0.6.1-0.20230605200314-b0c094b0b663/go.mod h1:qz1a/J+lTA+hJy61aVgGP9E85SnAQ2gldRgU97aUB/U=
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=

View File

@ -1,2 +0,0 @@
//go:generate protoc --proto_path=pb/ --gofast_opt="Mrendezvous.proto=.;rendezvous_pb" --gofast_out=./pb ./pb/rendezvous.proto
package rendezvous

File diff suppressed because it is too large Load Diff

View File

@ -1,91 +0,0 @@
syntax = "proto3";
package rendezvous.pb;
message Message {
enum MessageType {
REGISTER = 0;
REGISTER_RESPONSE = 1;
UNREGISTER = 2;
DISCOVER = 3;
DISCOVER_RESPONSE = 4;
DISCOVER_SUBSCRIBE = 100;
DISCOVER_SUBSCRIBE_RESPONSE = 101;
}
enum ResponseStatus {
OK = 0;
E_INVALID_NAMESPACE = 100;
E_INVALID_PEER_INFO = 101;
E_INVALID_TTL = 102;
E_INVALID_COOKIE = 103;
E_NOT_AUTHORIZED = 200;
E_INTERNAL_ERROR = 300;
E_UNAVAILABLE = 400;
}
message PeerInfo {
bytes id = 1;
repeated bytes addrs = 2;
}
message Register {
string ns = 1;
PeerInfo peer = 2;
int64 ttl = 3; // in seconds
}
message RegisterResponse {
ResponseStatus status = 1;
string statusText = 2;
int64 ttl = 3;
}
message Unregister {
string ns = 1;
bytes id = 2;
}
message Discover {
string ns = 1;
int64 limit = 2;
bytes cookie = 3;
}
message DiscoverResponse {
repeated Register registrations = 1;
bytes cookie = 2;
ResponseStatus status = 3;
string statusText = 4;
}
message DiscoverSubscribe {
repeated string supported_subscription_types = 1;
string ns = 2;
}
message DiscoverSubscribeResponse {
string subscription_type = 1;
string subscription_details = 2;
ResponseStatus status = 3;
string statusText = 4;
}
MessageType type = 1;
Register register = 2;
RegisterResponse registerResponse = 3;
Unregister unregister = 4;
Discover discover = 5;
DiscoverResponse discoverResponse = 6;
DiscoverSubscribe discoverSubscribe = 100;
DiscoverSubscribeResponse discoverSubscribeResponse = 101;
}
message RegistrationRecord{
string id = 1;
repeated bytes addrs = 2;
string ns = 3;
int64 ttl = 4;
}

View File

@ -1,178 +0,0 @@
package rendezvous
import (
"errors"
"fmt"
db "github.com/berty/go-libp2p-rendezvous/db"
pb "github.com/berty/go-libp2p-rendezvous/pb"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("rendezvous")
const (
RendezvousProto = protocol.ID("/rendezvous/1.0.0")
DefaultTTL = 2 * 3600 // 2hr
)
type RendezvousError struct {
Status pb.Message_ResponseStatus
Text string
}
func (e RendezvousError) Error() string {
return fmt.Sprintf("Rendezvous error: %s (%s)", e.Text, e.Status.String())
}
func NewRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
return newRegisterMessage(ns, pi, ttl)
}
func newRegisterMessage(ns string, pi peer.AddrInfo, ttl int) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_REGISTER
msg.Register = new(pb.Message_Register)
if ns != "" {
msg.Register.Ns = ns
}
if ttl > 0 {
ttl64 := int64(ttl)
msg.Register.Ttl = ttl64
}
msg.Register.Peer = new(pb.Message_PeerInfo)
msg.Register.Peer.Id = []byte(pi.ID)
msg.Register.Peer.Addrs = make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
msg.Register.Peer.Addrs[i] = addr.Bytes()
}
return msg
}
func newUnregisterMessage(ns string, pid peer.ID) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_UNREGISTER
msg.Unregister = new(pb.Message_Unregister)
if ns != "" {
msg.Unregister.Ns = ns
}
msg.Unregister.Id = []byte(pid)
return msg
}
func NewDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
return newDiscoverMessage(ns, limit, cookie)
}
func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_DISCOVER
msg.Discover = new(pb.Message_Discover)
if ns != "" {
msg.Discover.Ns = ns
}
if limit > 0 {
limit64 := int64(limit)
msg.Discover.Limit = limit64
}
if cookie != nil {
msg.Discover.Cookie = cookie
}
return msg
}
func pbToPeerInfo(p *pb.Message_PeerInfo) (peer.AddrInfo, error) {
if p == nil {
return peer.AddrInfo{}, errors.New("missing peer info")
}
id, err := peer.IDFromBytes(p.Id)
if err != nil {
return peer.AddrInfo{}, err
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, bs := range p.Addrs {
addr, err := ma.NewMultiaddrBytes(bs)
if err != nil {
log.Errorf("Error parsing multiaddr: %s", err.Error())
continue
}
addrs = append(addrs, addr)
}
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}
func newRegisterResponse(ttl int) *pb.Message_RegisterResponse {
ttl64 := int64(ttl)
r := new(pb.Message_RegisterResponse)
r.Status = pb.Message_OK
r.Ttl = ttl64
return r
}
func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse)
r.Status = status
r.StatusText = text
return r
}
func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = pb.Message_OK
rregs := make([]*pb.Message_Register, len(regs))
for i, reg := range regs {
rreg := new(pb.Message_Register)
rns := reg.Ns
rreg.Ns = rns
rreg.Peer = new(pb.Message_PeerInfo)
rreg.Peer.Id = []byte(reg.Id)
rreg.Peer.Addrs = reg.Addrs
rttl := int64(reg.Ttl)
rreg.Ttl = rttl
rregs[i] = rreg
}
r.Registrations = rregs
r.Cookie = cookie
return r
}
func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = status
r.StatusText = text
return r
}
func newDiscoverSubscribeResponse(subscriptionType string, subscriptionDetails string) *pb.Message_DiscoverSubscribeResponse {
r := new(pb.Message_DiscoverSubscribeResponse)
r.Status = pb.Message_OK
r.SubscriptionDetails = subscriptionDetails
r.SubscriptionType = subscriptionType
return r
}
func newDiscoverSubscribeResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverSubscribeResponse {
r := new(pb.Message_DiscoverSubscribeResponse)
r.Status = status
r.StatusText = text
return r
}
func newDiscoverSubscribeMessage(ns string, supportedSubscriptionTypes []string) *pb.Message_DiscoverSubscribe {
r := new(pb.Message_DiscoverSubscribe)
r.Ns = ns
r.SupportedSubscriptionTypes = supportedSubscriptionTypes
return r
}

View File

@ -1,22 +0,0 @@
package rendezvous
import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
)
type RendezvousSync interface {
Register(p peer.ID, ns string, addrs [][]byte, ttl int, counter uint64)
Unregister(p peer.ID, ns string)
}
type RendezvousSyncSubscribable interface {
Subscribe(ns string) (syncDetails string, err error)
GetServiceType() string
}
type RendezvousSyncClient interface {
Subscribe(ctx context.Context, syncDetails string) (<-chan *Registration, error)
GetServiceType() string
}

View File

@ -1,156 +0,0 @@
package rendezvous
import (
"context"
"encoding/json"
"fmt"
"sync"
ggio "github.com/gogo/protobuf/io"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
pb "github.com/berty/go-libp2p-rendezvous/pb"
)
type client struct {
ctx context.Context
host host.Host
mu sync.Mutex
streams map[string]inet.Stream
subscriptions map[string]map[string]chan *Registration
}
func NewSyncInMemClient(ctx context.Context, h host.Host) *client {
return &client{
ctx: ctx,
host: h,
streams: map[string]inet.Stream{},
subscriptions: map[string]map[string]chan *Registration{},
}
}
func (c *client) getStreamToPeer(pidStr string) (inet.Stream, error) {
c.mu.Lock()
defer c.mu.Unlock()
if stream, ok := c.streams[pidStr]; ok {
return stream, nil
}
pid, err := peer.Decode(pidStr)
if err != nil {
return nil, fmt.Errorf("unable to decode peer id: %w", err)
}
stream, err := c.host.NewStream(c.ctx, pid, ServiceProto)
if err != nil {
return nil, fmt.Errorf("unable to connect to peer: %w", err)
}
go c.streamListener(stream)
return stream, nil
}
func (c *client) streamListener(s inet.Stream) {
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
record := &pb.RegistrationRecord{}
for {
err := r.ReadMsg(record)
if err != nil {
log.Errorf("unable to decode message: %s", err.Error())
return
}
pid, err := peer.Decode(record.Id)
if err != nil {
log.Warnf("invalid peer id: %s", err.Error())
continue
}
maddrs := make([]multiaddr.Multiaddr, len(record.Addrs))
for i, addrBytes := range record.Addrs {
maddrs[i], err = multiaddr.NewMultiaddrBytes(addrBytes)
if err != nil {
log.Warnf("invalid multiaddr: %s", err.Error())
continue
}
}
c.mu.Lock()
subscriptions, ok := c.subscriptions[record.Ns]
if ok {
for _, subscription := range subscriptions {
subscription <- &Registration{
Peer: peer.AddrInfo{
ID: pid,
Addrs: maddrs,
},
Ns: record.Ns,
Ttl: int(record.Ttl),
}
}
}
c.mu.Unlock()
}
}
func (c *client) Subscribe(ctx context.Context, syncDetails string) (<-chan *Registration, error) {
ctxUUID, err := uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("unable to generate uuid: %w", err)
}
psDetails := &PubSubSubscriptionDetails{}
err = json.Unmarshal([]byte(syncDetails), psDetails)
if err != nil {
return nil, fmt.Errorf("unable to decode json: %w", err)
}
s, err := c.getStreamToPeer(psDetails.PeerID)
if err != nil {
return nil, fmt.Errorf("unable to get stream to peer: %w", err)
}
w := ggio.NewDelimitedWriter(s)
err = w.WriteMsg(&pb.Message{
Type: pb.Message_DISCOVER_SUBSCRIBE,
DiscoverSubscribe: &pb.Message_DiscoverSubscribe{
Ns: psDetails.ChannelName,
}})
if err != nil {
return nil, fmt.Errorf("unable to query server")
}
ch := make(chan *Registration)
c.mu.Lock()
if _, ok := c.subscriptions[psDetails.ChannelName]; !ok {
c.subscriptions[psDetails.ChannelName] = map[string]chan *Registration{}
}
c.subscriptions[psDetails.ChannelName][ctxUUID.String()] = ch
c.mu.Unlock()
go func() {
<-ctx.Done()
c.mu.Lock()
delete(c.subscriptions[psDetails.ChannelName], ctxUUID.String())
c.mu.Unlock()
close(ch)
}()
return ch, nil
}
func (c *client) GetServiceType() string {
return ServiceType
}
var _ RendezvousSyncClient = (*client)(nil)

View File

@ -1,156 +0,0 @@
package rendezvous
import (
"encoding/json"
"fmt"
"sync"
"time"
pb "github.com/berty/go-libp2p-rendezvous/pb"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
ServiceType = "inmem"
ServiceProto = protocol.ID("/rendezvous/sync/inmem/1.0.0")
)
type PubSub struct {
mu sync.RWMutex
host host.Host
topics map[string]*PubSubSubscribers
}
type PubSubSubscribers struct {
mu sync.RWMutex
subscribers map[peer.ID]ggio.Writer
lastAnnouncement *pb.RegistrationRecord
}
type PubSubSubscriptionDetails struct {
PeerID string
ChannelName string
}
func NewSyncInMemProvider(host host.Host) (*PubSub, error) {
ps := &PubSub{
host: host,
topics: map[string]*PubSubSubscribers{},
}
ps.Listen()
return ps, nil
}
func (ps *PubSub) Subscribe(ns string) (syncDetails string, err error) {
details, err := json.Marshal(&PubSubSubscriptionDetails{
PeerID: ps.host.ID().String(),
ChannelName: ns,
})
if err != nil {
return "", fmt.Errorf("unable to marshal subscription details: %w", err)
}
return string(details), nil
}
func (ps *PubSub) GetServiceType() string {
return ServiceType
}
func (ps *PubSub) getOrCreateTopic(ns string) *PubSubSubscribers {
ps.mu.Lock()
defer ps.mu.Unlock()
if subscribers, ok := ps.topics[ns]; ok {
return subscribers
}
ps.topics[ns] = &PubSubSubscribers{
subscribers: map[peer.ID]ggio.Writer{},
lastAnnouncement: nil,
}
return ps.topics[ns]
}
func (ps *PubSub) Register(pid peer.ID, ns string, addrs [][]byte, ttlAsSeconds int, counter uint64) {
topic := ps.getOrCreateTopic(ns)
dataToSend := &pb.RegistrationRecord{
Id: pid.String(),
Addrs: addrs,
Ns: ns,
Ttl: time.Now().Add(time.Duration(ttlAsSeconds) * time.Second).UnixMilli(),
}
topic.mu.Lock()
topic.lastAnnouncement = dataToSend
toNotify := topic.subscribers
for _, stream := range toNotify {
if err := stream.WriteMsg(dataToSend); err != nil {
log.Errorf("unable to notify rendezvous data update: %s", err.Error())
}
}
topic.mu.Unlock()
}
func (ps *PubSub) Unregister(p peer.ID, ns string) {
// TODO: unsupported
}
func (ps *PubSub) Listen() {
ps.host.SetStreamHandler(ServiceProto, ps.handleStream)
}
func (ps *PubSub) handleStream(s inet.Stream) {
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
subscribedTopics := map[string]struct{}{}
for {
var req pb.Message
err := r.ReadMsg(&req)
if err != nil {
for ns := range subscribedTopics {
topic := ps.getOrCreateTopic(ns)
topic.mu.Lock()
delete(topic.subscribers, s.Conn().RemotePeer())
topic.mu.Unlock()
}
return
}
if req.Type != pb.Message_DISCOVER_SUBSCRIBE {
continue
}
topic := ps.getOrCreateTopic(req.DiscoverSubscribe.Ns)
topic.mu.Lock()
if _, ok := topic.subscribers[s.Conn().RemotePeer()]; ok {
topic.mu.Unlock()
continue
}
topic.subscribers[s.Conn().RemotePeer()] = w
subscribedTopics[req.DiscoverSubscribe.Ns] = struct{}{}
lastAnnouncement := topic.lastAnnouncement
if lastAnnouncement != nil {
if err := w.WriteMsg(lastAnnouncement); err != nil {
log.Errorf("unable to write announcement: %s", err.Error())
}
}
topic.mu.Unlock()
}
}
var _ RendezvousSync = (*PubSub)(nil)
var _ RendezvousSyncSubscribable = (*PubSub)(nil)

View File

@ -1,102 +0,0 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"github.com/gogo/protobuf/proto"
"io"
)
func NewFullWriter(w io.Writer) WriteCloser {
return &fullWriter{w, nil}
}
type fullWriter struct {
w io.Writer
buffer []byte
}
func (this *fullWriter) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if !ok {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
if n >= len(this.buffer) {
this.buffer = make([]byte, n)
}
_, err = m.MarshalTo(this.buffer)
if err != nil {
return err
}
data = this.buffer[:n]
} else {
data, err = proto.Marshal(msg)
if err != nil {
return err
}
}
_, err = this.w.Write(data)
return err
}
func (this *fullWriter) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
type fullReader struct {
r io.Reader
buf []byte
}
func NewFullReader(r io.Reader, maxSize int) ReadCloser {
return &fullReader{r, make([]byte, maxSize)}
}
func (this *fullReader) ReadMsg(msg proto.Message) error {
length, err := this.r.Read(this.buf)
if err != nil {
return err
}
return proto.Unmarshal(this.buf[:length], msg)
}
func (this *fullReader) Close() error {
if closer, ok := this.r.(io.Closer); ok {
return closer.Close()
}
return nil
}

View File

@ -1,70 +0,0 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"github.com/gogo/protobuf/proto"
"io"
)
type Writer interface {
WriteMsg(proto.Message) error
}
type WriteCloser interface {
Writer
io.Closer
}
type Reader interface {
ReadMsg(msg proto.Message) error
}
type ReadCloser interface {
Reader
io.Closer
}
type marshaler interface {
MarshalTo(data []byte) (n int, err error)
}
func getSize(v interface{}) (int, bool) {
if sz, ok := v.(interface {
Size() (n int)
}); ok {
return sz.Size(), true
} else if sz, ok := v.(interface {
ProtoSize() (n int)
}); ok {
return sz.ProtoSize(), true
} else {
return 0, false
}
}

View File

@ -1,138 +0,0 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"encoding/binary"
"io"
"github.com/gogo/protobuf/proto"
)
const uint32BinaryLen = 4
func NewUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder) WriteCloser {
return &uint32Writer{w, byteOrder, nil, make([]byte, uint32BinaryLen)}
}
func NewSizeUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder, size int) WriteCloser {
return &uint32Writer{w, byteOrder, make([]byte, size), make([]byte, uint32BinaryLen)}
}
type uint32Writer struct {
w io.Writer
byteOrder binary.ByteOrder
buffer []byte
lenBuf []byte
}
func (this *uint32Writer) writeFallback(msg proto.Message) error {
data, err := proto.Marshal(msg)
if err != nil {
return err
}
length := uint32(len(data))
this.byteOrder.PutUint32(this.lenBuf, length)
if _, err = this.w.Write(this.lenBuf); err != nil {
return err
}
_, err = this.w.Write(data)
return err
}
func (this *uint32Writer) WriteMsg(msg proto.Message) error {
m, ok := msg.(marshaler)
if !ok {
return this.writeFallback(msg)
}
n, ok := getSize(m)
if !ok {
return this.writeFallback(msg)
}
size := n + uint32BinaryLen
if size > len(this.buffer) {
this.buffer = make([]byte, size)
}
this.byteOrder.PutUint32(this.buffer, uint32(n))
if _, err := m.MarshalTo(this.buffer[uint32BinaryLen:]); err != nil {
return err
}
_, err := this.w.Write(this.buffer[:size])
return err
}
func (this *uint32Writer) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
type uint32Reader struct {
r io.Reader
byteOrder binary.ByteOrder
lenBuf []byte
buf []byte
maxSize int
}
func NewUint32DelimitedReader(r io.Reader, byteOrder binary.ByteOrder, maxSize int) ReadCloser {
return &uint32Reader{r, byteOrder, make([]byte, 4), nil, maxSize}
}
func (this *uint32Reader) ReadMsg(msg proto.Message) error {
if _, err := io.ReadFull(this.r, this.lenBuf); err != nil {
return err
}
length32 := this.byteOrder.Uint32(this.lenBuf)
length := int(length32)
if length < 0 || length > this.maxSize {
return io.ErrShortBuffer
}
if length > len(this.buf) {
this.buf = make([]byte, length)
}
_, err := io.ReadFull(this.r, this.buf[:length])
if err != nil {
return err
}
return proto.Unmarshal(this.buf[:length], msg)
}
func (this *uint32Reader) Close() error {
if closer, ok := this.r.(io.Closer); ok {
return closer.Close()
}
return nil
}

View File

@ -1,133 +0,0 @@
// Protocol Buffers for Go with Gadgets
//
// Copyright (c) 2013, The GoGo Authors. All rights reserved.
// http://github.com/gogo/protobuf
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package io
import (
"bufio"
"encoding/binary"
"errors"
"github.com/gogo/protobuf/proto"
"io"
)
var (
errSmallBuffer = errors.New("Buffer Too Small")
errLargeValue = errors.New("Value is Larger than 64 bits")
)
func NewDelimitedWriter(w io.Writer) WriteCloser {
return &varintWriter{w, make([]byte, binary.MaxVarintLen64), nil}
}
type varintWriter struct {
w io.Writer
lenBuf []byte
buffer []byte
}
func (this *varintWriter) WriteMsg(msg proto.Message) (err error) {
var data []byte
if m, ok := msg.(marshaler); ok {
n, ok := getSize(m)
if ok {
if n+binary.MaxVarintLen64 >= len(this.buffer) {
this.buffer = make([]byte, n+binary.MaxVarintLen64)
}
lenOff := binary.PutUvarint(this.buffer, uint64(n))
_, err = m.MarshalTo(this.buffer[lenOff:])
if err != nil {
return err
}
_, err = this.w.Write(this.buffer[:lenOff+n])
return err
}
}
// fallback
data, err = proto.Marshal(msg)
if err != nil {
return err
}
length := uint64(len(data))
n := binary.PutUvarint(this.lenBuf, length)
_, err = this.w.Write(this.lenBuf[:n])
if err != nil {
return err
}
_, err = this.w.Write(data)
return err
}
func (this *varintWriter) Close() error {
if closer, ok := this.w.(io.Closer); ok {
return closer.Close()
}
return nil
}
func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
var closer io.Closer
if c, ok := r.(io.Closer); ok {
closer = c
}
return &varintReader{bufio.NewReader(r), nil, maxSize, closer}
}
type varintReader struct {
r *bufio.Reader
buf []byte
maxSize int
closer io.Closer
}
func (this *varintReader) ReadMsg(msg proto.Message) error {
length64, err := binary.ReadUvarint(this.r)
if err != nil {
return err
}
length := int(length64)
if length < 0 || length > this.maxSize {
return io.ErrShortBuffer
}
if len(this.buf) < length {
this.buf = make([]byte, length)
}
buf := this.buf[:length]
if _, err := io.ReadFull(this.r, buf); err != nil {
return err
}
return proto.Unmarshal(buf, msg)
}
func (this *varintReader) Close() error {
if this.closer != nil {
return this.closer.Close()
}
return nil
}

View File

@ -2,7 +2,8 @@ language: go
go:
- 1.10.x
- tip
- 1.11.x
- master
os:
- linux

View File

@ -18,5 +18,6 @@ Filesystem event notification library on steroids. (under active development)
- [github.com/rjeczalik/cmd/notify](https://godoc.org/github.com/rjeczalik/cmd/notify)
- [github.com/cortesi/devd](https://github.com/cortesi/devd)
- [github.com/cortesi/modd](https://github.com/cortesi/modd)
- [github.com/syncthing/syncthing-inotify](https://github.com/syncthing/syncthing-inotify)
- [github.com/syncthing/syncthing](https://github.com/syncthing/syncthing)
- [github.com/OrlovEvgeny/TinyJPG](https://github.com/OrlovEvgeny/TinyJPG)
- [github.com/mitranim/gow](https://github.com/mitranim/gow)

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build debug
// +build debug
package notify

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build !debug
// +build !debug
package notify

View File

@ -57,7 +57,7 @@ func (e Event) String() string {
//
// The value of Sys if system-dependent and can be nil.
//
// Sys
// # Sys
//
// Under Darwin (FSEvents) Sys() always returns a non-nil *notify.FSEvent value,
// which is defined as:

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build solaris
//go:build solaris || illumos
// +build solaris illumos
package notify

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,!kqueue
//go:build darwin && !kqueue && cgo
// +build darwin,!kqueue,cgo
package notify

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build linux
// +build linux
package notify

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,kqueue dragonfly freebsd netbsd openbsd
//go:build (darwin && kqueue) || (darwin && !cgo) || dragonfly || freebsd || netbsd || openbsd
// +build darwin,kqueue darwin,!cgo dragonfly freebsd netbsd openbsd
package notify

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build windows
// +build windows
package notify

View File

@ -2,8 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build !darwin,!linux,!freebsd,!dragonfly,!netbsd,!openbsd,!windows
// +build !kqueue,!solaris
//go:build !darwin && !linux && !freebsd && !dragonfly && !netbsd && !openbsd && !windows && !kqueue && !solaris && !illumos
// +build !darwin,!linux,!freebsd,!dragonfly,!netbsd,!openbsd,!windows,!kqueue,!solaris,!illumos
package notify

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,kqueue dragonfly freebsd netbsd openbsd solaris
//go:build (darwin && kqueue) || (darwin && !cgo) || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
// +build darwin,kqueue darwin,!cgo dragonfly freebsd netbsd openbsd solaris illumos
package notify

View File

@ -6,7 +6,7 @@ package notify
import (
"errors"
"io/ioutil"
"io/fs"
"os"
"path/filepath"
)
@ -49,7 +49,7 @@ func (nd node) addchild(name, base string) node {
}
func (nd node) Add(name string) node {
i := indexbase(nd.Name, name)
i := indexrel(nd.Name, name)
if i == -1 {
return node{}
}
@ -78,12 +78,12 @@ Traverse:
}
// TODO(rjeczalik): tolerate open failures - add failed names to
// AddDirError and notify users which names are not added to the tree.
fi, err := ioutil.ReadDir(nd.Name)
fi, err := os.ReadDir(nd.Name)
if err != nil {
return err
}
for _, fi := range fi {
if fi.Mode()&(os.ModeSymlink|os.ModeDir) == os.ModeDir {
if fi.Type()&(fs.ModeSymlink|fs.ModeDir) == fs.ModeDir {
name := filepath.Join(nd.Name, fi.Name())
stack = append(stack, nd.addchild(name, name[len(nd.Name)+1:]))
}
@ -93,7 +93,7 @@ Traverse:
}
func (nd node) Get(name string) (node, error) {
i := indexbase(nd.Name, name)
i := indexrel(nd.Name, name)
if i == -1 {
return node{}, errnotexist(name)
}
@ -111,7 +111,7 @@ func (nd node) Get(name string) (node, error) {
}
func (nd node) Del(name string) error {
i := indexbase(nd.Name, name)
i := indexrel(nd.Name, name)
if i == -1 {
return errnotexist(name)
}
@ -122,13 +122,13 @@ func (nd node) Del(name string) error {
return errnotexist(name[:i+j])
}
stack = append(stack, nd)
i += j + 1
}
if nd, ok = nd.Child[name[i:]]; !ok {
if _, ok = nd.Child[name[i:]]; !ok {
return errnotexist(name)
}
nd.Child = nil
nd.Watch = nil
for name, i = base(nd.Name), len(stack); i != 0; name, i = base(nd.Name), i-1 {
delete(nd.Child, name[i:])
for name, i = name[i:], len(stack); i != 0; name, i = base(nd.Name), i-1 {
nd = stack[i-1]
if nd := nd.Child[name]; len(nd.Watch) > 1 || len(nd.Child) != 0 {
break
@ -167,7 +167,7 @@ Traverse:
}
func (nd node) WalkPath(name string, fn walkPathFunc) error {
i := indexbase(nd.Name, name)
i := indexrel(nd.Name, name)
if i == -1 {
return errnotexist(name)
}

View File

@ -40,9 +40,9 @@ var defaultTree = newTree()
// watchpoint expands its event set. The only way to shrink it, is to call
// Stop on its channel.
//
// Calling Watch with empty event list does expand nor shrink watchpoint's event
// set. If c is the first channel to listen for events on the given path, Watch
// will seamlessly create a watch on the filesystem.
// Calling Watch with empty event list does not expand nor shrink watchpoint's
// event set. If c is the first channel to listen for events on the given path,
// Watch will seamlessly create a watch on the filesystem.
//
// Notify dispatches copies of single filesystem event to all channels registered
// for each path. If a single filesystem event contains multiple coalesced events,
@ -53,7 +53,7 @@ var defaultTree = newTree()
// dispatches two events - notify.Create and notify.Write. However, it may depend
// on the underlying watcher implementation whether OS reports both of them.
//
// Windows and recursive watches
// # Windows and recursive watches
//
// If a directory which path was used to create recursive watch under Windows
// gets deleted, the OS will not report such event. It is advised to keep in

View File

@ -65,7 +65,7 @@ func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
}
t.rw.RUnlock()
// If the event describes newly leaf directory created within
if !isrec || ei.Event() != Create {
if !isrec || ei.Event()&(Create|Remove) == 0 {
return
}
if ok, err := ei.(isDirer).isDir(); !ok || err != nil {
@ -79,9 +79,23 @@ func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
// internal TODO(rjeczalik)
func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
for ei := range rec {
t.rw.Lock()
if ei.Event() == Remove {
nd, err := t.root.Get(ei.Path())
if err != nil {
t.rw.Unlock()
continue
}
t.walkWatchpoint(nd, func(_ Event, nd node) error {
t.w.Unwatch(nd.Name)
return nil
})
t.root.Del(ei.Path())
t.rw.Unlock()
continue
}
var nd node
var eset = internal
t.rw.Lock()
t.root.WalkPath(ei.Path(), func(it node, _ bool) error {
if e := it.Watch[t.rec]; e != 0 && e > eset {
eset = e
@ -93,7 +107,10 @@ func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
t.rw.Unlock()
continue
}
err := nd.Add(ei.Path()).AddDir(t.recFunc(eset))
if ei.Path() != nd.Name {
nd = nd.Add(ei.Path())
}
err := nd.AddDir(t.recFunc(eset))
t.rw.Unlock()
if err != nil {
dbgprintf("internal(%p) error: %v", rec, err)

View File

@ -123,10 +123,12 @@ func base(s string) string {
return s
}
func indexbase(root, name string) int {
if n, m := len(root), len(name); m >= n && name[:n] == root &&
(n == m || name[n] == os.PathSeparator) {
return min(n+1, m)
// indexrel returns the index of the first char of name that is
// below/relative to root. It returns -1 if name is not a child of root.
func indexrel(root, name string) int {
if n, m := len(root), len(name); m > n && name[:n] == root &&
name[n] == os.PathSeparator {
return n + 1
}
return -1
}

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build solaris
//go:build solaris || illumos
// +build solaris illumos
package notify

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build solaris
//go:build solaris || illumos
// +build solaris illumos
package notify

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,!kqueue
//go:build darwin && !kqueue && cgo
// +build darwin,!kqueue,cgo
package notify
@ -83,7 +84,6 @@ type watch struct {
//
// ~ $ echo > file # Write|InodeMetaMod -> Write|InodeMetaMod
// ~ $ rm file # Remove|Write|InodeMetaMod -> Remove
//
func (w *watch) strip(base string, set uint32) uint32 {
const (
write = FSEventsModified | FSEventsInodeMetaMod
@ -129,7 +129,7 @@ func (w *watch) Dispatch(ev []FSEvent) {
}
dbgprintf("%v (0x%x) (%s, i=%d, ID=%d, len=%d)\n", Event(ev[i].Flags),
ev[i].Flags, ev[i].Path, i, ev[i].ID, len(ev))
if ev[i].Flags&failure != 0 {
if ev[i].Flags&failure != 0 && failure&events == 0 {
// TODO(rjeczalik): missing error handling
continue
}
@ -258,13 +258,13 @@ func (fse *fsevents) RecursiveUnwatch(path string) error {
return fse.unwatch(path)
}
// RecrusiveRewatch implements RecursiveWatcher interface. It fails:
// RecursiveRewatch implements RecursiveWatcher interface. It fails:
//
// * with errNotWatched when the given path is not being watched
// * with errInvalidEventSet when oldevent does not match the current event set
// * with errAlreadyWatched when watch-point given by the oldpath was meant to
// - with errNotWatched when the given path is not being watched
// - with errInvalidEventSet when oldevent does not match the current event set
// - with errAlreadyWatched when watch-point given by the oldpath was meant to
// be relocated to newpath, but the newpath is already watched
// * a non-nil error when setting the watch-point with FSEvents fails
// - a non-nil error when setting the watch-point with FSEvents fails
//
// TODO(rjeczalik): Improve handling of watch-point relocation? See two TODOs
// that follows.

View File

@ -2,16 +2,15 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,!kqueue
//go:build darwin && !kqueue && cgo
// +build darwin,!kqueue,cgo
package notify
/*
#include <CoreServices/CoreServices.h>
#include <dispatch/dispatch.h>
typedef void (*CFRunLoopPerformCallBack)(void*);
void gosource(void *);
void gostream(uintptr_t, uintptr_t, size_t, uintptr_t, uintptr_t, uintptr_t);
static FSEventStreamRef EventStreamCreate(FSEventStreamContext * context, uintptr_t info, CFArrayRef paths, FSEventStreamEventId since, CFTimeInterval latency, FSEventStreamCreateFlags flags) {
@ -26,7 +25,6 @@ import "C"
import (
"errors"
"os"
"runtime"
"sync"
"sync/atomic"
"unsafe"
@ -41,16 +39,11 @@ var (
since = uint64(C.FSEventsGetCurrentEventId())
)
var runloop C.CFRunLoopRef // global runloop which all streams are registered with
var wg sync.WaitGroup // used to wait until the runloop starts
// source is used for synchronization purposes - it signals when runloop has
// started and is ready via the wg. It also serves purpose of a dummy source,
// thanks to it the runloop does not return as it also has at least one source
// registered.
var source = C.CFRunLoopSourceCreate(C.kCFAllocatorDefault, 0, &C.CFRunLoopSourceContext{
perform: (C.CFRunLoopPerformCallBack)(C.gosource),
})
// global dispatch queue which all streams are registered with
var q C.dispatch_queue_t = C.dispatch_queue_create(
C.CString("com.github.rjeczalik.notify"),
(C.dispatch_queue_attr_t)(C.DISPATCH_QUEUE_SERIAL),
)
// Errors returned when FSEvents functions fail.
var (
@ -58,28 +51,6 @@ var (
errStart = os.NewSyscallError("FSEventStreamStart", errors.New("false"))
)
// initializes the global runloop and ensures any created stream awaits its
// readiness.
func init() {
wg.Add(1)
go func() {
// There is exactly one run loop per thread. Lock this goroutine to its
// thread to ensure that it's not rescheduled on a different thread while
// setting up the run loop.
runtime.LockOSThread()
runloop = C.CFRunLoopGetCurrent()
C.CFRunLoopAddSource(runloop, source, C.kCFRunLoopDefaultMode)
C.CFRunLoopRun()
panic("runloop has just unexpectedly stopped")
}()
C.CFRunLoopSourceSignal(source)
}
//export gosource
func gosource(unsafe.Pointer) {
wg.Done()
}
//export gostream
func gostream(_, info uintptr, n C.size_t, paths, flags, ids uintptr) {
const (
@ -142,8 +113,7 @@ func (r *streamFuncRegistry) delete(id uintptr) {
delete(r.m, id)
}
// Stream represents single watch-point which listens for events scheduled by
// the global runloop.
// Stream represents a single watch-point which listens for events scheduled on the global dispatch queue.
type stream struct {
path string
ref C.FSEventStreamRef
@ -159,13 +129,12 @@ func newStream(path string, fn streamFunc) *stream {
}
}
// Start creates a FSEventStream for the given path and schedules it with
// global runloop. It's a nop if the stream was already started.
// Start creates a FSEventStream for the given path and schedules on the global dispatch queue.
// It's a nop if the stream was already started.
func (s *stream) Start() error {
if s.ref != nilstream {
return nil
}
wg.Wait()
p := C.CFStringCreateWithCStringNoCopy(C.kCFAllocatorDefault, C.CString(s.path), C.kCFStringEncodingUTF8, C.kCFAllocatorDefault)
path := C.CFArrayCreate(C.kCFAllocatorDefault, (*unsafe.Pointer)(unsafe.Pointer(&p)), 1, nil)
ctx := C.FSEventStreamContext{}
@ -173,25 +142,22 @@ func (s *stream) Start() error {
if ref == nilstream {
return errCreate
}
C.FSEventStreamScheduleWithRunLoop(ref, runloop, C.kCFRunLoopDefaultMode)
C.FSEventStreamSetDispatchQueue(ref, q)
if C.FSEventStreamStart(ref) == C.Boolean(0) {
C.FSEventStreamInvalidate(ref)
return errStart
}
C.CFRunLoopWakeUp(runloop)
s.ref = ref
return nil
}
// Stop stops underlying FSEventStream and unregisters it from global runloop.
// Stop stops underlying FSEventStream and unregisters it from the global dispatch queue.
func (s *stream) Stop() {
if s.ref == nilstream {
return
}
wg.Wait()
C.FSEventStreamStop(s.ref)
C.FSEventStreamInvalidate(s.ref)
C.CFRunLoopWakeUp(runloop)
s.ref = nilstream
streamFuncs.delete(s.info)
}

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build linux
// +build linux
package notify
@ -93,20 +94,14 @@ func (i *inotify) watch(path string, e Event) (err error) {
if err != nil {
return
}
i.RLock()
wd := i.m[int32(iwd)]
i.RUnlock()
if wd == nil {
i.Lock()
if i.m[int32(iwd)] == nil {
if wd, ok := i.m[int32(iwd)]; !ok {
i.m[int32(iwd)] = &watched{path: path, mask: uint32(e)}
}
i.Unlock()
} else {
i.Lock()
wd.path = path
wd.mask = uint32(e)
i.Unlock()
}
i.Unlock()
return nil
}

View File

@ -2,11 +2,13 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,kqueue dragonfly freebsd netbsd openbsd
//go:build (darwin && kqueue) || (darwin && !cgo) || dragonfly || freebsd || netbsd || openbsd
// +build darwin,kqueue darwin,!cgo dragonfly freebsd netbsd openbsd
package notify
import (
"errors"
"fmt"
"os"
"syscall"
@ -57,6 +59,17 @@ func (k *kq) Close() error {
func (*kq) NewWatched(p string, fi os.FileInfo) (*watched, error) {
fd, err := syscall.Open(p, syscall.O_NONBLOCK|syscall.O_RDONLY, 0)
if err != nil {
// BSDs can't open symlinks and return an error if the symlink
// cannot be followed - ignore it instead of failing. See e.g.
// https://github.com/libinotify-kqueue/libinotify-kqueue/blob/a822c8f1d75404fe3132f695a898dcd42fe8afbc/patches/freebsd11-O_SYMLINK.patch
if os.IsNotExist(err) && fi.Mode()&os.ModeSymlink == os.ModeSymlink {
return nil, errSkip
}
// FreeBSD can't open unix domain sockets and returns "operation not supported" error.
// Ignore it instead of failing.
if errors.Is(err, syscall.ENOTSUP) && fi.Mode()&os.ModeSocket == os.ModeSocket {
return nil, errSkip
}
return nil, err
}
return &watched{

View File

@ -2,8 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build !darwin,!linux,!freebsd,!dragonfly,!netbsd,!openbsd,!windows
// +build !kqueue,!solaris
//go:build !darwin && !linux && !freebsd && !dragonfly && !netbsd && !openbsd && !windows && !kqueue && !solaris && !illumos
// +build !darwin,!linux,!freebsd,!dragonfly,!netbsd,!openbsd,!windows,!kqueue,!solaris,!illumos
package notify

View File

@ -1,7 +1,8 @@
// Copyright (c) 2014-2018 The Notify Authors. All rights reserved.
// Copyright (c) 2014-2020 The Notify Authors. All rights reserved.
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build windows
// +build windows
package notify
@ -36,6 +37,7 @@ const (
// - bits 12-19 store File notify actions,
// - bits 20-27 store notify specific events and flags,
// - bits 28-31 store states which are used in loop's FSM.
//
// Constants below are used as masks to retrieve only specific filter parts.
const (
onlyNotifyChanges uint32 = 0x00000FFF
@ -358,12 +360,15 @@ func (r *readdcw) loop() {
continue
}
overEx := (*overlappedEx)(unsafe.Pointer(overlapped))
if n != 0 {
if overEx == nil || overEx.parent == nil {
dbgprintf("incomplete completion status transferred=%d, overlapped=%#v, key=%#b", n, overEx, key)
continue
} else if n != 0 {
r.loopevent(n, overEx)
}
if err = overEx.parent.readDirChanges(); err != nil {
// TODO: error handling
}
}
r.loopstate(overEx)
}
}
@ -584,7 +589,9 @@ func decode(filter, action uint32) (Event, Event) {
case syscall.FILE_ACTION_RENAMED_NEW_NAME:
return gensys(filter, Rename, FileActionRenamedNewName)
}
panic(`notify: cannot decode internal mask`)
dbgprintf("cannot decode internal mask: %d", action)
return 0, 0
}
// gensys decides whether the Windows action, system-independent event or both

View File

@ -2,7 +2,8 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
// +build darwin,kqueue dragonfly freebsd netbsd openbsd solaris
//go:build (darwin && kqueue) || (darwin && !cgo) || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
// +build darwin,kqueue darwin,!cgo dragonfly freebsd netbsd openbsd solaris illumos
// watcher_trigger is used for FEN and kqueue which behave similarly:
// only files and dirs can be watched directly, but not files inside dirs.
@ -151,6 +152,9 @@ func (t *trg) singlewatch(p string, e Event, direct mode, fi os.FileInfo) (err e
w, ok := t.pthLkp[p]
if !ok {
if w, err = t.t.NewWatched(p, fi); err != nil {
if err == errSkip {
err = nil
}
return
}
}

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build !windows
// +build !windows
package notify

View File

@ -2,6 +2,7 @@
// Use of this source code is governed by the MIT license that can be
// found in the LICENSE file.
//go:build windows
// +build windows
package notify

View File

@ -6,12 +6,12 @@ import (
"math/rand"
"time"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
pb "github.com/berty/go-libp2p-rendezvous/pb"
pb "github.com/waku-org/go-libp2p-rendezvous/pb"
)
var (
@ -23,7 +23,6 @@ type RendezvousPoint interface {
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]Registration, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan Registration, error)
DiscoverSubscribe(ctx context.Context, ns string, serviceTypes []RendezvousSyncClient) (<-chan peer.AddrInfo, error)
}
type Registration struct {
@ -37,7 +36,6 @@ type RendezvousClient interface {
Unregister(ctx context.Context, ns string) error
Discover(ctx context.Context, ns string, limit int, cookie []byte) ([]peer.AddrInfo, []byte, error)
DiscoverAsync(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error)
}
func NewRendezvousPoint(host host.Host, p peer.ID, opts ...RendezvousPointOption) RendezvousPoint {
@ -56,17 +54,16 @@ type rendezvousPoint struct {
p peer.ID
}
func NewRendezvousClient(host host.Host, rp peer.ID, sync ...RendezvousSyncClient) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp), sync...)
func NewRendezvousClient(host host.Host, rp peer.ID) RendezvousClient {
return NewRendezvousClientWithPoint(NewRendezvousPoint(host, rp))
}
func NewRendezvousClientWithPoint(rp RendezvousPoint, syncClientList ...RendezvousSyncClient) RendezvousClient {
return &rendezvousClient{rp: rp, syncClients: syncClientList}
func NewRendezvousClientWithPoint(rp RendezvousPoint) RendezvousClient {
return &rendezvousClient{rp: rp}
}
type rendezvousClient struct {
rp RendezvousPoint
syncClients []RendezvousSyncClient
}
func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
@ -76,8 +73,8 @@ func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (ti
}
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
r := pbio.NewDelimitedReader(s, inet.MessageSizeMax)
w := pbio.NewDelimitedWriter(s)
addrs := rp.addrFactory(rp.host.Addrs())
if len(addrs) == 0 {
@ -85,7 +82,13 @@ func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (ti
}
log.Debugf("advertising on `%s` with: %v", ns, addrs)
req := newRegisterMessage(ns, peer.AddrInfo{ID: rp.host.ID(), Addrs: addrs}, ttl)
privKey := rp.host.Peerstore().PrivKey(rp.host.ID())
req, err := newRegisterMessage(privKey, ns, peer.AddrInfo{ID: rp.host.ID(), Addrs: addrs}, ttl)
if err != nil {
return 0, err
}
err = w.WriteMsg(req)
if err != nil {
return 0, err
@ -107,7 +110,12 @@ func (rp *rendezvousPoint) Register(ctx context.Context, ns string, ttl int) (ti
return 0, RendezvousError{Status: status, Text: res.GetRegisterResponse().GetStatusText()}
}
return time.Duration(response.Ttl) * time.Second, nil
responseTTL := int64(0)
if response.Ttl != nil {
responseTTL = int64(*response.Ttl)
}
return time.Duration(responseTTL) * time.Second, nil
}
func (rc *rendezvousClient) Register(ctx context.Context, ns string, ttl int) (time.Duration, error) {
@ -163,7 +171,7 @@ func (rp *rendezvousPoint) Unregister(ctx context.Context, ns string) error {
}
defer s.Close()
w := ggio.NewDelimitedWriter(s)
w := pbio.NewDelimitedWriter(s)
req := newUnregisterMessage(ns, rp.host.ID())
return w.WriteMsg(req)
}
@ -179,13 +187,13 @@ func (rp *rendezvousPoint) Discover(ctx context.Context, ns string, limit int, c
}
defer s.Reset()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
r := pbio.NewDelimitedReader(s, inet.MessageSizeMax)
w := pbio.NewDelimitedWriter(s)
return discoverQuery(ns, limit, cookie, r, w)
}
func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Writer) ([]Registration, []byte, error) {
func discoverQuery(ns string, limit int, cookie []byte, r pbio.Reader, w pbio.Writer) ([]Registration, []byte, error) {
req := newDiscoverMessage(ns, limit, cookie)
err := w.WriteMsg(req)
if err != nil {
@ -199,7 +207,7 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr
}
if res.GetType() != pb.Message_DISCOVER_RESPONSE {
return nil, nil, fmt.Errorf("Unexpected response: %s", res.GetType().String())
return nil, nil, fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverResponse().GetStatus()
@ -210,7 +218,7 @@ func discoverQuery(ns string, limit int, cookie []byte, r ggio.Reader, w ggio.Wr
regs := res.GetDiscoverResponse().GetRegistrations()
result := make([]Registration, 0, len(regs))
for _, reg := range regs {
pi, err := pbToPeerInfo(reg.GetPeer())
pi, err := pbToPeerRecord(reg.SignedPeerRecord)
if err != nil {
log.Errorf("Invalid peer info: %s", err.Error())
continue
@ -236,8 +244,8 @@ func discoverAsync(ctx context.Context, ns string, s inet.Stream, ch chan Regist
defer s.Reset()
defer close(ch)
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
r := pbio.NewDelimitedReader(s, inet.MessageSizeMax)
w := pbio.NewDelimitedWriter(s)
const batch = 200
@ -320,99 +328,3 @@ func discoverPeersAsync(ctx context.Context, rch <-chan Registration, ch chan pe
}
}
}
func (rc *rendezvousClient) DiscoverSubscribe(ctx context.Context, ns string) (<-chan peer.AddrInfo, error) {
return rc.rp.DiscoverSubscribe(ctx, ns, rc.syncClients)
}
func subscribeServiceTypes(serviceTypeClients []RendezvousSyncClient) []string {
serviceTypes := []string(nil)
for _, serviceType := range serviceTypeClients {
serviceTypes = append(serviceTypes, serviceType.GetServiceType())
}
return serviceTypes
}
func (rp *rendezvousPoint) DiscoverSubscribe(ctx context.Context, ns string, serviceTypeClients []RendezvousSyncClient) (<-chan peer.AddrInfo, error) {
serviceTypes := subscribeServiceTypes(serviceTypeClients)
s, err := rp.host.NewStream(ctx, rp.p, RendezvousProto)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
subType, subDetails, err := discoverSubscribeQuery(ns, serviceTypes, r, w)
if err != nil {
return nil, fmt.Errorf("discover subscribe error: %w", err)
}
subClient := RendezvousSyncClient(nil)
for _, subClient = range serviceTypeClients {
if subClient.GetServiceType() == subType {
break
}
}
if subClient == nil {
return nil, fmt.Errorf("unrecognized client type")
}
regCh, err := subClient.Subscribe(ctx, subDetails)
if err != nil {
return nil, fmt.Errorf("unable to subscribe to updates: %w", err)
}
ch := make(chan peer.AddrInfo)
go func() {
defer close(ch)
for {
select {
case result, ok := <-regCh:
if !ok {
return
}
ch <- result.Peer
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
func discoverSubscribeQuery(ns string, serviceTypes []string, r ggio.Reader, w ggio.Writer) (subType string, subDetails string, err error) {
req := &pb.Message{
Type: pb.Message_DISCOVER_SUBSCRIBE,
DiscoverSubscribe: newDiscoverSubscribeMessage(ns, serviceTypes),
}
err = w.WriteMsg(req)
if err != nil {
return "", "", fmt.Errorf("write err: %w", err)
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
return "", "", fmt.Errorf("read err: %w", err)
}
if res.GetType() != pb.Message_DISCOVER_SUBSCRIBE_RESPONSE {
return "", "", fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDiscoverSubscribeResponse().GetStatus()
if status != pb.Message_OK {
return "", "", RendezvousError{Status: status, Text: res.GetDiscoverSubscribeResponse().GetStatusText()}
}
subType = res.GetDiscoverSubscribeResponse().GetSubscriptionType()
subDetails = res.GetDiscoverSubscribeResponse().GetSubscriptionDetails()
return subType, subDetails, nil
}

View File

@ -6,14 +6,14 @@ import (
type RegistrationRecord struct {
Id peer.ID
Addrs [][]byte
SignedPeerRecord []byte
Ns string
Ttl int
}
type DB interface {
Close() error
Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error)
Register(p peer.ID, ns string, signedPeerRecord []byte, ttl int) (uint64, error)
Unregister(p peer.ID, ns string) error
CountRegistrations(p peer.ID) (int, error)
Discover(ns string, cookie []byte, limit int) ([]RegistrationRecord, []byte, error)

View File

@ -21,12 +21,12 @@ type rendezvousDiscovery struct {
}
type discoveryCache struct {
recs map[peer.ID]*record
recs map[peer.ID]*peerRecord
cookie []byte
mux sync.Mutex
}
type record struct {
type peerRecord struct {
peer peer.AddrInfo
expire int64
}
@ -84,7 +84,7 @@ func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...
c.peerCacheMux.Lock()
cache, ok = c.peerCache[ns]
if !ok {
cache = &discoveryCache{recs: make(map[peer.ID]*record)}
cache = &discoveryCache{recs: make(map[peer.ID]*peerRecord)}
c.peerCache[ns] = cache
}
c.peerCacheMux.Unlock()
@ -114,7 +114,7 @@ func (c *rendezvousDiscovery) FindPeers(ctx context.Context, ns string, opts ...
var newCookie []byte
if regs, newCookie, err = c.rp.Discover(ctx, ns, limit, cookie); err == nil {
for _, reg := range regs {
rec := &record{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime}
rec := &peerRecord{peer: reg.Peer, expire: int64(reg.Ttl) + currentTime}
cache.recs[rec.peer.ID] = rec
}
cache.cookie = newCookie

View File

@ -0,0 +1,3 @@
package rendezvous_pb
//go:generate protoc -I. --proto_path=./ --go_opt=paths=source_relative --go_opt=Mrendezvous.proto=github.com/waku-org/go-libp2p-rendezvous/rendezvous_pb --go_out=. ./rendezvous.proto

View File

@ -0,0 +1,782 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.12
// source: rendezvous.proto
package rendezvous_pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Message_MessageType int32
const (
Message_REGISTER Message_MessageType = 0
Message_REGISTER_RESPONSE Message_MessageType = 1
Message_UNREGISTER Message_MessageType = 2
Message_DISCOVER Message_MessageType = 3
Message_DISCOVER_RESPONSE Message_MessageType = 4
)
// Enum value maps for Message_MessageType.
var (
Message_MessageType_name = map[int32]string{
0: "REGISTER",
1: "REGISTER_RESPONSE",
2: "UNREGISTER",
3: "DISCOVER",
4: "DISCOVER_RESPONSE",
}
Message_MessageType_value = map[string]int32{
"REGISTER": 0,
"REGISTER_RESPONSE": 1,
"UNREGISTER": 2,
"DISCOVER": 3,
"DISCOVER_RESPONSE": 4,
}
)
func (x Message_MessageType) Enum() *Message_MessageType {
p := new(Message_MessageType)
*p = x
return p
}
func (x Message_MessageType) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Message_MessageType) Descriptor() protoreflect.EnumDescriptor {
return file_rendezvous_proto_enumTypes[0].Descriptor()
}
func (Message_MessageType) Type() protoreflect.EnumType {
return &file_rendezvous_proto_enumTypes[0]
}
func (x Message_MessageType) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *Message_MessageType) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = Message_MessageType(num)
return nil
}
// Deprecated: Use Message_MessageType.Descriptor instead.
func (Message_MessageType) EnumDescriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 0}
}
type Message_ResponseStatus int32
const (
Message_OK Message_ResponseStatus = 0
Message_E_INVALID_NAMESPACE Message_ResponseStatus = 100
Message_E_INVALID_SIGNED_PEER_RECORD Message_ResponseStatus = 101
Message_E_INVALID_TTL Message_ResponseStatus = 102
Message_E_INVALID_COOKIE Message_ResponseStatus = 103
Message_E_NOT_AUTHORIZED Message_ResponseStatus = 200
Message_E_INTERNAL_ERROR Message_ResponseStatus = 300
Message_E_UNAVAILABLE Message_ResponseStatus = 400
)
// Enum value maps for Message_ResponseStatus.
var (
Message_ResponseStatus_name = map[int32]string{
0: "OK",
100: "E_INVALID_NAMESPACE",
101: "E_INVALID_SIGNED_PEER_RECORD",
102: "E_INVALID_TTL",
103: "E_INVALID_COOKIE",
200: "E_NOT_AUTHORIZED",
300: "E_INTERNAL_ERROR",
400: "E_UNAVAILABLE",
}
Message_ResponseStatus_value = map[string]int32{
"OK": 0,
"E_INVALID_NAMESPACE": 100,
"E_INVALID_SIGNED_PEER_RECORD": 101,
"E_INVALID_TTL": 102,
"E_INVALID_COOKIE": 103,
"E_NOT_AUTHORIZED": 200,
"E_INTERNAL_ERROR": 300,
"E_UNAVAILABLE": 400,
}
)
func (x Message_ResponseStatus) Enum() *Message_ResponseStatus {
p := new(Message_ResponseStatus)
*p = x
return p
}
func (x Message_ResponseStatus) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Message_ResponseStatus) Descriptor() protoreflect.EnumDescriptor {
return file_rendezvous_proto_enumTypes[1].Descriptor()
}
func (Message_ResponseStatus) Type() protoreflect.EnumType {
return &file_rendezvous_proto_enumTypes[1]
}
func (x Message_ResponseStatus) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Do not use.
func (x *Message_ResponseStatus) UnmarshalJSON(b []byte) error {
num, err := protoimpl.X.UnmarshalJSONEnum(x.Descriptor(), b)
if err != nil {
return err
}
*x = Message_ResponseStatus(num)
return nil
}
// Deprecated: Use Message_ResponseStatus.Descriptor instead.
func (Message_ResponseStatus) EnumDescriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 1}
}
type Message struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=rendezvous.pb.Message_MessageType" json:"type,omitempty"`
Register *Message_Register `protobuf:"bytes,2,opt,name=register" json:"register,omitempty"`
RegisterResponse *Message_RegisterResponse `protobuf:"bytes,3,opt,name=registerResponse" json:"registerResponse,omitempty"`
Unregister *Message_Unregister `protobuf:"bytes,4,opt,name=unregister" json:"unregister,omitempty"`
Discover *Message_Discover `protobuf:"bytes,5,opt,name=discover" json:"discover,omitempty"`
DiscoverResponse *Message_DiscoverResponse `protobuf:"bytes,6,opt,name=discoverResponse" json:"discoverResponse,omitempty"`
}
func (x *Message) Reset() {
*x = Message{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message) ProtoMessage() {}
func (x *Message) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0}
}
func (x *Message) GetType() Message_MessageType {
if x != nil && x.Type != nil {
return *x.Type
}
return Message_REGISTER
}
func (x *Message) GetRegister() *Message_Register {
if x != nil {
return x.Register
}
return nil
}
func (x *Message) GetRegisterResponse() *Message_RegisterResponse {
if x != nil {
return x.RegisterResponse
}
return nil
}
func (x *Message) GetUnregister() *Message_Unregister {
if x != nil {
return x.Unregister
}
return nil
}
func (x *Message) GetDiscover() *Message_Discover {
if x != nil {
return x.Discover
}
return nil
}
func (x *Message) GetDiscoverResponse() *Message_DiscoverResponse {
if x != nil {
return x.DiscoverResponse
}
return nil
}
type Message_Register struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Ns *string `protobuf:"bytes,1,opt,name=ns" json:"ns,omitempty"`
SignedPeerRecord []byte `protobuf:"bytes,2,opt,name=signedPeerRecord" json:"signedPeerRecord,omitempty"`
Ttl *uint64 `protobuf:"varint,3,opt,name=ttl" json:"ttl,omitempty"` // in seconds
}
func (x *Message_Register) Reset() {
*x = Message_Register{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message_Register) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message_Register) ProtoMessage() {}
func (x *Message_Register) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message_Register.ProtoReflect.Descriptor instead.
func (*Message_Register) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 0}
}
func (x *Message_Register) GetNs() string {
if x != nil && x.Ns != nil {
return *x.Ns
}
return ""
}
func (x *Message_Register) GetSignedPeerRecord() []byte {
if x != nil {
return x.SignedPeerRecord
}
return nil
}
func (x *Message_Register) GetTtl() uint64 {
if x != nil && x.Ttl != nil {
return *x.Ttl
}
return 0
}
type Message_RegisterResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Status *Message_ResponseStatus `protobuf:"varint,1,opt,name=status,enum=rendezvous.pb.Message_ResponseStatus" json:"status,omitempty"`
StatusText *string `protobuf:"bytes,2,opt,name=statusText" json:"statusText,omitempty"`
Ttl *uint64 `protobuf:"varint,3,opt,name=ttl" json:"ttl,omitempty"` // in seconds
}
func (x *Message_RegisterResponse) Reset() {
*x = Message_RegisterResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message_RegisterResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message_RegisterResponse) ProtoMessage() {}
func (x *Message_RegisterResponse) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message_RegisterResponse.ProtoReflect.Descriptor instead.
func (*Message_RegisterResponse) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 1}
}
func (x *Message_RegisterResponse) GetStatus() Message_ResponseStatus {
if x != nil && x.Status != nil {
return *x.Status
}
return Message_OK
}
func (x *Message_RegisterResponse) GetStatusText() string {
if x != nil && x.StatusText != nil {
return *x.StatusText
}
return ""
}
func (x *Message_RegisterResponse) GetTtl() uint64 {
if x != nil && x.Ttl != nil {
return *x.Ttl
}
return 0
}
type Message_Unregister struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Ns *string `protobuf:"bytes,1,opt,name=ns" json:"ns,omitempty"` // optional bytes id = 2; deprecated as per https://github.com/libp2p/specs/issues/335
}
func (x *Message_Unregister) Reset() {
*x = Message_Unregister{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message_Unregister) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message_Unregister) ProtoMessage() {}
func (x *Message_Unregister) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message_Unregister.ProtoReflect.Descriptor instead.
func (*Message_Unregister) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 2}
}
func (x *Message_Unregister) GetNs() string {
if x != nil && x.Ns != nil {
return *x.Ns
}
return ""
}
type Message_Discover struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Ns *string `protobuf:"bytes,1,opt,name=ns" json:"ns,omitempty"`
Limit *uint64 `protobuf:"varint,2,opt,name=limit" json:"limit,omitempty"`
Cookie []byte `protobuf:"bytes,3,opt,name=cookie" json:"cookie,omitempty"`
}
func (x *Message_Discover) Reset() {
*x = Message_Discover{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message_Discover) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message_Discover) ProtoMessage() {}
func (x *Message_Discover) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message_Discover.ProtoReflect.Descriptor instead.
func (*Message_Discover) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 3}
}
func (x *Message_Discover) GetNs() string {
if x != nil && x.Ns != nil {
return *x.Ns
}
return ""
}
func (x *Message_Discover) GetLimit() uint64 {
if x != nil && x.Limit != nil {
return *x.Limit
}
return 0
}
func (x *Message_Discover) GetCookie() []byte {
if x != nil {
return x.Cookie
}
return nil
}
type Message_DiscoverResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Registrations []*Message_Register `protobuf:"bytes,1,rep,name=registrations" json:"registrations,omitempty"`
Cookie []byte `protobuf:"bytes,2,opt,name=cookie" json:"cookie,omitempty"`
Status *Message_ResponseStatus `protobuf:"varint,3,opt,name=status,enum=rendezvous.pb.Message_ResponseStatus" json:"status,omitempty"`
StatusText *string `protobuf:"bytes,4,opt,name=statusText" json:"statusText,omitempty"`
}
func (x *Message_DiscoverResponse) Reset() {
*x = Message_DiscoverResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_rendezvous_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Message_DiscoverResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Message_DiscoverResponse) ProtoMessage() {}
func (x *Message_DiscoverResponse) ProtoReflect() protoreflect.Message {
mi := &file_rendezvous_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Message_DiscoverResponse.ProtoReflect.Descriptor instead.
func (*Message_DiscoverResponse) Descriptor() ([]byte, []int) {
return file_rendezvous_proto_rawDescGZIP(), []int{0, 4}
}
func (x *Message_DiscoverResponse) GetRegistrations() []*Message_Register {
if x != nil {
return x.Registrations
}
return nil
}
func (x *Message_DiscoverResponse) GetCookie() []byte {
if x != nil {
return x.Cookie
}
return nil
}
func (x *Message_DiscoverResponse) GetStatus() Message_ResponseStatus {
if x != nil && x.Status != nil {
return *x.Status
}
return Message_OK
}
func (x *Message_DiscoverResponse) GetStatusText() string {
if x != nil && x.StatusText != nil {
return *x.StatusText
}
return ""
}
var File_rendezvous_proto protoreflect.FileDescriptor
var file_rendezvous_proto_rawDesc = []byte{
0x0a, 0x10, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x0d, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70,
0x62, 0x22, 0xed, 0x09, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x36, 0x0a,
0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x22, 0x2e, 0x72, 0x65,
0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52,
0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x3b, 0x0a, 0x08, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x7a,
0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e,
0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x08, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74,
0x65, 0x72, 0x12, 0x53, 0x0a, 0x10, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x72,
0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x10, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x0a, 0x75, 0x6e, 0x72, 0x65, 0x67,
0x69, 0x73, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x72, 0x65,
0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x2e, 0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x0a,
0x75, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x3b, 0x0a, 0x08, 0x64, 0x69,
0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x72,
0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x52, 0x08, 0x64,
0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x12, 0x53, 0x0a, 0x10, 0x64, 0x69, 0x73, 0x63, 0x6f,
0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x27, 0x2e, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70,
0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76,
0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x10, 0x64, 0x69, 0x73, 0x63,
0x6f, 0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x58, 0x0a, 0x08,
0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x6e, 0x73, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x6e, 0x73, 0x12, 0x2a, 0x0a, 0x10, 0x73, 0x69, 0x67, 0x6e,
0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0c, 0x52, 0x10, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65,
0x63, 0x6f, 0x72, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28,
0x04, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x1a, 0x83, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73,
0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3d, 0x0a, 0x06, 0x73,
0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x72, 0x65,
0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74,
0x61, 0x74, 0x75, 0x73, 0x54, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a,
0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x54, 0x65, 0x78, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x74,
0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x74, 0x74, 0x6c, 0x1a, 0x1c, 0x0a, 0x0a,
0x55, 0x6e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x6e, 0x73,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x6e, 0x73, 0x1a, 0x48, 0x0a, 0x08, 0x44, 0x69,
0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x02, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x02, 0x6e, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x16, 0x0a, 0x06,
0x63, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x63, 0x6f,
0x6f, 0x6b, 0x69, 0x65, 0x1a, 0xd0, 0x01, 0x0a, 0x10, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65,
0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0d, 0x72, 0x65, 0x67,
0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x1f, 0x2e, 0x72, 0x65, 0x6e, 0x64, 0x65, 0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62,
0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65,
0x72, 0x52, 0x0d, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x06, 0x63, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x3d, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74,
0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x25, 0x2e, 0x72, 0x65, 0x6e, 0x64, 0x65,
0x7a, 0x76, 0x6f, 0x75, 0x73, 0x2e, 0x70, 0x62, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x75,
0x73, 0x54, 0x65, 0x78, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x61,
0x74, 0x75, 0x73, 0x54, 0x65, 0x78, 0x74, 0x22, 0x67, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54,
0x45, 0x52, 0x10, 0x00, 0x12, 0x15, 0x0a, 0x11, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52,
0x5f, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x55,
0x4e, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x10, 0x02, 0x12, 0x0c, 0x0a, 0x08, 0x44,
0x49, 0x53, 0x43, 0x4f, 0x56, 0x45, 0x52, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11, 0x44, 0x49, 0x53,
0x43, 0x4f, 0x56, 0x45, 0x52, 0x5f, 0x52, 0x45, 0x53, 0x50, 0x4f, 0x4e, 0x53, 0x45, 0x10, 0x04,
0x22, 0xbe, 0x01, 0x0a, 0x0e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x45,
0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x4e, 0x41, 0x4d, 0x45, 0x53, 0x50, 0x41,
0x43, 0x45, 0x10, 0x64, 0x12, 0x20, 0x0a, 0x1c, 0x45, 0x5f, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49,
0x44, 0x5f, 0x53, 0x49, 0x47, 0x4e, 0x45, 0x44, 0x5f, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x52, 0x45,
0x43, 0x4f, 0x52, 0x44, 0x10, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x45, 0x5f, 0x49, 0x4e, 0x56, 0x41,
0x4c, 0x49, 0x44, 0x5f, 0x54, 0x54, 0x4c, 0x10, 0x66, 0x12, 0x14, 0x0a, 0x10, 0x45, 0x5f, 0x49,
0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x5f, 0x43, 0x4f, 0x4f, 0x4b, 0x49, 0x45, 0x10, 0x67, 0x12,
0x15, 0x0a, 0x10, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x5f, 0x41, 0x55, 0x54, 0x48, 0x4f, 0x52, 0x49,
0x5a, 0x45, 0x44, 0x10, 0xc8, 0x01, 0x12, 0x15, 0x0a, 0x10, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x45,
0x52, 0x4e, 0x41, 0x4c, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0xac, 0x02, 0x12, 0x12, 0x0a,
0x0d, 0x45, 0x5f, 0x55, 0x4e, 0x41, 0x56, 0x41, 0x49, 0x4c, 0x41, 0x42, 0x4c, 0x45, 0x10, 0x90,
0x03,
}
var (
file_rendezvous_proto_rawDescOnce sync.Once
file_rendezvous_proto_rawDescData = file_rendezvous_proto_rawDesc
)
func file_rendezvous_proto_rawDescGZIP() []byte {
file_rendezvous_proto_rawDescOnce.Do(func() {
file_rendezvous_proto_rawDescData = protoimpl.X.CompressGZIP(file_rendezvous_proto_rawDescData)
})
return file_rendezvous_proto_rawDescData
}
var file_rendezvous_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_rendezvous_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_rendezvous_proto_goTypes = []interface{}{
(Message_MessageType)(0), // 0: rendezvous.pb.Message.MessageType
(Message_ResponseStatus)(0), // 1: rendezvous.pb.Message.ResponseStatus
(*Message)(nil), // 2: rendezvous.pb.Message
(*Message_Register)(nil), // 3: rendezvous.pb.Message.Register
(*Message_RegisterResponse)(nil), // 4: rendezvous.pb.Message.RegisterResponse
(*Message_Unregister)(nil), // 5: rendezvous.pb.Message.Unregister
(*Message_Discover)(nil), // 6: rendezvous.pb.Message.Discover
(*Message_DiscoverResponse)(nil), // 7: rendezvous.pb.Message.DiscoverResponse
}
var file_rendezvous_proto_depIdxs = []int32{
0, // 0: rendezvous.pb.Message.type:type_name -> rendezvous.pb.Message.MessageType
3, // 1: rendezvous.pb.Message.register:type_name -> rendezvous.pb.Message.Register
4, // 2: rendezvous.pb.Message.registerResponse:type_name -> rendezvous.pb.Message.RegisterResponse
5, // 3: rendezvous.pb.Message.unregister:type_name -> rendezvous.pb.Message.Unregister
6, // 4: rendezvous.pb.Message.discover:type_name -> rendezvous.pb.Message.Discover
7, // 5: rendezvous.pb.Message.discoverResponse:type_name -> rendezvous.pb.Message.DiscoverResponse
1, // 6: rendezvous.pb.Message.RegisterResponse.status:type_name -> rendezvous.pb.Message.ResponseStatus
3, // 7: rendezvous.pb.Message.DiscoverResponse.registrations:type_name -> rendezvous.pb.Message.Register
1, // 8: rendezvous.pb.Message.DiscoverResponse.status:type_name -> rendezvous.pb.Message.ResponseStatus
9, // [9:9] is the sub-list for method output_type
9, // [9:9] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name
9, // [9:9] is the sub-list for extension extendee
0, // [0:9] is the sub-list for field type_name
}
func init() { file_rendezvous_proto_init() }
func file_rendezvous_proto_init() {
if File_rendezvous_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_rendezvous_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_rendezvous_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message_Register); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_rendezvous_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message_RegisterResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_rendezvous_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message_Unregister); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_rendezvous_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message_Discover); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_rendezvous_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message_DiscoverResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_rendezvous_proto_rawDesc,
NumEnums: 2,
NumMessages: 6,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_rendezvous_proto_goTypes,
DependencyIndexes: file_rendezvous_proto_depIdxs,
EnumInfos: file_rendezvous_proto_enumTypes,
MessageInfos: file_rendezvous_proto_msgTypes,
}.Build()
File_rendezvous_proto = out.File
file_rendezvous_proto_rawDesc = nil
file_rendezvous_proto_goTypes = nil
file_rendezvous_proto_depIdxs = nil
}

View File

@ -0,0 +1,61 @@
syntax = "proto2";
package rendezvous.pb;
message Message {
enum MessageType {
REGISTER = 0;
REGISTER_RESPONSE = 1;
UNREGISTER = 2;
DISCOVER = 3;
DISCOVER_RESPONSE = 4;
}
enum ResponseStatus {
OK = 0;
E_INVALID_NAMESPACE = 100;
E_INVALID_SIGNED_PEER_RECORD = 101;
E_INVALID_TTL = 102;
E_INVALID_COOKIE = 103;
E_NOT_AUTHORIZED = 200;
E_INTERNAL_ERROR = 300;
E_UNAVAILABLE = 400;
}
message Register {
optional string ns = 1;
optional bytes signedPeerRecord = 2;
optional uint64 ttl = 3; // in seconds
}
message RegisterResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional uint64 ttl = 3; // in seconds
}
message Unregister {
optional string ns = 1;
// optional bytes id = 2; deprecated as per https://github.com/libp2p/specs/issues/335
}
message Discover {
optional string ns = 1;
optional uint64 limit = 2;
optional bytes cookie = 3;
}
message DiscoverResponse {
repeated Register registrations = 1;
optional bytes cookie = 2;
optional ResponseStatus status = 3;
optional string statusText = 4;
}
optional MessageType type = 1;
optional Register register = 2;
optional RegisterResponse registerResponse = 3;
optional Unregister unregister = 4;
optional Discover discover = 5;
optional DiscoverResponse discoverResponse = 6;
}

View File

@ -0,0 +1,160 @@
package rendezvous
import (
"errors"
"fmt"
"time"
db "github.com/waku-org/go-libp2p-rendezvous/db"
pb "github.com/waku-org/go-libp2p-rendezvous/pb"
logging "github.com/ipfs/go-log/v2"
crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
)
var log = logging.Logger("rendezvous")
const (
RendezvousProto = protocol.ID("/rendezvous/1.0.0")
DefaultTTL = 2 * 3600 // 2hr
)
type RendezvousError struct {
Status pb.Message_ResponseStatus
Text string
}
func (e RendezvousError) Error() string {
return fmt.Sprintf("Rendezvous error: %s (%s)", e.Text, e.Status.String())
}
func NewRegisterMessage(privKey crypto.PrivKey, ns string, pi peer.AddrInfo, ttl int) (*pb.Message, error) {
return newRegisterMessage(privKey, ns, pi, ttl)
}
func newRegisterMessage(privKey crypto.PrivKey, ns string, pi peer.AddrInfo, ttl int) (*pb.Message, error) {
msg := new(pb.Message)
msg.Type = pb.Message_REGISTER.Enum()
msg.Register = new(pb.Message_Register)
if ns != "" {
msg.Register.Ns = &ns
}
if ttl > 0 {
ttlu64 := uint64(ttl)
msg.Register.Ttl = &ttlu64
}
peerInfo := &peer.PeerRecord{
PeerID: pi.ID,
Addrs: pi.Addrs,
Seq: uint64(time.Now().Unix()),
}
envelope, err := record.Seal(peerInfo, privKey)
if err != nil {
return nil, err
}
envPayload, err := envelope.Marshal()
if err != nil {
return nil, err
}
msg.Register.SignedPeerRecord = envPayload
return msg, nil
}
func newUnregisterMessage(ns string, pid peer.ID) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_UNREGISTER.Enum()
msg.Unregister = new(pb.Message_Unregister)
if ns != "" {
msg.Unregister.Ns = &ns
}
return msg
}
func NewDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
return newDiscoverMessage(ns, limit, cookie)
}
func newDiscoverMessage(ns string, limit int, cookie []byte) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_DISCOVER.Enum()
msg.Discover = new(pb.Message_Discover)
if ns != "" {
msg.Discover.Ns = &ns
}
if limit > 0 {
limitu64 := uint64(limit)
msg.Discover.Limit = &limitu64
}
if cookie != nil {
msg.Discover.Cookie = cookie
}
return msg
}
func pbToPeerRecord(envelopeBytes []byte) (peer.AddrInfo, error) {
envelope, rec, err := record.ConsumeEnvelope(envelopeBytes, peer.PeerRecordEnvelopeDomain)
if err != nil {
return peer.AddrInfo{}, err
}
peerRec, ok := rec.(*peer.PeerRecord)
if !ok {
return peer.AddrInfo{}, errors.New("invalid peer record")
}
if !peerRec.PeerID.MatchesPublicKey(envelope.PublicKey) {
return peer.AddrInfo{}, errors.New("signing key does not match peer record")
}
return peer.AddrInfo{ID: peerRec.PeerID, Addrs: peerRec.Addrs}, nil
}
func newRegisterResponse(ttl int) *pb.Message_RegisterResponse {
ttlu64 := uint64(ttl)
r := new(pb.Message_RegisterResponse)
r.Status = pb.Message_OK.Enum()
r.Ttl = &ttlu64
return r
}
func newRegisterResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_RegisterResponse {
r := new(pb.Message_RegisterResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}
func newDiscoverResponse(regs []db.RegistrationRecord, cookie []byte) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = pb.Message_OK.Enum()
rregs := make([]*pb.Message_Register, len(regs))
for i, reg := range regs {
rreg := new(pb.Message_Register)
rreg.Ns = &reg.Ns
rreg.SignedPeerRecord = reg.SignedPeerRecord
rttl := uint64(reg.Ttl)
rreg.Ttl = &rttl
rregs[i] = rreg
}
r.Registrations = rregs
r.Cookie = cookie
return r
}
func newDiscoverResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DiscoverResponse {
r := new(pb.Message_DiscoverResponse)
r.Status = status.Enum()
r.StatusText = &text
return r
}

View File

@ -1,15 +1,13 @@
package rendezvous
import (
"fmt"
ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p/core/host"
inet "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/pbio"
db "github.com/berty/go-libp2p-rendezvous/db"
pb "github.com/berty/go-libp2p-rendezvous/pb"
db "github.com/waku-org/go-libp2p-rendezvous/db"
pb "github.com/waku-org/go-libp2p-rendezvous/pb"
)
const (
@ -22,11 +20,10 @@ const (
type RendezvousService struct {
DB db.DB
rzs []RendezvousSync
}
func NewRendezvousService(host host.Host, db db.DB, rzs ...RendezvousSync) *RendezvousService {
rz := &RendezvousService{DB: db, rzs: rzs}
func NewRendezvousService(host host.Host, db db.DB) *RendezvousService {
rz := &RendezvousService{DB: db}
host.SetStreamHandler(RendezvousProto, rz.handleStream)
return rz
}
@ -37,8 +34,8 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
r := pbio.NewDelimitedReader(s, inet.MessageSizeMax)
w := pbio.NewDelimitedWriter(s)
for {
var req pb.Message
@ -53,7 +50,7 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
switch t {
case pb.Message_REGISTER:
r := rz.handleRegister(pid, req.GetRegister())
res.Type = pb.Message_REGISTER_RESPONSE
res.Type = pb.Message_REGISTER_RESPONSE.Enum()
res.RegisterResponse = r
err = w.WriteMsg(&res)
if err != nil {
@ -69,7 +66,7 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
case pb.Message_DISCOVER:
r := rz.handleDiscover(pid, req.GetDiscover())
res.Type = pb.Message_DISCOVER_RESPONSE
res.Type = pb.Message_DISCOVER_RESPONSE.Enum()
res.DiscoverResponse = r
err = w.WriteMsg(&res)
if err != nil {
@ -77,16 +74,6 @@ func (rz *RendezvousService) handleStream(s inet.Stream) {
return
}
case pb.Message_DISCOVER_SUBSCRIBE:
r := rz.handleDiscoverSubscribe(pid, req.GetDiscoverSubscribe())
res.Type = pb.Message_DISCOVER_SUBSCRIBE_RESPONSE
res.DiscoverSubscribeResponse = r
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}
default:
log.Debugf("Unexpected message: %s", t.String())
return
@ -104,34 +91,30 @@ func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *
return newRegisterResponseError(pb.Message_E_INVALID_NAMESPACE, "namespace too long")
}
mpi := m.GetPeer()
if mpi == nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer info")
signedPeerRecord := m.GetSignedPeerRecord()
if signedPeerRecord == nil {
return newRegisterResponseError(pb.Message_E_INVALID_SIGNED_PEER_RECORD, "missing signed peer record")
}
mpid := mpi.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
peerRecord, err := pbToPeerRecord(signedPeerRecord)
if err != nil {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "bad peer id")
return newRegisterResponseError(pb.Message_E_INVALID_SIGNED_PEER_RECORD, "invalid peer record")
}
if mp != p {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer id mismatch")
}
if peerRecord.ID != p {
return newRegisterResponseError(pb.Message_E_INVALID_SIGNED_PEER_RECORD, "peer id mismatch")
}
maddrs := mpi.GetAddrs()
if len(maddrs) == 0 {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "missing peer addresses")
if len(peerRecord.Addrs) == 0 {
return newRegisterResponseError(pb.Message_E_INVALID_SIGNED_PEER_RECORD, "missing peer addresses")
}
mlen := 0
for _, maddr := range maddrs {
mlen += len(maddr)
for _, maddr := range peerRecord.Addrs {
mlen += len(maddr.Bytes())
}
if mlen > MaxPeerAddressLength {
return newRegisterResponseError(pb.Message_E_INVALID_PEER_INFO, "peer info too long")
return newRegisterResponseError(pb.Message_E_INVALID_SIGNED_PEER_RECORD, "peer info too long")
}
// Note:
@ -139,7 +122,7 @@ func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *
// Perhaps we should though.
mttl := m.GetTtl()
if mttl < 0 || mttl > MaxTTL {
if mttl > MaxTTL {
return newRegisterResponseError(pb.Message_E_INVALID_TTL, "bad ttl")
}
@ -163,7 +146,7 @@ func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *
}
// ok, seems like we can register
counter, err := rz.DB.Register(p, ns, maddrs, ttl)
_, err = rz.DB.Register(p, ns, signedPeerRecord, ttl)
if err != nil {
log.Errorf("Error registering: %s", err.Error())
return newRegisterResponseError(pb.Message_E_INTERNAL_ERROR, "database error")
@ -171,28 +154,12 @@ func (rz *RendezvousService) handleRegister(p peer.ID, m *pb.Message_Register) *
log.Infof("registered peer %s %s (%d)", p, ns, ttl)
for _, rzs := range rz.rzs {
rzs.Register(p, ns, maddrs, ttl, counter)
}
return newRegisterResponse(ttl)
}
func (rz *RendezvousService) handleUnregister(p peer.ID, m *pb.Message_Unregister) error {
ns := m.GetNs()
mpid := m.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return err
}
if mp != p {
return fmt.Errorf("peer id mismatch: %s asked to unregister %s", p.Pretty(), mp.Pretty())
}
}
err := rz.DB.Unregister(p, ns)
if err != nil {
return err
@ -200,10 +167,6 @@ func (rz *RendezvousService) handleUnregister(p peer.ID, m *pb.Message_Unregiste
log.Infof("unregistered peer %s %s", p, ns)
for _, rzs := range rz.rzs {
rzs.Unregister(p, ns)
}
return nil
}
@ -216,7 +179,7 @@ func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *
limit := MaxDiscoverLimit
mlimit := m.GetLimit()
if mlimit > 0 && mlimit < int64(limit) {
if mlimit > 0 && mlimit < uint64(limit) {
limit = int(mlimit)
}
@ -231,31 +194,7 @@ func (rz *RendezvousService) handleDiscover(p peer.ID, m *pb.Message_Discover) *
return newDiscoverResponseError(pb.Message_E_INTERNAL_ERROR, "database error")
}
log.Infof("discover query: %s %s -> %d", p, ns, len(regs))
log.Debugf("discover query: %s %s -> %d", p, ns, len(regs))
return newDiscoverResponse(regs, cookie)
}
func (rz *RendezvousService) handleDiscoverSubscribe(_ peer.ID, m *pb.Message_DiscoverSubscribe) *pb.Message_DiscoverSubscribeResponse {
ns := m.GetNs()
for _, s := range rz.rzs {
rzSub, ok := s.(RendezvousSyncSubscribable)
if !ok {
continue
}
for _, supportedSubType := range m.GetSupportedSubscriptionTypes() {
if rzSub.GetServiceType() == supportedSubType {
sub, err := rzSub.Subscribe(ns)
if err != nil {
return newDiscoverSubscribeResponseError(pb.Message_E_INTERNAL_ERROR, "error while subscribing")
}
return newDiscoverSubscribeResponse(supportedSubType, sub)
}
}
}
return newDiscoverSubscribeResponseError(pb.Message_E_INTERNAL_ERROR, "subscription type not found")
}

View File

@ -230,12 +230,12 @@ func WithExternalIP(ip net.IP) WakuNodeOption {
params.addressFactory = func(inputAddr []multiaddr.Multiaddr) (addresses []multiaddr.Multiaddr) {
addresses = append(addresses, inputAddr...)
component := "/ip4/"
if ip.To4() == nil && ip.To16() != nil {
component = "/ip6/"
ipType := "/ip4/"
if utils.IsIPv6(ip.String()) {
ipType = "/ip6/"
}
hostAddrMA, err := multiaddr.NewMultiaddr(component + ip.String())
hostAddrMA, err := multiaddr.NewMultiaddr(ipType + ip.String())
if err != nil {
panic("Could not build external IP")
}

View File

@ -57,7 +57,18 @@ func enodeToMultiAddr(node *enode.Node) (multiaddr.Multiaddr, error) {
return nil, err
}
return multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID))
ipType := "ip4"
portNumber := node.TCP()
if utils.IsIPv6(node.IP().String()) {
ipType = "ip6"
var port enr.TCP6
if err := node.Record().Load(&port); err != nil {
return nil, err
}
portNumber = int(port)
}
return multiaddr.NewMultiaddr(fmt.Sprintf("/%s/%s/tcp/%d/p2p/%s", ipType, node.IP(), portNumber, peerID))
}
// Multiaddress is used to extract all the multiaddresses that are part of a ENR record

View File

@ -1,6 +1,8 @@
package enr
import (
"errors"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/waku-org/go-waku/waku/v2/protocol"
@ -24,7 +26,7 @@ func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption {
}
}
func WithtWakuRelaySharding(rs protocol.RelayShards) ENROption {
func WithWakuRelaySharding(rs protocol.RelayShards) ENROption {
return func(localnode *enode.LocalNode) error {
if len(rs.Indices) >= 64 {
return WithWakuRelayShardingBitVector(rs)(localnode)
@ -34,6 +36,21 @@ func WithtWakuRelaySharding(rs protocol.RelayShards) ENROption {
}
}
func WithWakuRelayShardingTopics(topics ...string) ENROption {
return func(localnode *enode.LocalNode) error {
rs, err := protocol.TopicsToRelayShards(topics...)
if err != nil {
return err
}
if len(rs) != 1 {
return errors.New("expected a single RelayShards")
}
return WithWakuRelaySharding(rs[0])(localnode)
}
}
// ENR record accessors
func RelayShardingIndicesList(localnode *enode.LocalNode) (*protocol.RelayShards, error) {

View File

@ -154,14 +154,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string,
}
func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
return err
}
var conn network.Stream
conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
return err

View File

@ -285,19 +285,6 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
ctx, cancel := context.WithTimeout(ctx, MessagePushTimeout)
defer cancel()
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID))
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
if errors.Is(context.DeadlineExceeded, err) {
metrics.RecordFilterError(ctx, "push_timeout_failure")
} else {
metrics.RecordFilterError(ctx, "dial_failure")
}
logger.Error("connecting to peer", zap.Error(err))
return err
}
conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)

View File

@ -171,15 +171,6 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, ms
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}}
logger := wf.log.With(logging.HostID("peer", subscriber.peer))
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(subscriber.peer))
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
logger.Error("connecting to peer", zap.Error(err))
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
conn, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1)
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
@ -269,13 +260,6 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err = wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return
}
request := &pb.FilterRequest{
Subscribe: true,
Topic: filter.Topic,
@ -313,12 +297,6 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer))
if err != nil {
metrics.RecordLegacyFilterError(ctx, "dial_failure")
return err
}
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if err != nil {

View File

@ -157,13 +157,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
}
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
metrics.RecordLightpushError(ctx, "dial_failure")
logger.Error("connecting peer", zap.Error(err))
return nil, err
}
connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
if err != nil {

View File

@ -38,12 +38,6 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
},
}
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuPX.h.Connect(ctx, wakuPX.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
return err
}
connOpt, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
if err != nil {
return err

View File

@ -72,6 +72,42 @@ func (rs RelayShards) ContainsNamespacedTopic(topic NamespacedPubsubTopic) bool
return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard())
}
func TopicsToRelayShards(topic ...string) ([]RelayShards, error) {
result := make([]RelayShards, 0)
dict := make(map[uint16]map[uint16]struct{})
for _, t := range topic {
var ps StaticShardingPubsubTopic
err := ps.Parse(t)
if err != nil {
return nil, err
}
indices, ok := dict[ps.cluster]
if !ok {
indices = make(map[uint16]struct{})
}
indices[ps.shard] = struct{}{}
dict[ps.cluster] = indices
}
for cluster, indices := range dict {
idx := make([]uint16, 0, len(indices))
for index := range indices {
idx = append(idx, index)
}
rs, err := NewRelayShards(cluster, idx...)
if err != nil {
return nil, err
}
result = append(result, rs)
}
return result, nil
}
func (rs RelayShards) ContainsTopic(topic string) bool {
nsTopic, err := ToShardedPubsubTopic(topic)
if err != nil {

View File

@ -176,14 +176,6 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
logger := store.log.With(logging.HostID("peer", selectedPeer))
logger.Info("querying message history")
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
if err != nil {
logger.Error("connecting to peer", zap.Error(err))
metrics.RecordStoreError(store.ctx, "dial_failure")
return nil, err
}
connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))

View File

@ -11,8 +11,8 @@ import (
"fmt"
"time"
dbi "github.com/berty/go-libp2p-rendezvous/db"
"github.com/libp2p/go-libp2p/core/peer"
dbi "github.com/waku-org/go-libp2p-rendezvous/db"
"go.uber.org/zap"
)
@ -164,9 +164,8 @@ func (db *DB) prepareStmts() error {
return nil
}
func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, error) {
func (db *DB) Register(p peer.ID, ns string, signedPeerRecord []byte, ttl int) (uint64, error) {
pid := p.Pretty()
maddrs := packAddrs(addrs)
expire := time.Now().Unix() + int64(ttl)
tx, err := db.db.Begin()
@ -184,7 +183,7 @@ func (db *DB) Register(p peer.ID, ns string, addrs [][]byte, ttl int) (uint64, e
return 0, err
}
_, err = insertNew.Exec(pid, ns, expire, maddrs)
_, err = insertNew.Exec(pid, ns, expire, signedPeerRecord)
if err != nil {
_ = tx.Rollback()
return 0, err
@ -272,12 +271,11 @@ func (db *DB) Discover(ns string, cookie []byte, limit int) ([]dbi.RegistrationR
rid string
rns string
expire int64
raddrs []byte
addrs [][]byte
signedPeerRecord []byte
p peer.ID
)
err = rows.Scan(&counter, &rid, &rns, &expire, &raddrs)
err = rows.Scan(&counter, &rid, &rns, &expire, &signedPeerRecord)
if err != nil {
db.logger.Error("row scan error", zap.Error(err))
return nil, nil, err
@ -289,14 +287,8 @@ func (db *DB) Discover(ns string, cookie []byte, limit int) ([]dbi.RegistrationR
continue
}
addrs, err := unpackAddrs(raddrs)
if err != nil {
db.logger.Error("error unpacking address", zap.Error(err))
continue
}
reg.Id = p
reg.Addrs = addrs
reg.SignedPeerRecord = signedPeerRecord
reg.Ttl = int(expire - now)
if ns == "" {
@ -342,47 +334,6 @@ func (db *DB) cleanupExpired() {
}
}
func packAddrs(addrs [][]byte) []byte {
packlen := 0
for _, addr := range addrs {
packlen = packlen + 2 + len(addr)
}
packed := make([]byte, packlen)
buf := packed
for _, addr := range addrs {
binary.BigEndian.PutUint16(buf, uint16(len(addr)))
buf = buf[2:]
copy(buf, addr)
buf = buf[len(addr):]
}
return packed
}
func unpackAddrs(packed []byte) ([][]byte, error) {
var addrs [][]byte
buf := packed
for len(buf) > 1 {
l := binary.BigEndian.Uint16(buf)
buf = buf[2:]
if len(buf) < int(l) {
return nil, fmt.Errorf("bad packed address: not enough bytes %v %v", packed, buf)
}
addr := make([]byte, l)
copy(addr, buf[:l])
buf = buf[l:]
addrs = append(addrs, addr)
}
if len(buf) > 0 {
return nil, fmt.Errorf("bad packed address: unprocessed bytes: %v %v", packed, buf)
}
return addrs, nil
}
// cookie: counter:SHA256(nonce + ns + counter)
func packCookie(counter int64, ns string, nonce []byte) []byte {
cbits := make([]byte, 8)

View File

@ -7,9 +7,9 @@ import (
"sync"
"time"
rvs "github.com/berty/go-libp2p-rendezvous"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
rvs "github.com/waku-org/go-libp2p-rendezvous"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -116,6 +116,8 @@ func (r *Rendezvous) discover(ctx context.Context) {
if err != nil {
r.log.Error("could not discover new peers", zap.Error(err))
cookie = nil
// TODO: add backoff strategy
// continue
}
if len(addrInfo) != 0 {
@ -133,7 +135,7 @@ func (r *Rendezvous) discover(ctx context.Context) {
} else {
// TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query
// TODO: improve this by adding an exponential backoff?
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)
}
}
}

16
vendor/github.com/waku-org/go-waku/waku/v2/utils/ip.go generated vendored Normal file
View File

@ -0,0 +1,16 @@
package utils
import (
"net"
"strings"
)
func IsIPv4(str string) bool {
ip := net.ParseIP(str)
return ip != nil && !strings.Contains(str, ":")
}
func IsIPv6(str string) bool {
ip := net.ParseIP(str)
return ip != nil && strings.Contains(str, ":")
}

17
vendor/modules.txt vendored
View File

@ -128,11 +128,6 @@ github.com/benbjohnson/immutable
# github.com/beorn7/perks v1.0.1
## explicit; go 1.11
github.com/beorn7/perks/quantile
# github.com/berty/go-libp2p-rendezvous v0.4.1
## explicit; go 1.18
github.com/berty/go-libp2p-rendezvous
github.com/berty/go-libp2p-rendezvous/db
github.com/berty/go-libp2p-rendezvous/pb
# github.com/bits-and-blooms/bitset v1.2.0
## explicit; go 1.14
github.com/bits-and-blooms/bitset
@ -317,7 +312,6 @@ github.com/godbus/dbus/v5
# github.com/gogo/protobuf v1.3.2
## explicit; go 1.15
github.com/gogo/protobuf/gogoproto
github.com/gogo/protobuf/io
github.com/gogo/protobuf/proto
github.com/gogo/protobuf/protoc-gen-gogo/descriptor
# github.com/golang-jwt/jwt/v4 v4.3.0
@ -856,8 +850,8 @@ github.com/remyoudompheng/bigfft
# github.com/rivo/uniseg v0.2.0
## explicit; go 1.12
github.com/rivo/uniseg
# github.com/rjeczalik/notify v0.9.2
## explicit
# github.com/rjeczalik/notify v0.9.3
## explicit; go 1.11
github.com/rjeczalik/notify
# github.com/rs/cors v1.7.0
## explicit
@ -987,7 +981,12 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.6.1-0.20230526151800-10c2e20910bf
# github.com/waku-org/go-libp2p-rendezvous v0.0.0-20230601172541-0fad5ff68671
## explicit; go 1.19
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.6.1-0.20230605200314-b0c094b0b663
## explicit; go 1.19
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence