diff --git a/go.mod b/go.mod index 91dd9b21c..2f3806b4d 100644 --- a/go.mod +++ b/go.mod @@ -81,7 +81,7 @@ require ( github.com/ladydascalie/currency v1.6.0 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 github.com/schollz/peerdiscovery v1.7.0 - github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8 + github.com/waku-org/go-waku v0.6.0 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 go.uber.org/multierr v1.8.0 diff --git a/go.sum b/go.sum index 7545fee6e..469129e6c 100644 --- a/go.sum +++ b/go.sum @@ -2102,8 +2102,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM= github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= -github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8 h1:1agRxCtCBoCaMB/72L87bZgyvCAEMUoBL6l0MImpV2Y= -github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8/go.mod h1:6AXlCiXueZC7XbvG1LUi0uEOMS2n/30h2kjXzW8zfYY= +github.com/waku-org/go-waku v0.6.0 h1:0ycNr7fBz4qDxqVGXFxI9GQCT/yfyaiSmbemWFPrQnw= +github.com/waku-org/go-waku v0.6.0/go.mod h1:6AXlCiXueZC7XbvG1LUi0uEOMS2n/30h2kjXzW8zfYY= github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU= github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go index ab1dd6060..857fcfd75 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/discv5/discover.go @@ -294,8 +294,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error { } if len(peerAddrs) != 0 { - d.peerConnector.PeerChannel() <- peerAddrs[0] + select { + case d.peerConnector.PeerChannel() <- peerAddrs[0]: + case <-ctx.Done(): + return nil + } } + select { case <-ctx.Done(): return nil diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/dnsdisc/enr.go b/vendor/github.com/waku-org/go-waku/waku/v2/dnsdisc/enr.go index ac32ef387..7129a4639 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/dnsdisc/enr.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/dnsdisc/enr.go @@ -9,8 +9,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/metrics" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" - - ma "github.com/multiformats/go-multiaddr" ) type dnsDiscoveryParameters struct { @@ -27,9 +25,9 @@ func WithNameserver(nameserver string) DnsDiscoveryOption { } type DiscoveredNode struct { - PeerID peer.ID - Addresses []ma.Multiaddr - ENR *enode.Node + PeerID peer.ID + PeerInfo peer.AddrInfo + ENR *enode.Node } // RetrieveNodes returns a list of multiaddress given a url to a DNS discoverable ENR tree @@ -58,9 +56,22 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption) return nil, err } + infoAddr, err := peer.AddrInfosFromP2pAddrs(m...) + if err != nil { + return nil, err + } + + var info peer.AddrInfo + for _, i := range infoAddr { + if i.ID == peerID { + info = i + break + } + } + d := DiscoveredNode{ - PeerID: peerID, - Addresses: m, + PeerID: peerID, + PeerInfo: info, } if hasUDP(node) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go index d71812046..5a58cf0c2 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/connectedness.go @@ -29,23 +29,30 @@ type ConnStatus struct { Peers PeerStats } +type PeerConnection struct { + PeerID peer.ID + Connected bool +} + // ConnectionNotifier is a custom Notifier to be used to display when a peer // connects or disconnects to the node type ConnectionNotifier struct { h host.Host ctx context.Context log *zap.Logger + connNotifCh chan<- PeerConnection DisconnectChan chan peer.ID quit chan struct{} } -func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier { +func NewConnectionNotifier(ctx context.Context, h host.Host, connNotifCh chan<- PeerConnection, log *zap.Logger) ConnectionNotifier { return ConnectionNotifier{ h: h, ctx: ctx, DisconnectChan: make(chan peer.ID, 100), + connNotifCh: connNotifCh, quit: make(chan struct{}), - log: log, + log: log.Named("connection-notifier"), } } @@ -60,6 +67,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr // Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer())) + if c.connNotifCh != nil { + select { + case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}: + default: + c.log.Warn("subscriber is too slow") + } + } stats.Record(c.ctx, metrics.Peers.M(1)) } @@ -68,6 +82,13 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(-1)) c.DisconnectChan <- cc.RemotePeer() + if c.connNotifCh != nil { + select { + case c.connNotifCh <- PeerConnection{cc.RemotePeer(), false}: + default: + c.log.Warn("subscriber is too slow") + } + } } // OpenedStream is called when a stream opened diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go index 5cb6a8e97..dd9fb7f43 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/localnode.go @@ -2,14 +2,9 @@ package node import ( "context" - "crypto/ecdsa" - "encoding/binary" "errors" - "math" - "math/rand" "net" "strconv" - "time" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" @@ -18,71 +13,25 @@ import ( "go.uber.org/zap" ) -func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) { - db, err := enode.OpenDB("") - if err != nil { - return nil, err - } - return enode.NewLocalNode(db, priv), nil -} - -func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []ma.Multiaddr) (err error) { - defer func() { - if e := recover(); e != nil { - // Deleting the multiaddr entry, as we could not write it succesfully - localnode.Delete(enr.WithEntry(wenr.MultiaddrENRField, struct{}{})) - err = errors.New("could not write enr record") - } - }() - - var fieldRaw []byte - for _, addr := range addrAggr { - maRaw := addr.Bytes() - maSize := make([]byte, 2) - binary.BigEndian.PutUint16(maSize, uint16(len(maRaw))) - - fieldRaw = append(fieldRaw, maSize...) - fieldRaw = append(fieldRaw, maRaw...) - } - - if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit - localnode.Set(enr.WithEntry(wenr.MultiaddrENRField, fieldRaw)) - } - - // This is to trigger the signing record err due to exceeding 300bytes limit - _ = localnode.Node() - - return nil -} - func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool, log *zap.Logger) error { - localnode.SetFallbackUDP(int(udpPort)) - localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags)) - localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) - - if udpPort > math.MaxUint16 { - return errors.New("invalid udp port number") - } + var options []wenr.ENROption + options = append(options, wenr.WithUDPPort(udpPort)) + options = append(options, wenr.WithWakuBitfield(wakuFlags)) + options = append(options, wenr.WithMultiaddress(multiaddrs...)) if advertiseAddr != nil { // An advertised address disables libp2p address updates // and discv5 predictions - ipAddr, err := selectMostExternalAddress(advertiseAddr) if err != nil { return err } - localnode.SetStaticIP(ipAddr.IP) - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? - - return writeMultiaddresses(localnode, multiaddrs) + options = append(options, wenr.WithIP(ipAddr)) } else if !shouldAutoUpdate { // We received a libp2p address update. Autoupdate is disabled // Using a static ip will disable endpoint prediction. - localnode.SetStaticIP(ipAddr.IP) - localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? - return writeMultiaddresses(localnode, multiaddrs) + options = append(options, wenr.WithIP(ipAddr)) } else { // We received a libp2p address update, but we should still // allow discv5 to update the enr record. We set the localnode @@ -105,42 +54,9 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M localnode.Delete(enr.IPv6{}) localnode.Delete(enr.TCP6(0)) } - - return writeMultiaddresses(localnode, multiaddrs) } -} - -func writeMultiaddresses(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr) error { - // Randomly shuffle multiaddresses - rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] }) - - // Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes - var err error - failedOnceWritingENR := false - couldWriteENRatLeastOnce := false - successIdx := -1 - for i := len(multiaddrs) - 1; i >= 0; i-- { - err = writeMultiaddressField(localnode, multiaddrs[0:i]) - if err == nil { - couldWriteENRatLeastOnce = true - successIdx = i - break - } else { - failedOnceWritingENR = true - } - } - - if failedOnceWritingENR && couldWriteENRatLeastOnce { - // Could write a subset of multiaddresses but not all - err = writeMultiaddressField(localnode, multiaddrs[0:successIdx]) - if err != nil { - return errors.New("could not write new ENR") - } - } - - return nil + return wenr.Update(localnode, options...) } func isPrivate(addr *net.TCPAddr) bool { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 2b4f493ad..025156604 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -109,7 +109,7 @@ type WakuNode struct { // Channel passed to WakuNode constructor // receiving connection status notifications - connStatusChan chan ConnStatus + connStatusChan chan<- ConnStatus storeFactory storeFactory } @@ -184,7 +184,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.timesource = timesource.NewDefaultClock() } - w.localNode, err = w.newLocalnode(w.opts.privKey) + w.localNode, err = enr.NewLocalnode(w.opts.privKey) if err != nil { w.log.Error("creating localnode", zap.Error(err)) } @@ -300,7 +300,7 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } - w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) + w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.opts.connNotifCh, w.log) w.host.Network().Notify(w.connectionNotif) w.enrChangeCh = make(chan struct{}, 10) @@ -722,6 +722,11 @@ func (w *WakuNode) DialPeer(ctx context.Context, address string) error { return w.connect(ctx, *info) } +// DialPeerWithInfo is used to connect to a peer using its address information +func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) error { + return w.connect(ctx, peerInfo) +} + func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 6222e953f..d126c4c35 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -112,7 +112,8 @@ type WakuNodeParameters struct { enableLightPush bool - connStatusC chan ConnStatus + connStatusC chan<- ConnStatus + connNotifCh chan<- PeerConnection storeFactory storeFactory } @@ -433,6 +434,13 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { } } +func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.connNotifCh = ch + return nil + } +} + // WithWebsockets is a WakuNodeOption used to enable websockets support func WithWebsockets(address string, port int) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/localnode.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/localnode.go new file mode 100644 index 000000000..e7a92dfe2 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/localnode.go @@ -0,0 +1,131 @@ +package enr + +import ( + "crypto/ecdsa" + "encoding/binary" + "errors" + "math" + "math/rand" + "net" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/multiformats/go-multiaddr" +) + +func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) { + db, err := enode.OpenDB("") + if err != nil { + return nil, err + } + return enode.NewLocalNode(db, priv), nil +} + +type ENROption func(*enode.LocalNode) error + +func WithMultiaddress(multiaddrs ...multiaddr.Multiaddr) ENROption { + return func(localnode *enode.LocalNode) (err error) { + + // Randomly shuffle multiaddresses + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] }) + + // Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes + failedOnceWritingENR := false + couldWriteENRatLeastOnce := false + successIdx := -1 + for i := len(multiaddrs) - 1; i >= 0; i-- { + err = writeMultiaddressField(localnode, multiaddrs[0:i]) + if err == nil { + couldWriteENRatLeastOnce = true + successIdx = i + break + } else { + failedOnceWritingENR = true + } + } + + if failedOnceWritingENR && couldWriteENRatLeastOnce { + // Could write a subset of multiaddresses but not all + err = writeMultiaddressField(localnode, multiaddrs[0:successIdx]) + if err != nil { + return errors.New("could not write new ENR") + } + } + + return nil + } +} + +func WithCapabilities(lightpush, filter, store, relay bool) ENROption { + return func(localnode *enode.LocalNode) (err error) { + wakuflags := NewWakuEnrBitfield(lightpush, filter, store, relay) + return WithWakuBitfield(wakuflags)(localnode) + } +} + +func WithWakuBitfield(flags WakuEnrBitfield) ENROption { + return func(localnode *enode.LocalNode) (err error) { + localnode.Set(enr.WithEntry(WakuENRField, flags)) + return nil + } +} + +func WithIP(ipAddr *net.TCPAddr) ENROption { + return func(localnode *enode.LocalNode) (err error) { + localnode.SetStaticIP(ipAddr.IP) + localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6? + return nil + } +} + +func WithUDPPort(udpPort uint) ENROption { + return func(localnode *enode.LocalNode) (err error) { + if udpPort > math.MaxUint16 { + return errors.New("invalid udp port number") + } + localnode.SetFallbackUDP(int(udpPort)) + return nil + } +} + +func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error { + localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) + for _, opt := range enrOptions { + err := opt(localnode) + if err != nil { + return err + } + } + return nil +} + +func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Multiaddr) (err error) { + defer func() { + if e := recover(); e != nil { + // Deleting the multiaddr entry, as we could not write it succesfully + localnode.Delete(enr.WithEntry(MultiaddrENRField, struct{}{})) + err = errors.New("could not write enr record") + } + }() + + var fieldRaw []byte + for _, addr := range addrAggr { + maRaw := addr.Bytes() + maSize := make([]byte, 2) + binary.BigEndian.PutUint16(maSize, uint16(len(maRaw))) + + fieldRaw = append(fieldRaw, maSize...) + fieldRaw = append(fieldRaw, maRaw...) + } + + if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit + localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw)) + } + + // This is to trigger the signing record err due to exceeding 300bytes limit + _ = localnode.Node() + + return nil +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/shards.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/shards.go index 09930522b..bec90b7a3 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/shards.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/shards.go @@ -6,25 +6,31 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" ) -func SetWakuRelayShardingIndicesList(localnode *enode.LocalNode, rs protocol.RelayShards) error { - value, err := rs.IndicesList() - if err != nil { - return err +func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption { + return func(localnode *enode.LocalNode) error { + value, err := rs.IndicesList() + if err != nil { + return err + } + localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value)) + return nil } - localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value)) - return nil } -func SetWakuRelayShardingBitVector(localnode *enode.LocalNode, rs protocol.RelayShards) error { - localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector())) - return nil +func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption { + return func(localnode *enode.LocalNode) error { + localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector())) + return nil + } } -func SetWakuRelaySharding(localnode *enode.LocalNode, rs protocol.RelayShards) error { - if len(rs.Indices) >= 64 { - return SetWakuRelayShardingBitVector(localnode, rs) - } else { - return SetWakuRelayShardingIndicesList(localnode, rs) +func WithtWakuRelaySharding(rs protocol.RelayShards) ENROption { + return func(localnode *enode.LocalNode) error { + if len(rs.Indices) >= 64 { + return WithWakuRelayShardingBitVector(rs)(localnode) + } else { + return WithWakuRelayShardingIndicesList(rs)(localnode) + } } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/validators.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/validators.go index 6e99fd364..418b6f4d0 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/validators.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/relay/validators.go @@ -1,19 +1,20 @@ package relay import ( + "bytes" "context" "crypto/ecdsa" - "crypto/elliptic" "encoding/binary" - "encoding/hex" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/hash" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" @@ -54,8 +55,8 @@ func withinTimeWindow(t timesource.Timesource, msg *pb.WakuMessage) bool { type validatorFn = func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool -func validatorFnBuilder(t timesource.Timesource, topic string, publicKey *ecdsa.PublicKey) validatorFn { - pubkBytes := crypto.FromECDSAPub(publicKey) +func validatorFnBuilder(t timesource.Timesource, address common.Address) (validatorFn, error) { + topic := protocol.NewNamedShardingPubsubTopic(address.String() + "/proto").String() return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool { msg := new(pb.WakuMessage) err := proto.Unmarshal(message.Data, msg) @@ -70,13 +71,26 @@ func validatorFnBuilder(t timesource.Timesource, topic string, publicKey *ecdsa. msgHash := MsgHash(topic, msg) signature := msg.Meta - return secp256k1.VerifySignature(pubkBytes, msgHash, signature) - } + pubKey, err := crypto.SigToPub(msgHash, signature) + if err != nil { + return false + } + + msgAddress := crypto.PubkeyToAddress(*pubKey) + + return bytes.Equal(msgAddress.Bytes(), address.Bytes()) + }, nil } -func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.PublicKey) error { - w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("publicKey", hex.EncodeToString(elliptic.Marshal(publicKey.Curve, publicKey.X, publicKey.Y)))) - err := w.pubsub.RegisterTopicValidator(topic, validatorFnBuilder(w.timesource, topic, publicKey)) +func (w *WakuRelay) AddSignedTopicValidator(topic string, address common.Address) error { + w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("address", address.String())) + + fn, err := validatorFnBuilder(w.timesource, address) + if err != nil { + return err + } + + err = w.pubsub.RegisterTopicValidator(topic, fn) if err != nil { return err } @@ -88,13 +102,19 @@ func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.Publi return nil } -func SignMessage(privKey *ecdsa.PrivateKey, topic string, msg *pb.WakuMessage) error { +func SignMessage(privKey *ecdsa.PrivateKey, msg *pb.WakuMessage) error { + topic := PrivKeyToTopic(privKey) msgHash := MsgHash(topic, msg) sign, err := secp256k1.Sign(msgHash, crypto.FromECDSA(privKey)) if err != nil { return err } - msg.Meta = sign[0:64] // Drop the V in R||S||V + msg.Meta = sign return nil } + +func PrivKeyToTopic(privKey *ecdsa.PrivateKey) string { + address := crypto.PubkeyToAddress(privKey.PublicKey) + return protocol.NewNamedShardingPubsubTopic(address.String() + "/proto").String() +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go index 72bae0d83..d0adea479 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/rendezvous/rendezvous.go @@ -124,7 +124,11 @@ func (r *Rendezvous) discover(ctx context.Context) { server.Unlock() for _, addr := range addrInfo { - r.peerConnector.PeerChannel() <- addr + select { + case r.peerConnector.PeerChannel() <- addr: + case <-ctx.Done(): + return + } } } else { // TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query diff --git a/vendor/modules.txt b/vendor/modules.txt index d40808787..db4b1441b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -975,7 +975,7 @@ github.com/vacp2p/mvds/transport github.com/waku-org/go-discover/discover github.com/waku-org/go-discover/discover/v4wire github.com/waku-org/go-discover/discover/v5wire -# github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8 +# github.com/waku-org/go-waku v0.6.0 ## explicit; go 1.19 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/waku/persistence diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 8585c91fa..0b2ef8f91 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -403,11 +403,11 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error { fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) { - if len(d.Addresses) != 0 { + if len(d.PeerInfo.Addrs) != 0 { go func(ma multiaddr.Multiaddr) { w.identifyAndConnect(ctx, w.settings.LightClient, ma) wg.Done() - }(d.Addresses[0]) + }(d.PeerInfo.Addrs[0]) } } @@ -564,14 +564,14 @@ func (w *Waku) runPeerExchangeLoop() { var withThesePeers []peer.ID for _, record := range w.dnsAddressCache { for _, discoveredNode := range record { - if len(discoveredNode.Addresses) == 0 { + if len(discoveredNode.PeerInfo.Addrs) == 0 { continue } // Obtaining peer ID - peerIDString, err := discoveredNode.Addresses[0].ValueForProtocol(multiaddr.P_P2P) + peerIDString, err := discoveredNode.PeerInfo.Addrs[0].ValueForProtocol(multiaddr.P_P2P) if err != nil { - w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.Addresses[0].String())) + w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.PeerInfo.Addrs[0].String())) continue // No peer ID available somehow }