fix: add topic to broadcast and some comments to functions (#229)

This commit is contained in:
Richard Ramos 2022-04-25 23:31:26 +04:00 committed by GitHub
parent f1f6cb04f0
commit bbab0f1714
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 213 additions and 103 deletions

View File

@ -295,5 +295,5 @@ func addPeer(wakuNode *node.WakuNode, addr string, protocol protocol.ID) (*peer.
return nil, err
}
return wakuNode.AddPeer(ma, protocol)
return wakuNode.AddPeer(ma, string(protocol))
}

View File

@ -82,7 +82,7 @@ func main() {
panic(err)
}
_, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1)
_, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], string(filter.FilterID_v20beta1))
if err != nil {
log.Info("Error adding filter peer on light node ", err)
}

View File

@ -117,7 +117,7 @@ func waku_default_pubsub_topic() *C.char {
//export waku_set_event_callback
// Register callback to act as signal handler and receive application signal
// (in JSON) which are used o react to asyncronous events in waku. The function
// (in JSON) which are used o react to asynchronous events in waku. The function
// signature for the callback should be `void myCallback(char* signalJSON)`
func waku_set_event_callback(cb unsafe.Pointer) {
mobile.SetEventCallback(cb)

View File

@ -16,7 +16,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1"
"github.com/libp2p/go-libp2p-core/peer"
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
@ -195,7 +194,7 @@ func AddPeer(address string, protocolID string) string {
return makeJSONResponse(err)
}
peerID, err := wakuNode.AddPeer(ma, p2pproto.ID(protocolID))
peerID, err := wakuNode.AddPeer(ma, protocolID)
return prepareJSONResponse(peerID, err)
}

View File

@ -89,7 +89,7 @@ func RelaySubscribe(topic string) string {
mutex.Lock()
defer mutex.Unlock()
subscription, ok := subscriptions[topicToSubscribe]
_, ok := subscriptions[topicToSubscribe]
if ok {
return makeJSONResponse(nil)
}
@ -101,11 +101,11 @@ func RelaySubscribe(topic string) string {
subscriptions[topicToSubscribe] = subscription
go func() {
go func(subscription *relay.Subscription) {
for envelope := range subscription.C {
send("message", toSubscriptionMessage(envelope))
}
}()
}(subscription)
return makeJSONResponse(nil)
}

View File

@ -28,10 +28,12 @@ func Logger() *zap.SugaredLogger {
return log
}
// GetHostAddress returns the first listen address used by a host
func GetHostAddress(ha host.Host) ma.Multiaddr {
return ha.Addrs()[0]
}
// FindFreePort returns an available port number
func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
t.Helper()
@ -61,6 +63,7 @@ func FindFreePort(t *testing.T, host string, maxAttempts int) (int, error) {
return 0, fmt.Errorf("no free port found")
}
// MakeHost creates a Libp2p host with a random key on a specific port
func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, error) {
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, randomness)
@ -80,10 +83,12 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
)
}
// CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp
func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage {
return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
}
// RandomHex returns a random hex string of n bytes
func RandomHex(n int) (string, error) {
bytes := make([]byte, n)
if _, err := rand.Read(bytes); err != nil {

View File

@ -25,7 +25,6 @@ import (
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
@ -216,10 +215,10 @@ func Execute(options Options) {
failOnErr(err, "Wakunode")
addPeers(wakuNode, options.Rendezvous.Nodes.Value(), rendezvous.RendezvousID_v001)
addPeers(wakuNode, options.Store.Nodes.Value(), store.StoreID_v20beta4)
addPeers(wakuNode, options.LightPush.Nodes.Value(), lightpush.LightPushID_v20beta1)
addPeers(wakuNode, options.Filter.Nodes.Value(), filter.FilterID_v20beta1)
addPeers(wakuNode, options.Rendezvous.Nodes.Value(), string(rendezvous.RendezvousID_v001))
addPeers(wakuNode, options.Store.Nodes.Value(), string(store.StoreID_v20beta4))
addPeers(wakuNode, options.LightPush.Nodes.Value(), string(lightpush.LightPushID_v20beta1))
addPeers(wakuNode, options.Filter.Nodes.Value(), string(filter.FilterID_v20beta1))
if err = wakuNode.Start(); err != nil {
utils.Logger().Fatal(fmt.Errorf("could not start waku node, %w", err).Error())
@ -237,9 +236,10 @@ func Execute(options Options) {
if options.Relay.Enable {
for _, nodeTopic := range options.Relay.Topics.Value() {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic")
wakuNode.Broadcaster().Unregister(sub.C)
wakuNode.Broadcaster().Unregister(&nodeTopic, sub.C)
}
}
@ -307,7 +307,7 @@ func Execute(options Options) {
}
}
func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID) {
func addPeers(wakuNode *node.WakuNode, addresses []string, protocols ...string) {
for _, addrString := range addresses {
if addrString == "" {
continue
@ -316,7 +316,7 @@ func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID)
addr, err := multiaddr.NewMultiaddr(addrString)
failOnErr(err, "invalid multiaddress")
_, err = wakuNode.AddPeer(addr, protocol)
_, err = wakuNode.AddPeer(addr, protocols...)
failOnErr(err, "error adding peer")
}
}

View File

@ -7,10 +7,12 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
// RendezVousLevelDB is a struct used to hold a reference to a LevelDB database
type RendezVousLevelDB struct {
db *leveldb.DB
}
// NewRendezVousLevelDB opens a LevelDB database to be used for rendezvous protocol
func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) {
db, err := leveldb.OpenFile(dBPath, &opt.Options{OpenFilesCacheCapacity: 3})
@ -21,14 +23,18 @@ func NewRendezVousLevelDB(dBPath string) (*RendezVousLevelDB, error) {
return &RendezVousLevelDB{db}, nil
}
// Delete removes a key from the database
func (r *RendezVousLevelDB) Delete(key []byte) error {
return r.db.Delete(key, nil)
}
// Put inserts or updates a key in the database
func (r *RendezVousLevelDB) Put(key []byte, value []byte) error {
return r.db.Put(key, value, nil)
}
// NewIterator returns an interator that can be used to iterate over all
// the records contained in the DB
func (r *RendezVousLevelDB) NewIterator(prefix []byte) rendezvous.Iterator {
return r.db.NewIterator(util.BytesPrefix(prefix), nil)
}

View File

@ -10,29 +10,33 @@ import (
type doneCh chan struct{}
type chOperation struct {
ch chan<- *protocol.Envelope
done doneCh
ch chan<- *protocol.Envelope
topic *string
done doneCh
}
type broadcastOutputs map[chan<- *protocol.Envelope]struct{}
type broadcaster struct {
input chan *protocol.Envelope
reg chan chOperation
unreg chan chOperation
outputs map[chan<- *protocol.Envelope]bool
outputs broadcastOutputs
outputsPerTopic map[string]broadcastOutputs
}
// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {
// Register a new channel to receive broadcasts
Register(chan<- *protocol.Envelope)
// Register a new channel to receive broadcasts and return a channel to wait until this operation is complete
WaitRegister(newch chan<- *protocol.Envelope) doneCh
// Unregister a channel so that it no longer receives broadcasts.
Unregister(chan<- *protocol.Envelope)
// Register a new channel to receive broadcasts from a pubsubtopic
Register(topic *string, newch chan<- *protocol.Envelope)
// Register a new channel to receive broadcasts from a pubsub topic and return a channel to wait until this operation is complete
WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh
// Unregister a channel so that it no longer receives broadcasts from a pubsub topic
Unregister(topic *string, newch chan<- *protocol.Envelope)
// Unregister a subscriptor channel and return a channel to wait until this operation is done
WaitUnregister(newch chan<- *protocol.Envelope) doneCh
WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh
// Shut this broadcaster down.
Close()
// Submit a new object to all subscribers
@ -43,6 +47,15 @@ func (b *broadcaster) broadcast(m *protocol.Envelope) {
for ch := range b.outputs {
ch <- m
}
outputs, ok := b.outputsPerTopic[m.PubsubTopic()]
if !ok {
return
}
for ch := range outputs {
ch <- m
}
}
func (b *broadcaster) run() {
@ -52,7 +65,18 @@ func (b *broadcaster) run() {
b.broadcast(m)
case broadcastee, ok := <-b.reg:
if ok {
b.outputs[broadcastee.ch] = true
if broadcastee.topic != nil {
topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic]
if !ok {
b.outputsPerTopic[*broadcastee.topic] = make(broadcastOutputs)
topicOutputs = b.outputsPerTopic[*broadcastee.topic]
}
topicOutputs[broadcastee.ch] = struct{}{}
b.outputsPerTopic[*broadcastee.topic] = topicOutputs
} else {
b.outputs[broadcastee.ch] = struct{}{}
}
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
@ -63,7 +87,17 @@ func (b *broadcaster) run() {
return
}
case broadcastee := <-b.unreg:
delete(b.outputs, broadcastee.ch)
if broadcastee.topic != nil {
topicOutputs, ok := b.outputsPerTopic[*broadcastee.topic]
if !ok {
continue
}
delete(topicOutputs, broadcastee.ch)
b.outputsPerTopic[*broadcastee.topic] = topicOutputs
} else {
delete(b.outputs, broadcastee.ch)
}
if broadcastee.done != nil {
broadcastee.done <- struct{}{}
}
@ -76,10 +110,11 @@ func (b *broadcaster) run() {
// an Envelope containing a WakuMessage
func NewBroadcaster(buflen int) Broadcaster {
b := &broadcaster{
input: make(chan *protocol.Envelope, buflen),
reg: make(chan chOperation),
unreg: make(chan chOperation),
outputs: make(map[chan<- *protocol.Envelope]bool),
input: make(chan *protocol.Envelope, buflen),
reg: make(chan chOperation),
unreg: make(chan chOperation),
outputs: make(broadcastOutputs),
outputsPerTopic: make(map[string]broadcastOutputs),
}
go b.run()
@ -88,38 +123,42 @@ func NewBroadcaster(buflen int) Broadcaster {
}
// Register a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitRegister(newch chan<- *protocol.Envelope) doneCh {
func (b *broadcaster) WaitRegister(topic *string, newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.reg <- chOperation{
ch: newch,
done: d,
ch: newch,
topic: topic,
done: d,
}
return d
}
// Register a subscriptor channel
func (b *broadcaster) Register(newch chan<- *protocol.Envelope) {
func (b *broadcaster) Register(topic *string, newch chan<- *protocol.Envelope) {
b.reg <- chOperation{
ch: newch,
done: nil,
ch: newch,
topic: topic,
done: nil,
}
}
// Unregister a subscriptor channel and return a channel to wait until this operation is done
func (b *broadcaster) WaitUnregister(newch chan<- *protocol.Envelope) doneCh {
func (b *broadcaster) WaitUnregister(topic *string, newch chan<- *protocol.Envelope) doneCh {
d := make(doneCh)
b.unreg <- chOperation{
ch: newch,
done: d,
ch: newch,
topic: topic,
done: d,
}
return d
}
// Unregister a subscriptor channel
func (b *broadcaster) Unregister(newch chan<- *protocol.Envelope) {
func (b *broadcaster) Unregister(topic *string, newch chan<- *protocol.Envelope) {
b.unreg <- chOperation{
ch: newch,
done: nil,
ch: newch,
topic: topic,
done: nil,
}
}

View File

@ -9,7 +9,6 @@ import (
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
// by Dustin Sallings (c) 2013, which was released under MIT license
func TestBroadcast(t *testing.T) {
wg := sync.WaitGroup{}
@ -20,12 +19,10 @@ func TestBroadcast(t *testing.T) {
wg.Add(1)
cch := make(chan *protocol.Envelope)
b.Register(cch)
b.Register(nil, cch)
go func() {
defer wg.Done()
defer b.Unregister(cch)
defer b.Unregister(nil, cch)
<-cch
}()
@ -47,13 +44,13 @@ func TestBroadcastWait(t *testing.T) {
wg.Add(1)
cch := make(chan *protocol.Envelope)
<-b.WaitRegister(cch)
<-b.WaitRegister(nil, cch)
go func() {
defer wg.Done()
<-cch
<-b.WaitUnregister(cch)
<-b.WaitUnregister(nil, cch)
}()
}
@ -66,6 +63,7 @@ func TestBroadcastWait(t *testing.T) {
func TestBroadcastCleanup(t *testing.T) {
b := NewBroadcaster(100)
b.Register(make(chan *protocol.Envelope))
topic := "test"
b.Register(&topic, make(chan *protocol.Envelope))
b.Close()
}

View File

@ -46,7 +46,7 @@ type DiscoveryV5 struct {
// Used for those weird cases where updateAddress
// receives the same external address twice both with the original port
// and the nat port. Ideally this atribute should be removed by doing
// and the nat port. Ideally this attribute should be removed by doing
// hole punching before starting waku
ogTCPPort int
}

View File

@ -9,25 +9,24 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
type DnsDiscoveryParameters struct {
type dnsDiscoveryParameters struct {
nameserver string
}
type DnsDiscoveryOption func(*DnsDiscoveryParameters)
type DnsDiscoveryOption func(*dnsDiscoveryParameters)
// WithMultiaddress is a WakuNodeOption that configures libp2p to listen on a list of multiaddresses
func WithNameserver(nameserver string) DnsDiscoveryOption {
return func(params *DnsDiscoveryParameters) {
return func(params *dnsDiscoveryParameters) {
params.nameserver = nameserver
}
}
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable
// ENR tree
// RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree
func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) ([]ma.Multiaddr, error) {
var multiAddrs []ma.Multiaddr
params := new(DnsDiscoveryParameters)
params := new(dnsDiscoveryParameters)
for _, opt := range opts {
opt(params)
}
@ -42,12 +41,12 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
}
for _, node := range tree.Nodes() {
m, err := utils.EnodeToMultiAddr(node)
m, err := utils.Multiaddress(node)
if err != nil {
return nil, err
}
multiAddrs = append(multiAddrs, m)
multiAddrs = append(multiAddrs, m...)
}
return multiAddrs, nil

View File

@ -17,15 +17,19 @@ import (
"go.uber.org/zap"
)
// A map of peer IDs to supported protocols
// PeerStatis is a map of peer IDs to supported protocols
type PeerStats map[peer.ID][]string
// ConnStatus is used to indicate if the node is online, has access to history
// and also see the list of peers the node is aware of
type ConnStatus struct {
IsOnline bool
HasHistory bool
Peers PeerStats
}
// ConnectionNotifier is a custom Notifier to be used to display when a peer
// connects or disconnects to the node
type ConnectionNotifier struct {
h host.Host
ctx context.Context
@ -44,35 +48,36 @@ func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.SugaredLog
}
}
// Listen is called when network starts listening on an addr
func (c ConnectionNotifier) Listen(n network.Network, m ma.Multiaddr) {
// called when network starts listening on an addr
}
// ListenClose is called when network stops listening on an address
func (c ConnectionNotifier) ListenClose(n network.Network, m ma.Multiaddr) {
// called when network stops listening on an addr
}
// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
// called when a connection opened
c.log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer()))
stats.Record(c.ctx, metrics.Peers.M(1))
}
// Disconnected is called when a connection closed
func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
// called when a connection closed
c.log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer()))
stats.Record(c.ctx, metrics.Peers.M(-1))
c.DisconnectChan <- cc.RemotePeer()
}
// OpenedStream is called when a stream opened
func (c ConnectionNotifier) OpenedStream(n network.Network, s network.Stream) {
// called when a stream opened
}
// ClosedStream is called when a stream closed
func (c ConnectionNotifier) ClosedStream(n network.Network, s network.Stream) {
// called when a stream closed
}
// Close quits the ConnectionNotifier
func (c ConnectionNotifier) Close() {
close(c.quit)
}
@ -101,6 +106,8 @@ func (w *WakuNode) connectednessListener() {
}
}
// Status returns the current status of the node (online or not)
// and if the node has access to history nodes or not
func (w *WakuNode) Status() (isOnline bool, hasHistory bool) {
hasRelay := false
hasLightPush := false

View File

@ -17,6 +17,7 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/pb"
)
// KeyKind indicates the type of encryption to apply
type KeyKind string
const (
@ -25,14 +26,14 @@ const (
None KeyKind = "None"
)
// The message to encode
// Payload contains the data of the message to encode
type Payload struct {
Data []byte // Raw message payload
Padding []byte // Used to align data size, since data size alone might reveal important metainformation.
Key *KeyInfo // Contains the type of encryption to apply and the private key to use for signing the message
}
// The decoded payload of a received message.
// DecodedPayload contains the data of the received message after decrypting it
type DecodedPayload struct {
Data []byte // Decoded message payload
Padding []byte // Used to align data size, since data size alone might reveal important metainformation.

View File

@ -18,7 +18,6 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
p2pproto "github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ws "github.com/libp2p/go-ws-transport"
ma "github.com/multiformats/go-multiaddr"
@ -90,6 +89,7 @@ func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
params := new(WakuNodeParameters)
@ -279,6 +279,7 @@ func (w *WakuNode) checkForAddressChanges() {
}
}
// Start initializes all the protocols that were setup in the WakuNode
func (w *WakuNode) Start() error {
w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{
swap.WithMode(w.opts.swapMode),
@ -336,17 +337,18 @@ func (w *WakuNode) Start() error {
// Subscribe store to topic
if w.opts.storeMsgs {
w.log.Info("Subscribing store to broadcaster")
w.bcaster.Register(w.store.MessageChannel())
w.bcaster.Register(nil, w.store.MessageChannel())
}
if w.filter != nil {
w.log.Info("Subscribing filter to broadcaster")
w.bcaster.Register(w.filter.MsgC)
w.bcaster.Register(nil, w.filter.MsgC)
}
return nil
}
// Stop stops the WakuNode and closess all connections to the host
func (w *WakuNode) Stop() {
defer w.cancel()
@ -377,14 +379,17 @@ func (w *WakuNode) Stop() {
w.wg.Wait()
}
// Host returns the libp2p Host used by the WakuNode
func (w *WakuNode) Host() host.Host {
return w.host
}
// ID returns the base58 encoded ID from the host
func (w *WakuNode) ID() string {
return w.host.ID().Pretty()
}
// ListenAddresses returns all the multiaddresses used by the host
func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
hostInfo, _ := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", w.host.ID().Pretty()))
var result []ma.Multiaddr
@ -394,30 +399,39 @@ func (w *WakuNode) ListenAddresses() []ma.Multiaddr {
return result
}
// Relay is used to access any operation related to Waku Relay protocol
func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
}
// Store is used to access any operation related to Waku Store protocol
func (w *WakuNode) Store() store.Store {
return w.store
}
// Filter is used to access any operation related to Waku Filter protocol
func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter
}
// Lightpush is used to access any operation related to Waku Lightpush protocol
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
}
// DiscV5 is used to access any operation related to DiscoveryV5
func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
return w.discoveryV5
}
// Broadcaster is used to access the message broadcaster that is used to push
// messages to different protocols
func (w *WakuNode) Broadcaster() v2.Broadcaster {
return w.bcaster
}
// Publish will attempt to publish a message via WakuRelay if there are enough
// peers available, otherwise it will attempt to publish via Lightpush protocol
func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
if !w.opts.enableLightPush && !w.opts.enableRelay {
return errors.New("cannot publish message, relay and lightpush are disabled")
@ -456,7 +470,7 @@ func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option)
if err != nil {
return err
}
w.Broadcaster().Unregister(sub.C)
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
}
// TODO: rlnRelay
@ -532,10 +546,10 @@ func (w *WakuNode) startStore() {
}
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error {
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error {
w.log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty()))
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
err := w.host.Peerstore().AddProtocols(info.ID, string(protocolID))
err := w.host.Peerstore().AddProtocols(info.ID, protocols...)
if err != nil {
return err
}
@ -543,15 +557,17 @@ func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error {
return nil
}
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.ID, error) {
// AddPeer is used to add a peer and the protocols it support to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocols ...string) (*peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return nil, err
}
return &info.ID, w.addPeer(info, protocolID)
return &info.ID, w.addPeer(info, protocols...)
}
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress
func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
@ -561,6 +577,7 @@ func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Mult
return w.connect(ctx, *info)
}
// DialPeer is used to connect to a peer using a string containing a multiaddress
func (w *WakuNode) DialPeer(ctx context.Context, address string) error {
p, err := ma.NewMultiaddr(address)
if err != nil {
@ -585,11 +602,13 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
return nil
}
// DialPeerByID is used to connect to an already known peer
func (w *WakuNode) DialPeerByID(ctx context.Context, peerID peer.ID) error {
info := w.host.Peerstore().PeerInfo(peerID)
return w.connect(ctx, info)
}
// ClosePeerByAddress is used to disconnect from a peer using its multiaddress
func (w *WakuNode) ClosePeerByAddress(address string) error {
p, err := ma.NewMultiaddr(address)
if err != nil {
@ -605,6 +624,7 @@ func (w *WakuNode) ClosePeerByAddress(address string) error {
return w.ClosePeerById(info.ID)
}
// ClosePeerById is used to close a connection to a peer
func (w *WakuNode) ClosePeerById(id peer.ID) error {
err := w.host.Network().ClosePeer(id)
if err != nil {
@ -613,10 +633,12 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error {
return nil
}
// PeerCount return the number of connected peers
func (w *WakuNode) PeerCount() int {
return len(w.host.Network().Peers())
}
// PeerStats returns a list of peers and the protocols supported by them
func (w *WakuNode) PeerStats() PeerStats {
p := make(PeerStats)
for _, peerID := range w.host.Network().Peers() {
@ -629,6 +651,7 @@ func (w *WakuNode) PeerStats() PeerStats {
return p
}
// Peers return the list of peers, addresses, protocols supported and connection status
func (w *WakuNode) Peers() ([]*Peer, error) {
var peers []*Peer
for _, peerId := range w.host.Peerstore().Peers() {

View File

@ -103,6 +103,7 @@ func (w WakuNodeParameters) Identity() config.Option {
return libp2p.Identity(*w.GetPrivKey())
}
// AddressFactory returns the address factory used by the node's host
func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory {
return w.addressFactory
}
@ -173,6 +174,7 @@ func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption {
}
}
// GetPrivKey returns the private key used in the node
func (w *WakuNodeParameters) GetPrivKey() *crypto.PrivKey {
privKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(w.privKey))
return &privKey
@ -259,6 +261,8 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
}
}
// WithWakuStoreFactory is used to replace the default WakuStore with a custom
// implementation that implements the store.Store interface
func WithWakuStoreFactory(factory storeFactory) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.storeFactory = factory
@ -326,6 +330,7 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
}
}
// WithWebsockets is a WakuNodeOption used to enable websockets support
func WithWebsockets(address string, port int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableWS = true
@ -341,6 +346,7 @@ func WithWebsockets(address string, port int) WakuNodeOption {
}
}
// WithSecureWebsockets is a WakuNodeOption used to enable secure websockets support
func WithSecureWebsockets(address string, port int, certPath string, keyPath string) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableWSS = true

View File

@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) {
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger())
broadcaster.Register(node2Filter.MsgC)
broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
defer sub2.Unsubscribe()
node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger(), WithTimeout(3*time.Second))
broadcaster.Register(node2Filter.MsgC)
broadcaster.Register(&testTopic, node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))

View File

@ -228,7 +228,7 @@ func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscr
w.subscriptions[topic] = append(w.subscriptions[topic], subscription)
if w.bcaster != nil {
w.bcaster.Register(subscription.C)
w.bcaster.Register(&topic, subscription.C)
}
go w.subscribeToTopic(topic, subscription, sub)
@ -300,7 +300,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
for {
select {
case <-subscription.quit:
func() {
func(topic string) {
subscription.Lock()
defer subscription.Unlock()
@ -309,11 +309,11 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
}
subscription.closed = true
if w.bcaster != nil {
<-w.bcaster.WaitUnregister(subscription.C) // Remove from broadcast list
<-w.bcaster.WaitUnregister(&topic, subscription.C) // Remove from broadcast list
}
close(subscription.C)
}()
}(t)
// TODO: if there are no more relay subscriptions, close the pubsub subscription
case msg := <-subChannel:
if msg == nil {

View File

@ -34,10 +34,19 @@ const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
const MaxPageSize = 100
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidId = errors.New("invalid request id")
// ErrNoPeersAvailable is returned when there are no store peers in the peer store
// that could be used to retrieve message history
ErrNoPeersAvailable = errors.New("no suitable remote peers")
// ErrInvalidId is returned when no RequestID is given
ErrInvalidId = errors.New("invalid request id")
// ErrFailedToResumeHistory is returned when the node attempted to retrieve historic
// messages to fill its own message history but for some reason it failed
ErrFailedToResumeHistory = errors.New("failed to resume the history")
ErrFailedQuery = errors.New("failed to resolve the query")
// ErrFailedQuery is emitted when the query fails to return results
ErrFailedQuery = errors.New("failed to resolve the query")
)
func minOf(vars ...int) int {

View File

@ -46,11 +46,11 @@ func (s *WakuSwap) sendCheque(peerId string) {
func (s *WakuSwap) applyPolicy(peerId string) {
if s.Accounting[peerId] <= s.params.disconnectThreshold {
s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId])
s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId])
}
if s.Accounting[peerId] >= s.params.paymentThreshold {
s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId])
s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId])
if s.params.mode != HardMode {
s.sendCheque(peerId)
}

View File

@ -58,7 +58,7 @@ func TestFilterSubscription(t *testing.T) {
break
}
_, err = d.node.AddPeer(addr, filter.FilterID_v20beta1)
_, err = d.node.AddPeer(addr, string(filter.FilterID_v20beta1))
require.NoError(t, err)
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []pb.ContentFilter{{ContentTopic: "ct"}}}

View File

@ -24,7 +24,7 @@ func newRunnerService(broadcaster v2.Broadcaster, adder Adder) *runnerService {
func (r *runnerService) Start() {
r.ch = make(chan *protocol.Envelope, 1024)
r.broadcaster.Register(r.ch)
r.broadcaster.Register(nil, r.ch)
for {
select {
@ -38,6 +38,6 @@ func (r *runnerService) Start() {
func (r *runnerService) Stop() {
r.quit <- true
r.broadcaster.Unregister(r.ch)
r.broadcaster.Unregister(nil, r.ch)
close(r.ch)
}

View File

@ -17,12 +17,17 @@ import (
"go.uber.org/zap"
)
// WakuENRField is the name of the ENR field that contains information about which protocols are supported by the node
const WakuENRField = "waku2"
// MultiaddrENRField is the name of the ENR field that will contain multiaddresses that cannot be described using the
// already available ENR fields (i.e. in the case of websocket connections)
const MultiaddrENRField = "multiaddrs"
// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/).
type WakuEnrBitfield = uint8
// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
var v uint8 = 0
@ -45,6 +50,7 @@ func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
return v
}
// GetENRandIP returns a enr Node and TCP address obtained from a multiaddress. priv key and protocols supported
func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.PrivateKey) (*enode.Node, *net.TCPAddr, error) {
ip, err := addr.ValueForProtocol(ma.P_IP4)
if err != nil {
@ -125,7 +131,8 @@ func GetENRandIP(addr ma.Multiaddr, wakuFlags WakuEnrBitfield, privK *ecdsa.Priv
return node, tcpAddr, err
}
func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
// EnodeToMultiaddress converts an enode into a multiaddress
func enodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey())
peerID, err := peer.IDFromPublicKey(pubKey)
if err != nil {
@ -135,6 +142,7 @@ func EnodeToMultiAddr(node *enode.Node) (ma.Multiaddr, error) {
return ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", node.IP(), node.TCP(), peerID))
}
// Multiaddress is used to extract all the multiaddresses that are part of a ENR record
func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) {
pubKey := (*crypto.Secp256k1PublicKey)(node.Pubkey())
peerID, err := peer.IDFromPublicKey(pubKey)
@ -145,8 +153,8 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) {
var multiaddrRaw []byte
if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil {
if enr.IsNotFound(err) {
Logger().Debug("Trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node))
addr, err := EnodeToMultiAddr(node)
Logger().Debug("trying to convert enode to multiaddress, since I could not retrieve multiaddress field for node ", zap.Any("enode", node))
addr, err := enodeToMultiAddr(node)
if err != nil {
return nil, err
}
@ -189,10 +197,16 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) {
}
func EnodeToPeerInfo(node *enode.Node) (*peer.AddrInfo, error) {
address, err := EnodeToMultiAddr(node)
addresses, err := Multiaddress(node)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(address)
res, err := peer.AddrInfosFromP2pAddrs(addresses...)
if err != nil {
return nil, err
}
return &res[0], nil
}

View File

@ -19,7 +19,7 @@ func TestEnodeToMultiAddr(t *testing.T) {
parsedNode := enode.MustParse(enr)
expectedMultiAddr := "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ"
actualMultiAddr, err := EnodeToMultiAddr(parsedNode)
actualMultiAddr, err := enodeToMultiAddr(parsedNode)
require.NoError(t, err)
require.Equal(t, expectedMultiAddr, actualMultiAddr.String())
}
@ -41,7 +41,7 @@ func TestGetENRandIP(t *testing.T) {
require.Equal(t, hostAddr, resTCPAddr)
parsedNode := enode.MustParse(node.String())
resMultiaddress, err := EnodeToMultiAddr(parsedNode)
resMultiaddress, err := enodeToMultiAddr(parsedNode)
require.NoError(t, err)
require.Equal(t, ogMultiaddress.String(), resMultiaddress.String())
}

View File

@ -8,6 +8,7 @@ import (
var log *zap.Logger = nil
var atom = zap.NewAtomicLevel()
// SetLogLevel sets a custom log level
func SetLogLevel(level string) error {
lvl := zapcore.InfoLevel // zero value
err := lvl.Set(level)
@ -18,6 +19,7 @@ func SetLogLevel(level string) error {
return nil
}
// Logger creates a zap.Logger with some reasonable defaults
func Logger() *zap.Logger {
if log == nil {
cfg := zap.Config{

View File

@ -13,8 +13,9 @@ import (
"go.uber.org/zap"
)
// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")
var PingServiceNotAvailable = errors.New("ping service not available")
// SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) {
@ -50,6 +51,7 @@ type pingResult struct {
rtt time.Duration
}
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) {
var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
@ -104,9 +106,9 @@ func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId str
}
if min == nil {
return nil, ErrNoPeersAvailable
} else {
return &min.p, nil
}
return &min.p, nil
case <-ctx.Done():
return nil, ErrNoPeersAvailable
}

View File

@ -2,7 +2,7 @@ package utils
import "time"
// GetUnixEpoch converts a time into a unix timestamp with nanoseconds
// GetUnixEpochFrom converts a time into a unix timestamp with nanoseconds
func GetUnixEpochFrom(now time.Time) int64 {
return now.UnixNano()
}