From bbab0f171484d8ccdad2117ed0ad2e39fde52e61 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 25 Apr 2022 23:31:26 +0400 Subject: [PATCH] fix: add topic to broadcast and some comments to functions (#229) --- examples/chat2/main.go | 2 +- examples/filter2/main.go | 2 +- library/api.go | 2 +- mobile/api.go | 3 +- mobile/api_relay.go | 6 +- tests/utils.go | 5 ++ waku/node.go | 16 ++-- waku/persistence/rendezvous.go | 6 ++ waku/v2/broadcast.go | 95 +++++++++++++++------ waku/v2/broadcast_test.go | 14 ++- waku/v2/discv5/discover.go | 2 +- waku/v2/dnsdisc/enr.go | 15 ++-- waku/v2/node/connectedness.go | 21 +++-- waku/v2/node/waku_payload.go | 5 +- waku/v2/node/wakunode2.go | 39 +++++++-- waku/v2/node/wakuoptions.go | 6 ++ waku/v2/protocol/filter/waku_filter_test.go | 4 +- waku/v2/protocol/relay/waku_relay.go | 8 +- waku/v2/protocol/store/waku_store.go | 15 +++- waku/v2/protocol/swap/waku_swap.go | 4 +- waku/v2/rpc/filter_test.go | 2 +- waku/v2/rpc/runner.go | 4 +- waku/v2/utils/enr.go | 24 ++++-- waku/v2/utils/enr_test.go | 4 +- waku/v2/utils/logger.go | 2 + waku/v2/utils/peer.go | 8 +- waku/v2/utils/time.go | 2 +- 27 files changed, 213 insertions(+), 103 deletions(-) diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 15128142..acea6373 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -295,5 +295,5 @@ func addPeer(wakuNode *node.WakuNode, addr string, protocol protocol.ID) (*peer. return nil, err } - return wakuNode.AddPeer(ma, protocol) + return wakuNode.AddPeer(ma, string(protocol)) } diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 941e4a57..dd4df3d2 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -82,7 +82,7 @@ func main() { panic(err) } - _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1) + _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], string(filter.FilterID_v20beta1)) if err != nil { log.Info("Error adding filter peer on light node ", err) } diff --git a/library/api.go b/library/api.go index 33a2db1a..4c6f1ed7 100644 --- a/library/api.go +++ b/library/api.go @@ -117,7 +117,7 @@ func waku_default_pubsub_topic() *C.char { //export waku_set_event_callback // Register callback to act as signal handler and receive application signal -// (in JSON) which are used o react to asyncronous events in waku. The function +// (in JSON) which are used o react to asynchronous events in waku. The function // signature for the callback should be `void myCallback(char* signalJSON)` func waku_set_event_callback(cb unsafe.Pointer) { mobile.SetEventCallback(cb) diff --git a/mobile/api.go b/mobile/api.go index 0237ffbb..6608029b 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -16,7 +16,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/libp2p/go-libp2p-core/peer" - p2pproto "github.com/libp2p/go-libp2p-core/protocol" "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" @@ -195,7 +194,7 @@ func AddPeer(address string, protocolID string) string { return makeJSONResponse(err) } - peerID, err := wakuNode.AddPeer(ma, p2pproto.ID(protocolID)) + peerID, err := wakuNode.AddPeer(ma, protocolID) return prepareJSONResponse(peerID, err) } diff --git a/mobile/api_relay.go b/mobile/api_relay.go index 4f7b3802..83892233 100644 --- a/mobile/api_relay.go +++ b/mobile/api_relay.go @@ -89,7 +89,7 @@ func RelaySubscribe(topic string) string { mutex.Lock() defer mutex.Unlock() - subscription, ok := subscriptions[topicToSubscribe] + _, ok := subscriptions[topicToSubscribe] if ok { return makeJSONResponse(nil) } @@ -101,11 +101,11 @@ func RelaySubscribe(topic string) string { subscriptions[topicToSubscribe] = subscription - go func() { + go func(subscription *relay.Subscription) { for envelope := range subscription.C { send("message", toSubscriptionMessage(envelope)) } - }() + }(subscription) return makeJSONResponse(nil) } diff --git a/tests/utils.go b/tests/utils.go index bc1a587f..94ac6e0f 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -28,10 +28,12 @@ func Logger() *zap.SugaredLogger { return log } +// GetHostAddress returns the first listen address used by a host func GetHostAddress(ha host.Host) ma.Multiaddr { return ha.Addrs()[0] } +// FindFreePort returns an available port number func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { t.Helper() @@ -61,6 +63,7 @@ func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) { return 0, fmt.Errorf("no free port found") } +// MakeHost creates a Libp2p host with a random key on a specific port func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) { // Creates a new RSA key pair for this host. prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness) @@ -80,10 +83,12 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e ) } +// CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage { return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} } +// RandomHex returns a random hex string of n bytes func RandomHex(n int) (string, error) { bytes := make([]byte, n) if _, err := rand.Read(bytes); err != nil { diff --git a/waku/node.go b/waku/node.go index af549d09..d388be56 100644 --- a/waku/node.go +++ b/waku/node.go @@ -25,7 +25,6 @@ import ( "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p/config" - "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-peerstore/pstoreds" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" @@ -216,10 +215,10 @@ func Execute(options Options) { failOnErr(err, "Wakunode") - addPeers(wakuNode, options.Rendezvous.Nodes.Value(), rendezvous.RendezvousID_v001) - addPeers(wakuNode, options.Store.Nodes.Value(), store.StoreID_v20beta4) - addPeers(wakuNode, options.LightPush.Nodes.Value(), lightpush.LightPushID_v20beta1) - addPeers(wakuNode, options.Filter.Nodes.Value(), filter.FilterID_v20beta1) + addPeers(wakuNode, options.Rendezvous.Nodes.Value(), string(rendezvous.RendezvousID_v001)) + addPeers(wakuNode, options.Store.Nodes.Value(), string(store.StoreID_v20beta4)) + addPeers(wakuNode, options.LightPush.Nodes.Value(), string(lightpush.LightPushID_v20beta1)) + addPeers(wakuNode, options.Filter.Nodes.Value(), string(filter.FilterID_v20beta1)) if err = wakuNode.Start(); err != nil { utils.Logger().Fatal(fmt.Errorf("could not start waku node, %w", err).Error()) @@ -237,9 +236,10 @@ func Execute(options Options) { if options.Relay.Enable { for _, nodeTopic := range options.Relay.Topics.Value() { + nodeTopic := nodeTopic sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") - wakuNode.Broadcaster().Unregister(sub.C) + wakuNode.Broadcaster().Unregister(&nodeTopic, sub.C) } } @@ -307,7 +307,7 @@ func Execute(options Options) { } } -func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID) { +func addPeers(wakuNode *node.WakuNode, addresses []string, protocols ...string) { for _, addrString := range addresses { if addrString == "" { continue @@ -316,7 +316,7 @@ func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID) addr, err := multiaddr.NewMultiaddr(addrString) failOnErr(err, "invalid multiaddress") - _, err = wakuNode.AddPeer(addr, protocol) + _, err = wakuNode.AddPeer(addr, protocols...) failOnErr(err, "error adding peer") } } diff --git a/waku/persistence/rendezvous.go b/waku/persistence/rendezvous.go index 4586af31..18c35057 100644 --- a/waku/persistence/rendezvous.go +++ b/waku/persistence/rendezvous.go @@ -7,10 +7,12 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) +// RendezVousLevelDB is a struct used to hold a reference to a LevelDB database type RendezVousLevelDB struct { db *leveldb.DB } +// NewRendezVousLevelDB opens a LevelDB database to be used for rendezvous protocol func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) { db, err := leveldb.OpenFile(dBPath, &opt.Options{OpenFilesCacheCapacity: 3}) @@ -21,14 +23,18 @@ func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) { return &RendezVousLevelDB{db}, nil } +// Delete removes a key from the database func (r *RendezVousLevelDB) Delete(key []byte) error { return r.db.Delete(key, nil) } +// Put inserts or updates a key in the database func (r *RendezVousLevelDB) Put(key []byte, value []byte) error { return r.db.Put(key, value, nil) } +// NewIterator returns an interator that can be used to iterate over all +// the records contained in the DB func (r *RendezVousLevelDB) NewIterator(prefix []byte) rendezvous.Iterator { return r.db.NewIterator(util.BytesPrefix(prefix), nil) } diff --git a/waku/v2/broadcast.go b/waku/v2/broadcast.go index f734aafa..768a193e 100644 --- a/waku/v2/broadcast.go +++ b/waku/v2/broadcast.go @@ -10,29 +10,33 @@ import ( type doneCh chan struct{} type chOperation struct { - ch chan<- *protocol.Envelope - done doneCh + ch chan<- *protocol.Envelope + topic *string + done doneCh } +type broadcastOutputs map[chan<- *protocol.Envelope]struct{} + type broadcaster struct { input chan *protocol.Envelope reg chan chOperation unreg chan chOperation - outputs map[chan<- *protocol.Envelope]bool + outputs broadcastOutputs + outputsPerTopic map[string]broadcastOutputs } // The Broadcaster interface describes the main entry points to // broadcasters. type Broadcaster interface { - // Register a new channel to receive broadcasts - Register(chan<- *protocol.Envelope) - // Register a new channel to receive broadcasts and return a channel to wait until this operation is complete - WaitRegister(newch chan<- *protocol.Envelope) doneCh - // Unregister a channel so that it no longer receives broadcasts. - Unregister(chan<- *protocol.Envelope) + // Register a new channel to receive broadcasts from a pubsubtopic + Register(topic *string, newch chan<- *protocol.Envelope) + // Register a new channel to receive broadcasts from a pubsub topic and return a channel to wait until this operation is complete + WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh + // Unregister a channel so that it no longer receives broadcasts from a pubsub topic + Unregister(topic *string, newch chan<- *protocol.Envelope) // Unregister a subscriptor channel and return a channel to wait until this operation is done - WaitUnregister(newch chan<- *protocol.Envelope) doneCh + WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh // Shut this broadcaster down. Close() // Submit a new object to all subscribers @@ -43,6 +47,15 @@ func (b *broadcaster) broadcast(m *protocol.Envelope) { for ch := range b.outputs { ch <- m } + + outputs, ok := b.outputsPerTopic[m.PubsubTopic()] + if !ok { + return + } + + for ch := range outputs { + ch <- m + } } func (b *broadcaster) run() { @@ -52,7 +65,18 @@ func (b *broadcaster) run() { b.broadcast(m) case broadcastee, ok := <-b.reg: if ok { - b.outputs[broadcastee.ch] = true + if broadcastee.topic != nil { + topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic] + if !ok { + b.outputsPerTopic[*broadcastee.topic] = make(broadcastOutputs) + topicOutputs = b.outputsPerTopic[*broadcastee.topic] + } + + topicOutputs[broadcastee.ch] = struct{}{} + b.outputsPerTopic[*broadcastee.topic] = topicOutputs + } else { + b.outputs[broadcastee.ch] = struct{}{} + } if broadcastee.done != nil { broadcastee.done <- struct{}{} } @@ -63,7 +87,17 @@ func (b *broadcaster) run() { return } case broadcastee := <-b.unreg: - delete(b.outputs, broadcastee.ch) + if broadcastee.topic != nil { + topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic] + if !ok { + continue + } + delete(topicOutputs, broadcastee.ch) + b.outputsPerTopic[*broadcastee.topic] = topicOutputs + } else { + delete(b.outputs, broadcastee.ch) + } + if broadcastee.done != nil { broadcastee.done <- struct{}{} } @@ -76,10 +110,11 @@ func (b *broadcaster) run() { // an Envelope containing a WakuMessage func NewBroadcaster(buflen int) Broadcaster { b := &broadcaster{ - input: make(chan *protocol.Envelope, buflen), - reg: make(chan chOperation), - unreg: make(chan chOperation), - outputs: make(map[chan<- *protocol.Envelope]bool), + input: make(chan *protocol.Envelope, buflen), + reg: make(chan chOperation), + unreg: make(chan chOperation), + outputs: make(broadcastOutputs), + outputsPerTopic: make(map[string]broadcastOutputs), } go b.run() @@ -88,38 +123,42 @@ func NewBroadcaster(buflen int) Broadcaster { } // Register a subscriptor channel and return a channel to wait until this operation is done -func (b *broadcaster) WaitRegister(newch chan<- *protocol.Envelope) doneCh { +func (b *broadcaster) WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh { d := make(doneCh) b.reg <- chOperation{ - ch: newch, - done: d, + ch: newch, + topic: topic, + done: d, } return d } // Register a subscriptor channel -func (b *broadcaster) Register(newch chan<- *protocol.Envelope) { +func (b *broadcaster) Register(topic *string, newch chan<- *protocol.Envelope) { b.reg <- chOperation{ - ch: newch, - done: nil, + ch: newch, + topic: topic, + done: nil, } } // Unregister a subscriptor channel and return a channel to wait until this operation is done -func (b *broadcaster) WaitUnregister(newch chan<- *protocol.Envelope) doneCh { +func (b *broadcaster) WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh { d := make(doneCh) b.unreg <- chOperation{ - ch: newch, - done: d, + ch: newch, + topic: topic, + done: d, } return d } // Unregister a subscriptor channel -func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) { +func (b *broadcaster) Unregister(topic *string, newch chan<- *protocol.Envelope) { b.unreg <- chOperation{ - ch: newch, - done: nil, + ch: newch, + topic: topic, + done: nil, } } diff --git a/waku/v2/broadcast_test.go b/waku/v2/broadcast_test.go index 05ef69aa..66475c85 100644 --- a/waku/v2/broadcast_test.go +++ b/waku/v2/broadcast_test.go @@ -9,7 +9,6 @@ import ( // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 // by Dustin Sallings (c) 2013, which was released under MIT license - func TestBroadcast(t *testing.T) { wg := sync.WaitGroup{} @@ -20,12 +19,10 @@ func TestBroadcast(t *testing.T) { wg.Add(1) cch := make(chan *protocol.Envelope) - - b.Register(cch) - + b.Register(nil, cch) go func() { defer wg.Done() - defer b.Unregister(cch) + defer b.Unregister(nil, cch) <-cch }() @@ -47,13 +44,13 @@ func TestBroadcastWait(t *testing.T) { wg.Add(1) cch := make(chan *protocol.Envelope) - <-b.WaitRegister(cch) + <-b.WaitRegister(nil, cch) go func() { defer wg.Done() <-cch - <-b.WaitUnregister(cch) + <-b.WaitUnregister(nil, cch) }() } @@ -66,6 +63,7 @@ func TestBroadcastWait(t *testing.T) { func TestBroadcastCleanup(t *testing.T) { b := NewBroadcaster(100) - b.Register(make(chan *protocol.Envelope)) + topic := "test" + b.Register(&topic, make(chan *protocol.Envelope)) b.Close() } diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index d7e42c95..27ebe192 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -46,7 +46,7 @@ type DiscoveryV5 struct { // Used for those weird cases where updateAddress // receives the same external address twice both with the original port - // and the nat port. Ideally this atribute should be removed by doing + // and the nat port. Ideally this attribute should be removed by doing // hole punching before starting waku ogTCPPort int } diff --git a/waku/v2/dnsdisc/enr.go b/waku/v2/dnsdisc/enr.go index 0d9212a9..be35190f 100644 --- a/waku/v2/dnsdisc/enr.go +++ b/waku/v2/dnsdisc/enr.go @@ -9,25 +9,24 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -type DnsDiscoveryParameters struct { +type dnsDiscoveryParameters struct { nameserver string } -type DnsDiscoveryOption func(*DnsDiscoveryParameters) +type DnsDiscoveryOption func(*dnsDiscoveryParameters) // WithMultiaddress is a WakuNodeOption that configures libp2p to listen on a list of multiaddresses func WithNameserver(nameserver string) DnsDiscoveryOption { - return func(params *DnsDiscoveryParameters) { + return func(params *dnsDiscoveryParameters) { params.nameserver = nameserver } } -// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable -// ENR tree +// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) ([]ma.Multiaddr, error) { var multiAddrs []ma.Multiaddr - params := new(DnsDiscoveryParameters) + params := new(dnsDiscoveryParameters) for _, opt := range opts { opt(params) } @@ -42,12 +41,12 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) } for _, node := range tree.Nodes() { - m, err := utils.EnodeToMultiAddr(node) + m, err := utils.Multiaddress(node) if err != nil { return nil, err } - multiAddrs = append(multiAddrs, m) + multiAddrs = append(multiAddrs, m...) } return multiAddrs, nil diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index da2a9d1a..2c4b36bc 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -17,15 +17,19 @@ import ( "go.uber.org/zap" ) -// A map of peer IDs to supported protocols +// PeerStatis is a map of peer IDs to supported protocols type PeerStats map[peer.ID][]string +// ConnStatus is used to indicate if the node is online, has access to history +// and also see the list of peers the node is aware of type ConnStatus struct { IsOnline bool HasHistory bool Peers PeerStats } +// ConnectionNotifier is a custom Notifier to be used to display when a peer +// connects or disconnects to the node type ConnectionNotifier struct { h host.Host ctx context.Context @@ -44,35 +48,36 @@ func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.SugaredLog } } +// Listen is called when network starts listening on an addr func (c ConnectionNotifier) Listen(n network.Network, m ma.Multiaddr) { - // called when network starts listening on an addr } +// ListenClose is called when network stops listening on an address func (c ConnectionNotifier) ListenClose(n network.Network, m ma.Multiaddr) { - // called when network stops listening on an addr } +// Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { - // called when a connection opened c.log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(1)) } +// Disconnected is called when a connection closed func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { - // called when a connection closed c.log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(-1)) c.DisconnectChan <- cc.RemotePeer() } +// OpenedStream is called when a stream opened func (c ConnectionNotifier) OpenedStream(n network.Network, s network.Stream) { - // called when a stream opened } +// ClosedStream is called when a stream closed func (c ConnectionNotifier) ClosedStream(n network.Network, s network.Stream) { - // called when a stream closed } +// Close quits the ConnectionNotifier func (c ConnectionNotifier) Close() { close(c.quit) } @@ -101,6 +106,8 @@ func (w *WakuNode) connectednessListener() { } } +// Status returns the current status of the node (online or not) +// and if the node has access to history nodes or not func (w *WakuNode) Status() (isOnline bool, hasHistory bool) { hasRelay := false hasLightPush := false diff --git a/waku/v2/node/waku_payload.go b/waku/v2/node/waku_payload.go index 58e92fd6..5708d6e5 100644 --- a/waku/v2/node/waku_payload.go +++ b/waku/v2/node/waku_payload.go @@ -17,6 +17,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" ) +// KeyKind indicates the type of encryption to apply type KeyKind string const ( @@ -25,14 +26,14 @@ const ( None KeyKind = "None" ) -// The message to encode +// Payload contains the data of the message to encode type Payload struct { Data []byte // Raw message payload Padding []byte // Used to align data size, since data size alone might reveal important metainformation. Key *KeyInfo // Contains the type of encryption to apply and the private key to use for signing the message } -// The decoded payload of a received message. +// DecodedPayload contains the data of the received message after decrypting it type DecodedPayload struct { Data []byte // Decoded message payload Padding []byte // Used to align data size, since data size alone might reveal important metainformation. diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 02f02d59..797f4aac 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - p2pproto "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" ws "github.com/libp2p/go-ws-transport" ma "github.com/multiformats/go-multiaddr" @@ -90,6 +89,7 @@ func defaultStoreFactory(w *WakuNode) store.Store { return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) } +// New is used to instantiate a WakuNode using a set of WakuNodeOptions func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params := new(WakuNodeParameters) @@ -279,6 +279,7 @@ func (w *WakuNode) checkForAddressChanges() { } } +// Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start() error { w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{ swap.WithMode(w.opts.swapMode), @@ -336,17 +337,18 @@ func (w *WakuNode) Start() error { // Subscribe store to topic if w.opts.storeMsgs { w.log.Info("Subscribing store to broadcaster") - w.bcaster.Register(w.store.MessageChannel()) + w.bcaster.Register(nil, w.store.MessageChannel()) } if w.filter != nil { w.log.Info("Subscribing filter to broadcaster") - w.bcaster.Register(w.filter.MsgC) + w.bcaster.Register(nil, w.filter.MsgC) } return nil } +// Stop stops the WakuNode and closess all connections to the host func (w *WakuNode) Stop() { defer w.cancel() @@ -377,14 +379,17 @@ func (w *WakuNode) Stop() { w.wg.Wait() } +// Host returns the libp2p Host used by the WakuNode func (w *WakuNode) Host() host.Host { return w.host } +// ID returns the base58 encoded ID from the host func (w *WakuNode) ID() string { return w.host.ID().Pretty() } +// ListenAddresses returns all the multiaddresses used by the host func (w *WakuNode) ListenAddresses() []ma.Multiaddr { hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty())) var result []ma.Multiaddr @@ -394,30 +399,39 @@ func (w *WakuNode) ListenAddresses() []ma.Multiaddr { return result } +// Relay is used to access any operation related to Waku Relay protocol func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay } +// Store is used to access any operation related to Waku Store protocol func (w *WakuNode) Store() store.Store { return w.store } +// Filter is used to access any operation related to Waku Filter protocol func (w *WakuNode) Filter() *filter.WakuFilter { return w.filter } +// Lightpush is used to access any operation related to Waku Lightpush protocol func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { return w.lightPush } +// DiscV5 is used to access any operation related to DiscoveryV5 func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 { return w.discoveryV5 } +// Broadcaster is used to access the message broadcaster that is used to push +// messages to different protocols func (w *WakuNode) Broadcaster() v2.Broadcaster { return w.bcaster } +// Publish will attempt to publish a message via WakuRelay if there are enough +// peers available, otherwise it will attempt to publish via Lightpush protocol func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { if !w.opts.enableLightPush && !w.opts.enableRelay { return errors.New("cannot publish message, relay and lightpush are disabled") @@ -456,7 +470,7 @@ func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) if err != nil { return err } - w.Broadcaster().Unregister(sub.C) + w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) } // TODO: rlnRelay @@ -532,10 +546,10 @@ func (w *WakuNode) startStore() { } } -func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { +func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error { w.log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty())) w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - err := w.host.Peerstore().AddProtocols(info.ID, string(protocolID)) + err := w.host.Peerstore().AddProtocols(info.ID, protocols...) if err != nil { return err } @@ -543,15 +557,17 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { return nil } -func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.ID, error) { +// AddPeer is used to add a peer and the protocols it support to the node peerstore +func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...string) (*peer.ID, error) { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { return nil, err } - return &info.ID, w.addPeer(info, protocolID) + return &info.ID, w.addPeer(info, protocols...) } +// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { @@ -561,6 +577,7 @@ func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Mult return w.connect(ctx, *info) } +// DialPeer is used to connect to a peer using a string containing a multiaddress func (w *WakuNode) DialPeer(ctx context.Context, address string) error { p, err := ma.NewMultiaddr(address) if err != nil { @@ -585,11 +602,13 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { return nil } +// DialPeerByID is used to connect to an already known peer func (w *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID) error { info := w.host.Peerstore().PeerInfo(peerID) return w.connect(ctx, info) } +// ClosePeerByAddress is used to disconnect from a peer using its multiaddress func (w *WakuNode) ClosePeerByAddress(address string) error { p, err := ma.NewMultiaddr(address) if err != nil { @@ -605,6 +624,7 @@ func (w *WakuNode) ClosePeerByAddress(address string) error { return w.ClosePeerById(info.ID) } +// ClosePeerById is used to close a connection to a peer func (w *WakuNode) ClosePeerById(id peer.ID) error { err := w.host.Network().ClosePeer(id) if err != nil { @@ -613,10 +633,12 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error { return nil } +// PeerCount return the number of connected peers func (w *WakuNode) PeerCount() int { return len(w.host.Network().Peers()) } +// PeerStats returns a list of peers and the protocols supported by them func (w *WakuNode) PeerStats() PeerStats { p := make(PeerStats) for _, peerID := range w.host.Network().Peers() { @@ -629,6 +651,7 @@ func (w *WakuNode) PeerStats() PeerStats { return p } +// Peers return the list of peers, addresses, protocols supported and connection status func (w *WakuNode) Peers() ([]*Peer, error) { var peers []*Peer for _, peerId := range w.host.Peerstore().Peers() { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index b5ad8329..b16a1963 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -103,6 +103,7 @@ func (w WakuNodeParameters) Identity() config.Option { return libp2p.Identity(*w.GetPrivKey()) } +// AddressFactory returns the address factory used by the node's host func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory { return w.addressFactory } @@ -173,6 +174,7 @@ func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption { } } +// GetPrivKey returns the private key used in the node func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey { privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey)) return &privKey @@ -259,6 +261,8 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +// WithWakuStoreFactory is used to replace the default WakuStore with a custom +// implementation that implements the store.Store interface func WithWakuStoreFactory(factory storeFactory) WakuNodeOption { return func(params *WakuNodeParameters) error { params.storeFactory = factory @@ -326,6 +330,7 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { } } +// WithWebsockets is a WakuNodeOption used to enable websockets support func WithWebsockets(address string, port int) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableWS = true @@ -341,6 +346,7 @@ func WithWebsockets(address string, port int) WakuNodeOption { } } +// WithSecureWebsockets is a WakuNodeOption used to enable secure websockets support func WithSecureWebsockets(address string, port int, certPath string, keyPath string) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableWSS = true diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index ab7e101a..3548ce8b 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) { defer sub2.Unsubscribe() node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger()) - broadcaster.Register(node2Filter.MsgC) + broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1)) @@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer sub2.Unsubscribe() node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger(), WithTimeout(3*time.Second)) - broadcaster.Register(node2Filter.MsgC) + broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1)) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 377c112d..6d9909cc 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -228,7 +228,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr w.subscriptions[topic] = append(w.subscriptions[topic], subscription) if w.bcaster != nil { - w.bcaster.Register(subscription.C) + w.bcaster.Register(&topic, subscription.C) } go w.subscribeToTopic(topic, subscription, sub) @@ -300,7 +300,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * for { select { case <-subscription.quit: - func() { + func(topic string) { subscription.Lock() defer subscription.Unlock() @@ -309,11 +309,11 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * } subscription.closed = true if w.bcaster != nil { - <-w.bcaster.WaitUnregister(subscription.C) // Remove from broadcast list + <-w.bcaster.WaitUnregister(&topic, subscription.C) // Remove from broadcast list } close(subscription.C) - }() + }(t) // TODO: if there are no more relay subscriptions, close the pubsub subscription case msg := <-subChannel: if msg == nil { diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 9215fed5..468113e1 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -34,10 +34,19 @@ const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4") const MaxPageSize = 100 var ( - ErrNoPeersAvailable = errors.New("no suitable remote peers") - ErrInvalidId = errors.New("invalid request id") + // ErrNoPeersAvailable is returned when there are no store peers in the peer store + // that could be used to retrieve message history + ErrNoPeersAvailable = errors.New("no suitable remote peers") + + // ErrInvalidId is returned when no RequestID is given + ErrInvalidId = errors.New("invalid request id") + + // ErrFailedToResumeHistory is returned when the node attempted to retrieve historic + // messages to fill its own message history but for some reason it failed ErrFailedToResumeHistory = errors.New("failed to resume the history") - ErrFailedQuery = errors.New("failed to resolve the query") + + // ErrFailedQuery is emitted when the query fails to return results + ErrFailedQuery = errors.New("failed to resolve the query") ) func minOf(vars ...int) int { diff --git a/waku/v2/protocol/swap/waku_swap.go b/waku/v2/protocol/swap/waku_swap.go index 75c205d9..07f55e2b 100644 --- a/waku/v2/protocol/swap/waku_swap.go +++ b/waku/v2/protocol/swap/waku_swap.go @@ -46,11 +46,11 @@ func (s *WakuSwap) sendCheque(peerId string) { func (s *WakuSwap) applyPolicy(peerId string) { if s.Accounting[peerId] <= s.params.disconnectThreshold { - s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) + s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) } if s.Accounting[peerId] >= s.params.paymentThreshold { - s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) + s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) if s.params.mode != HardMode { s.sendCheque(peerId) } diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 28499aa2..2f4ff9ce 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -58,7 +58,7 @@ func TestFilterSubscription(t *testing.T) { break } - _, err = d.node.AddPeer(addr, filter.FilterID_v20beta1) + _, err = d.node.AddPeer(addr, string(filter.FilterID_v20beta1)) require.NoError(t, err) args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}} diff --git a/waku/v2/rpc/runner.go b/waku/v2/rpc/runner.go index 82f7ebff..2c0b9930 100644 --- a/waku/v2/rpc/runner.go +++ b/waku/v2/rpc/runner.go @@ -24,7 +24,7 @@ func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService { func (r *runnerService) Start() { r.ch = make(chan *protocol.Envelope, 1024) - r.broadcaster.Register(r.ch) + r.broadcaster.Register(nil, r.ch) for { select { @@ -38,6 +38,6 @@ func (r *runnerService) Start() { func (r *runnerService) Stop() { r.quit <- true - r.broadcaster.Unregister(r.ch) + r.broadcaster.Unregister(nil, r.ch) close(r.ch) } diff --git a/waku/v2/utils/enr.go b/waku/v2/utils/enr.go index c8fa4a6a..733d33cb 100644 --- a/waku/v2/utils/enr.go +++ b/waku/v2/utils/enr.go @@ -17,12 +17,17 @@ import ( "go.uber.org/zap" ) +// WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node const WakuENRField = "waku2" + +// MultiaddrENRField is the name of the ENR field that will contain multiaddresses that cannot be described using the +// already available ENR fields (i.e. in the case of websocket connections) const MultiaddrENRField = "multiaddrs" // WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/). type WakuEnrBitfield = uint8 +// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield { var v uint8 = 0 @@ -45,6 +50,7 @@ func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield { return v } +// GetENRandIP returns a enr Node and TCP address obtained from a multiaddress. priv key and protocols supported func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.PrivateKey) (*enode.Node, *net.TCPAddr, error) { ip, err := addr.ValueForProtocol(ma.P_IP4) if err != nil { @@ -125,7 +131,8 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv return node, tcpAddr, err } -func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { +// EnodeToMultiaddress converts an enode into a multiaddress +func enodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey()) peerID, err := peer.IDFromPublicKey(pubKey) if err != nil { @@ -135,6 +142,7 @@ func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) { return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID)) } +// Multiaddress is used to extract all the multiaddresses that are part of a ENR record func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey()) peerID, err := peer.IDFromPublicKey(pubKey) @@ -145,8 +153,8 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { var multiaddrRaw []byte if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil { if enr.IsNotFound(err) { - Logger().Debug("Trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node)) - addr, err := EnodeToMultiAddr(node) + Logger().Debug("trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node)) + addr, err := enodeToMultiAddr(node) if err != nil { return nil, err } @@ -189,10 +197,16 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { } func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) { - address, err := EnodeToMultiAddr(node) + addresses, err := Multiaddress(node) if err != nil { return nil, err } - return peer.AddrInfoFromP2pAddr(address) + res, err := peer.AddrInfosFromP2pAddrs(addresses...) + if err != nil { + return nil, err + } + + return &res[0], nil + } diff --git a/waku/v2/utils/enr_test.go b/waku/v2/utils/enr_test.go index bc8c5541..f2bee49b 100644 --- a/waku/v2/utils/enr_test.go +++ b/waku/v2/utils/enr_test.go @@ -19,7 +19,7 @@ func TestEnodeToMultiAddr(t *testing.T) { parsedNode := enode.MustParse(enr) expectedMultiAddr := "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" - actualMultiAddr, err := EnodeToMultiAddr(parsedNode) + actualMultiAddr, err := enodeToMultiAddr(parsedNode) require.NoError(t, err) require.Equal(t, expectedMultiAddr, actualMultiAddr.String()) } @@ -41,7 +41,7 @@ func TestGetENRandIP(t *testing.T) { require.Equal(t, hostAddr, resTCPAddr) parsedNode := enode.MustParse(node.String()) - resMultiaddress, err := EnodeToMultiAddr(parsedNode) + resMultiaddress, err := enodeToMultiAddr(parsedNode) require.NoError(t, err) require.Equal(t, ogMultiaddress.String(), resMultiaddress.String()) } diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go index 40ed2704..2ee10c77 100644 --- a/waku/v2/utils/logger.go +++ b/waku/v2/utils/logger.go @@ -8,6 +8,7 @@ import ( var log *zap.Logger = nil var atom = zap.NewAtomicLevel() +// SetLogLevel sets a custom log level func SetLogLevel(level string) error { lvl := zapcore.InfoLevel // zero value err := lvl.Set(level) @@ -18,6 +19,7 @@ func SetLogLevel(level string) error { return nil } +// Logger creates a zap.Logger with some reasonable defaults func Logger() *zap.Logger { if log == nil { cfg := zap.Config{ diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 831a35c8..afce9f9f 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -13,8 +13,9 @@ import ( "go.uber.org/zap" ) +// ErrNoPeersAvailable is emitted when no suitable peers are found for +// some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") -var PingServiceNotAvailable = errors.New("ping service not available") // SelectPeer is used to return a random peer that supports a given protocol. func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { @@ -50,6 +51,7 @@ type pingResult struct { rtt time.Duration } +// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { var peers peer.IDSlice for _, peer := range host.Peerstore().Peers() { @@ -104,9 +106,9 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str } if min == nil { return nil, ErrNoPeersAvailable - } else { - return &min.p, nil } + + return &min.p, nil case <-ctx.Done(): return nil, ErrNoPeersAvailable } diff --git a/waku/v2/utils/time.go b/waku/v2/utils/time.go index 1634b3ed..12d3cd62 100644 --- a/waku/v2/utils/time.go +++ b/waku/v2/utils/time.go @@ -2,7 +2,7 @@ package utils import "time" -// GetUnixEpoch converts a time into a unix timestamp with nanoseconds +// GetUnixEpochFrom converts a time into a unix timestamp with nanoseconds func GetUnixEpochFrom(now time.Time) int64 { return now.UnixNano() }