update go-waku v0.6.0
This commit is contained in:
parent
62e3e9bd62
commit
efb1036429
2
go.mod
2
go.mod
|
@ -81,7 +81,7 @@ require (
|
||||||
github.com/ladydascalie/currency v1.6.0
|
github.com/ladydascalie/currency v1.6.0
|
||||||
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
|
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
|
||||||
github.com/schollz/peerdiscovery v1.7.0
|
github.com/schollz/peerdiscovery v1.7.0
|
||||||
github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8
|
github.com/waku-org/go-waku v0.6.0
|
||||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||||
go.uber.org/multierr v1.8.0
|
go.uber.org/multierr v1.8.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -2102,8 +2102,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
|
||||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
|
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 h1:xwY0kW5XZFimdqfZb9cZwT1S3VJP9j3AE6bdNd9boXM=
|
||||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
|
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
|
||||||
github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8 h1:1agRxCtCBoCaMB/72L87bZgyvCAEMUoBL6l0MImpV2Y=
|
github.com/waku-org/go-waku v0.6.0 h1:0ycNr7fBz4qDxqVGXFxI9GQCT/yfyaiSmbemWFPrQnw=
|
||||||
github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8/go.mod h1:6AXlCiXueZC7XbvG1LUi0uEOMS2n/30h2kjXzW8zfYY=
|
github.com/waku-org/go-waku v0.6.0/go.mod h1:6AXlCiXueZC7XbvG1LUi0uEOMS2n/30h2kjXzW8zfYY=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
|
github.com/waku-org/go-zerokit-rln v0.1.12 h1:66+tU6sTlmUpuUlEv7kCFOGZ37MwZYFJBXHcm8QquwU=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
|
github.com/waku-org/go-zerokit-rln v0.1.12/go.mod h1:MUW+wB6Yj7UBMdZrhko7oHfUZeY2wchggXYjpUiMoac=
|
||||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
|
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230331231302-258cacb91327 h1:Q5XQqo+PEmvrybT8D7BEsKCwIYDi80s+00Q49cfm9Gs=
|
||||||
|
|
|
@ -294,8 +294,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(peerAddrs) != 0 {
|
if len(peerAddrs) != 0 {
|
||||||
d.peerConnector.PeerChannel() <- peerAddrs[0]
|
select {
|
||||||
|
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -9,8 +9,6 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type dnsDiscoveryParameters struct {
|
type dnsDiscoveryParameters struct {
|
||||||
|
@ -28,7 +26,7 @@ func WithNameserver(nameserver string) DnsDiscoveryOption {
|
||||||
|
|
||||||
type DiscoveredNode struct {
|
type DiscoveredNode struct {
|
||||||
PeerID peer.ID
|
PeerID peer.ID
|
||||||
Addresses []ma.Multiaddr
|
PeerInfo peer.AddrInfo
|
||||||
ENR *enode.Node
|
ENR *enode.Node
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,9 +56,22 @@ func RetrieveNodes(ctx context.Context, url string, opts ...DnsDiscoveryOption)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
infoAddr, err := peer.AddrInfosFromP2pAddrs(m...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var info peer.AddrInfo
|
||||||
|
for _, i := range infoAddr {
|
||||||
|
if i.ID == peerID {
|
||||||
|
info = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
d := DiscoveredNode{
|
d := DiscoveredNode{
|
||||||
PeerID: peerID,
|
PeerID: peerID,
|
||||||
Addresses: m,
|
PeerInfo: info,
|
||||||
}
|
}
|
||||||
|
|
||||||
if hasUDP(node) {
|
if hasUDP(node) {
|
||||||
|
|
|
@ -29,23 +29,30 @@ type ConnStatus struct {
|
||||||
Peers PeerStats
|
Peers PeerStats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PeerConnection struct {
|
||||||
|
PeerID peer.ID
|
||||||
|
Connected bool
|
||||||
|
}
|
||||||
|
|
||||||
// ConnectionNotifier is a custom Notifier to be used to display when a peer
|
// ConnectionNotifier is a custom Notifier to be used to display when a peer
|
||||||
// connects or disconnects to the node
|
// connects or disconnects to the node
|
||||||
type ConnectionNotifier struct {
|
type ConnectionNotifier struct {
|
||||||
h host.Host
|
h host.Host
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
connNotifCh chan<- PeerConnection
|
||||||
DisconnectChan chan peer.ID
|
DisconnectChan chan peer.ID
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier {
|
func NewConnectionNotifier(ctx context.Context, h host.Host, connNotifCh chan<- PeerConnection, log *zap.Logger) ConnectionNotifier {
|
||||||
return ConnectionNotifier{
|
return ConnectionNotifier{
|
||||||
h: h,
|
h: h,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
DisconnectChan: make(chan peer.ID, 100),
|
DisconnectChan: make(chan peer.ID, 100),
|
||||||
|
connNotifCh: connNotifCh,
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
log: log,
|
log: log.Named("connection-notifier"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,6 +67,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr
|
||||||
// Connected is called when a connection is opened
|
// Connected is called when a connection is opened
|
||||||
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
|
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
|
||||||
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
|
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()))
|
||||||
|
if c.connNotifCh != nil {
|
||||||
|
select {
|
||||||
|
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), true}:
|
||||||
|
default:
|
||||||
|
c.log.Warn("subscriber is too slow")
|
||||||
|
}
|
||||||
|
}
|
||||||
stats.Record(c.ctx, metrics.Peers.M(1))
|
stats.Record(c.ctx, metrics.Peers.M(1))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +82,13 @@ func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
|
||||||
c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer()))
|
c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer()))
|
||||||
stats.Record(c.ctx, metrics.Peers.M(-1))
|
stats.Record(c.ctx, metrics.Peers.M(-1))
|
||||||
c.DisconnectChan <- cc.RemotePeer()
|
c.DisconnectChan <- cc.RemotePeer()
|
||||||
|
if c.connNotifCh != nil {
|
||||||
|
select {
|
||||||
|
case c.connNotifCh <- PeerConnection{cc.RemotePeer(), false}:
|
||||||
|
default:
|
||||||
|
c.log.Warn("subscriber is too slow")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenedStream is called when a stream opened
|
// OpenedStream is called when a stream opened
|
||||||
|
|
|
@ -2,14 +2,9 @@ package node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
|
||||||
"math/rand"
|
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
|
@ -18,71 +13,25 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
|
|
||||||
db, err := enode.OpenDB("")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return enode.NewLocalNode(db, priv), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []ma.Multiaddr) (err error) {
|
|
||||||
defer func() {
|
|
||||||
if e := recover(); e != nil {
|
|
||||||
// Deleting the multiaddr entry, as we could not write it succesfully
|
|
||||||
localnode.Delete(enr.WithEntry(wenr.MultiaddrENRField, struct{}{}))
|
|
||||||
err = errors.New("could not write enr record")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var fieldRaw []byte
|
|
||||||
for _, addr := range addrAggr {
|
|
||||||
maRaw := addr.Bytes()
|
|
||||||
maSize := make([]byte, 2)
|
|
||||||
binary.BigEndian.PutUint16(maSize, uint16(len(maRaw)))
|
|
||||||
|
|
||||||
fieldRaw = append(fieldRaw, maSize...)
|
|
||||||
fieldRaw = append(fieldRaw, maRaw...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit
|
|
||||||
localnode.Set(enr.WithEntry(wenr.MultiaddrENRField, fieldRaw))
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is to trigger the signing record err due to exceeding 300bytes limit
|
|
||||||
_ = localnode.Node()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool, log *zap.Logger) error {
|
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort uint, wakuFlags wenr.WakuEnrBitfield, advertiseAddr []ma.Multiaddr, shouldAutoUpdate bool, log *zap.Logger) error {
|
||||||
localnode.SetFallbackUDP(int(udpPort))
|
var options []wenr.ENROption
|
||||||
localnode.Set(enr.WithEntry(wenr.WakuENRField, wakuFlags))
|
options = append(options, wenr.WithUDPPort(udpPort))
|
||||||
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
|
options = append(options, wenr.WithWakuBitfield(wakuFlags))
|
||||||
|
options = append(options, wenr.WithMultiaddress(multiaddrs...))
|
||||||
if udpPort > math.MaxUint16 {
|
|
||||||
return errors.New("invalid udp port number")
|
|
||||||
}
|
|
||||||
|
|
||||||
if advertiseAddr != nil {
|
if advertiseAddr != nil {
|
||||||
// An advertised address disables libp2p address updates
|
// An advertised address disables libp2p address updates
|
||||||
// and discv5 predictions
|
// and discv5 predictions
|
||||||
|
|
||||||
ipAddr, err := selectMostExternalAddress(advertiseAddr)
|
ipAddr, err := selectMostExternalAddress(advertiseAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
localnode.SetStaticIP(ipAddr.IP)
|
options = append(options, wenr.WithIP(ipAddr))
|
||||||
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
|
|
||||||
|
|
||||||
return writeMultiaddresses(localnode, multiaddrs)
|
|
||||||
} else if !shouldAutoUpdate {
|
} else if !shouldAutoUpdate {
|
||||||
// We received a libp2p address update. Autoupdate is disabled
|
// We received a libp2p address update. Autoupdate is disabled
|
||||||
// Using a static ip will disable endpoint prediction.
|
// Using a static ip will disable endpoint prediction.
|
||||||
localnode.SetStaticIP(ipAddr.IP)
|
options = append(options, wenr.WithIP(ipAddr))
|
||||||
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
|
|
||||||
return writeMultiaddresses(localnode, multiaddrs)
|
|
||||||
} else {
|
} else {
|
||||||
// We received a libp2p address update, but we should still
|
// We received a libp2p address update, but we should still
|
||||||
// allow discv5 to update the enr record. We set the localnode
|
// allow discv5 to update the enr record. We set the localnode
|
||||||
|
@ -105,42 +54,9 @@ func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, multiaddrs []ma.M
|
||||||
localnode.Delete(enr.IPv6{})
|
localnode.Delete(enr.IPv6{})
|
||||||
localnode.Delete(enr.TCP6(0))
|
localnode.Delete(enr.TCP6(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
return writeMultiaddresses(localnode, multiaddrs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
return wenr.Update(localnode, options...)
|
||||||
|
|
||||||
func writeMultiaddresses(localnode *enode.LocalNode, multiaddrs []ma.Multiaddr) error {
|
|
||||||
// Randomly shuffle multiaddresses
|
|
||||||
rand.Seed(time.Now().UnixNano())
|
|
||||||
rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] })
|
|
||||||
|
|
||||||
// Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes
|
|
||||||
var err error
|
|
||||||
failedOnceWritingENR := false
|
|
||||||
couldWriteENRatLeastOnce := false
|
|
||||||
successIdx := -1
|
|
||||||
for i := len(multiaddrs) - 1; i >= 0; i-- {
|
|
||||||
err = writeMultiaddressField(localnode, multiaddrs[0:i])
|
|
||||||
if err == nil {
|
|
||||||
couldWriteENRatLeastOnce = true
|
|
||||||
successIdx = i
|
|
||||||
break
|
|
||||||
} else {
|
|
||||||
failedOnceWritingENR = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if failedOnceWritingENR && couldWriteENRatLeastOnce {
|
|
||||||
// Could write a subset of multiaddresses but not all
|
|
||||||
err = writeMultiaddressField(localnode, multiaddrs[0:successIdx])
|
|
||||||
if err != nil {
|
|
||||||
return errors.New("could not write new ENR")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func isPrivate(addr *net.TCPAddr) bool {
|
func isPrivate(addr *net.TCPAddr) bool {
|
||||||
|
|
|
@ -109,7 +109,7 @@ type WakuNode struct {
|
||||||
|
|
||||||
// Channel passed to WakuNode constructor
|
// Channel passed to WakuNode constructor
|
||||||
// receiving connection status notifications
|
// receiving connection status notifications
|
||||||
connStatusChan chan ConnStatus
|
connStatusChan chan<- ConnStatus
|
||||||
|
|
||||||
storeFactory storeFactory
|
storeFactory storeFactory
|
||||||
}
|
}
|
||||||
|
@ -184,7 +184,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
w.timesource = timesource.NewDefaultClock()
|
w.timesource = timesource.NewDefaultClock()
|
||||||
}
|
}
|
||||||
|
|
||||||
w.localNode, err = w.newLocalnode(w.opts.privKey)
|
w.localNode, err = enr.NewLocalnode(w.opts.privKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.log.Error("creating localnode", zap.Error(err))
|
w.log.Error("creating localnode", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -300,7 +300,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
|
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.opts.connNotifCh, w.log)
|
||||||
w.host.Network().Notify(w.connectionNotif)
|
w.host.Network().Notify(w.connectionNotif)
|
||||||
|
|
||||||
w.enrChangeCh = make(chan struct{}, 10)
|
w.enrChangeCh = make(chan struct{}, 10)
|
||||||
|
@ -722,6 +722,11 @@ func (w *WakuNode) DialPeer(ctx context.Context, address string) error {
|
||||||
return w.connect(ctx, *info)
|
return w.connect(ctx, *info)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DialPeerWithInfo is used to connect to a peer using its address information
|
||||||
|
func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) error {
|
||||||
|
return w.connect(ctx, peerInfo)
|
||||||
|
}
|
||||||
|
|
||||||
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
||||||
err := w.host.Connect(ctx, info)
|
err := w.host.Connect(ctx, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -112,7 +112,8 @@ type WakuNodeParameters struct {
|
||||||
|
|
||||||
enableLightPush bool
|
enableLightPush bool
|
||||||
|
|
||||||
connStatusC chan ConnStatus
|
connStatusC chan<- ConnStatus
|
||||||
|
connNotifCh chan<- PeerConnection
|
||||||
|
|
||||||
storeFactory storeFactory
|
storeFactory storeFactory
|
||||||
}
|
}
|
||||||
|
@ -433,6 +434,13 @@ func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.connNotifCh = ch
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithWebsockets is a WakuNodeOption used to enable websockets support
|
// WithWebsockets is a WakuNodeOption used to enable websockets support
|
||||||
func WithWebsockets(address string, port int) WakuNodeOption {
|
func WithWebsockets(address string, port int) WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
|
|
131
vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/localnode.go
generated
vendored
Normal file
131
vendor/github.com/waku-org/go-waku/waku/v2/protocol/enr/localnode.go
generated
vendored
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
package enr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewLocalnode(priv *ecdsa.PrivateKey) (*enode.LocalNode, error) {
|
||||||
|
db, err := enode.OpenDB("")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return enode.NewLocalNode(db, priv), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ENROption func(*enode.LocalNode) error
|
||||||
|
|
||||||
|
func WithMultiaddress(multiaddrs ...multiaddr.Multiaddr) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) (err error) {
|
||||||
|
|
||||||
|
// Randomly shuffle multiaddresses
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
rand.Shuffle(len(multiaddrs), func(i, j int) { multiaddrs[i], multiaddrs[j] = multiaddrs[j], multiaddrs[i] })
|
||||||
|
|
||||||
|
// Adding extra multiaddresses. Should probably not exceed the enr max size of 300bytes
|
||||||
|
failedOnceWritingENR := false
|
||||||
|
couldWriteENRatLeastOnce := false
|
||||||
|
successIdx := -1
|
||||||
|
for i := len(multiaddrs) - 1; i >= 0; i-- {
|
||||||
|
err = writeMultiaddressField(localnode, multiaddrs[0:i])
|
||||||
|
if err == nil {
|
||||||
|
couldWriteENRatLeastOnce = true
|
||||||
|
successIdx = i
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
failedOnceWritingENR = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if failedOnceWritingENR && couldWriteENRatLeastOnce {
|
||||||
|
// Could write a subset of multiaddresses but not all
|
||||||
|
err = writeMultiaddressField(localnode, multiaddrs[0:successIdx])
|
||||||
|
if err != nil {
|
||||||
|
return errors.New("could not write new ENR")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCapabilities(lightpush, filter, store, relay bool) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) (err error) {
|
||||||
|
wakuflags := NewWakuEnrBitfield(lightpush, filter, store, relay)
|
||||||
|
return WithWakuBitfield(wakuflags)(localnode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithWakuBitfield(flags WakuEnrBitfield) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) (err error) {
|
||||||
|
localnode.Set(enr.WithEntry(WakuENRField, flags))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithIP(ipAddr *net.TCPAddr) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) (err error) {
|
||||||
|
localnode.SetStaticIP(ipAddr.IP)
|
||||||
|
localnode.Set(enr.TCP(uint16(ipAddr.Port))) // TODO: ipv6?
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithUDPPort(udpPort uint) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) (err error) {
|
||||||
|
if udpPort > math.MaxUint16 {
|
||||||
|
return errors.New("invalid udp port number")
|
||||||
|
}
|
||||||
|
localnode.SetFallbackUDP(int(udpPort))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Update(localnode *enode.LocalNode, enrOptions ...ENROption) error {
|
||||||
|
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
|
||||||
|
for _, opt := range enrOptions {
|
||||||
|
err := opt(localnode)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func writeMultiaddressField(localnode *enode.LocalNode, addrAggr []multiaddr.Multiaddr) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if e := recover(); e != nil {
|
||||||
|
// Deleting the multiaddr entry, as we could not write it succesfully
|
||||||
|
localnode.Delete(enr.WithEntry(MultiaddrENRField, struct{}{}))
|
||||||
|
err = errors.New("could not write enr record")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var fieldRaw []byte
|
||||||
|
for _, addr := range addrAggr {
|
||||||
|
maRaw := addr.Bytes()
|
||||||
|
maSize := make([]byte, 2)
|
||||||
|
binary.BigEndian.PutUint16(maSize, uint16(len(maRaw)))
|
||||||
|
|
||||||
|
fieldRaw = append(fieldRaw, maSize...)
|
||||||
|
fieldRaw = append(fieldRaw, maRaw...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fieldRaw) != 0 && len(fieldRaw) <= 100 { // Max length for multiaddr field before triggering the 300 bytes limit
|
||||||
|
localnode.Set(enr.WithEntry(MultiaddrENRField, fieldRaw))
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is to trigger the signing record err due to exceeding 300bytes limit
|
||||||
|
_ = localnode.Node()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -6,7 +6,8 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
func SetWakuRelayShardingIndicesList(localnode *enode.LocalNode, rs protocol.RelayShards) error {
|
func WithWakuRelayShardingIndicesList(rs protocol.RelayShards) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) error {
|
||||||
value, err := rs.IndicesList()
|
value, err := rs.IndicesList()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -14,17 +15,22 @@ func SetWakuRelayShardingIndicesList(localnode *enode.LocalNode, rs protocol.Rel
|
||||||
localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value))
|
localnode.Set(enr.WithEntry(ShardingIndicesListEnrField, value))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func SetWakuRelayShardingBitVector(localnode *enode.LocalNode, rs protocol.RelayShards) error {
|
func WithWakuRelayShardingBitVector(rs protocol.RelayShards) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) error {
|
||||||
localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector()))
|
localnode.Set(enr.WithEntry(ShardingBitVectorEnrField, rs.BitVector()))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func SetWakuRelaySharding(localnode *enode.LocalNode, rs protocol.RelayShards) error {
|
func WithtWakuRelaySharding(rs protocol.RelayShards) ENROption {
|
||||||
|
return func(localnode *enode.LocalNode) error {
|
||||||
if len(rs.Indices) >= 64 {
|
if len(rs.Indices) >= 64 {
|
||||||
return SetWakuRelayShardingBitVector(localnode, rs)
|
return WithWakuRelayShardingBitVector(rs)(localnode)
|
||||||
} else {
|
} else {
|
||||||
return SetWakuRelayShardingIndicesList(localnode, rs)
|
return WithWakuRelayShardingIndicesList(rs)(localnode)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,19 +1,20 @@
|
||||||
package relay
|
package relay
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/elliptic"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/crypto/secp256k1"
|
"github.com/ethereum/go-ethereum/crypto/secp256k1"
|
||||||
|
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/hash"
|
"github.com/waku-org/go-waku/waku/v2/hash"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -54,8 +55,8 @@ func withinTimeWindow(t timesource.Timesource, msg *pb.WakuMessage) bool {
|
||||||
|
|
||||||
type validatorFn = func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool
|
type validatorFn = func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool
|
||||||
|
|
||||||
func validatorFnBuilder(t timesource.Timesource, topic string, publicKey *ecdsa.PublicKey) validatorFn {
|
func validatorFnBuilder(t timesource.Timesource, address common.Address) (validatorFn, error) {
|
||||||
pubkBytes := crypto.FromECDSAPub(publicKey)
|
topic := protocol.NewNamedShardingPubsubTopic(address.String() + "/proto").String()
|
||||||
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
|
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
|
||||||
msg := new(pb.WakuMessage)
|
msg := new(pb.WakuMessage)
|
||||||
err := proto.Unmarshal(message.Data, msg)
|
err := proto.Unmarshal(message.Data, msg)
|
||||||
|
@ -70,13 +71,26 @@ func validatorFnBuilder(t timesource.Timesource, topic string, publicKey *ecdsa.
|
||||||
msgHash := MsgHash(topic, msg)
|
msgHash := MsgHash(topic, msg)
|
||||||
signature := msg.Meta
|
signature := msg.Meta
|
||||||
|
|
||||||
return secp256k1.VerifySignature(pubkBytes, msgHash, signature)
|
pubKey, err := crypto.SigToPub(msgHash, signature)
|
||||||
}
|
if err != nil {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.PublicKey) error {
|
msgAddress := crypto.PubkeyToAddress(*pubKey)
|
||||||
w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("publicKey", hex.EncodeToString(elliptic.Marshal(publicKey.Curve, publicKey.X, publicKey.Y))))
|
|
||||||
err := w.pubsub.RegisterTopicValidator(topic, validatorFnBuilder(w.timesource, topic, publicKey))
|
return bytes.Equal(msgAddress.Bytes(), address.Bytes())
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WakuRelay) AddSignedTopicValidator(topic string, address common.Address) error {
|
||||||
|
w.log.Info("adding validator to signed topic", zap.String("topic", topic), zap.String("address", address.String()))
|
||||||
|
|
||||||
|
fn, err := validatorFnBuilder(w.timesource, address)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = w.pubsub.RegisterTopicValidator(topic, fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -88,13 +102,19 @@ func (w *WakuRelay) AddSignedTopicValidator(topic string, publicKey *ecdsa.Publi
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func SignMessage(privKey *ecdsa.PrivateKey, topic string, msg *pb.WakuMessage) error {
|
func SignMessage(privKey *ecdsa.PrivateKey, msg *pb.WakuMessage) error {
|
||||||
|
topic := PrivKeyToTopic(privKey)
|
||||||
msgHash := MsgHash(topic, msg)
|
msgHash := MsgHash(topic, msg)
|
||||||
sign, err := secp256k1.Sign(msgHash, crypto.FromECDSA(privKey))
|
sign, err := secp256k1.Sign(msgHash, crypto.FromECDSA(privKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
msg.Meta = sign[0:64] // Drop the V in R||S||V
|
msg.Meta = sign
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func PrivKeyToTopic(privKey *ecdsa.PrivateKey) string {
|
||||||
|
address := crypto.PubkeyToAddress(privKey.PublicKey)
|
||||||
|
return protocol.NewNamedShardingPubsubTopic(address.String() + "/proto").String()
|
||||||
|
}
|
||||||
|
|
|
@ -124,7 +124,11 @@ func (r *Rendezvous) discover(ctx context.Context) {
|
||||||
server.Unlock()
|
server.Unlock()
|
||||||
|
|
||||||
for _, addr := range addrInfo {
|
for _, addr := range addrInfo {
|
||||||
r.peerConnector.PeerChannel() <- addr
|
select {
|
||||||
|
case r.peerConnector.PeerChannel() <- addr:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query
|
// TODO: change log level to DEBUG in go-libp2p-rendezvous@v0.4.1/svc.go:234 discover query
|
||||||
|
|
|
@ -975,7 +975,7 @@ github.com/vacp2p/mvds/transport
|
||||||
github.com/waku-org/go-discover/discover
|
github.com/waku-org/go-discover/discover
|
||||||
github.com/waku-org/go-discover/discover/v4wire
|
github.com/waku-org/go-discover/discover/v4wire
|
||||||
github.com/waku-org/go-discover/discover/v5wire
|
github.com/waku-org/go-discover/discover/v5wire
|
||||||
# github.com/waku-org/go-waku v0.5.3-0.20230509204224-d9a12bf079a8
|
# github.com/waku-org/go-waku v0.6.0
|
||||||
## explicit; go 1.19
|
## explicit; go 1.19
|
||||||
github.com/waku-org/go-waku/logging
|
github.com/waku-org/go-waku/logging
|
||||||
github.com/waku-org/go-waku/waku/persistence
|
github.com/waku-org/go-waku/waku/persistence
|
||||||
|
|
|
@ -403,11 +403,11 @@ func (w *Waku) dnsDiscover(ctx context.Context, enrtreeAddress string, apply fnA
|
||||||
|
|
||||||
func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
|
func (w *Waku) addWakuV2Peers(ctx context.Context, cfg *Config) error {
|
||||||
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
|
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
|
||||||
if len(d.Addresses) != 0 {
|
if len(d.PeerInfo.Addrs) != 0 {
|
||||||
go func(ma multiaddr.Multiaddr) {
|
go func(ma multiaddr.Multiaddr) {
|
||||||
w.identifyAndConnect(ctx, w.settings.LightClient, ma)
|
w.identifyAndConnect(ctx, w.settings.LightClient, ma)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(d.Addresses[0])
|
}(d.PeerInfo.Addrs[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -564,14 +564,14 @@ func (w *Waku) runPeerExchangeLoop() {
|
||||||
var withThesePeers []peer.ID
|
var withThesePeers []peer.ID
|
||||||
for _, record := range w.dnsAddressCache {
|
for _, record := range w.dnsAddressCache {
|
||||||
for _, discoveredNode := range record {
|
for _, discoveredNode := range record {
|
||||||
if len(discoveredNode.Addresses) == 0 {
|
if len(discoveredNode.PeerInfo.Addrs) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Obtaining peer ID
|
// Obtaining peer ID
|
||||||
peerIDString, err := discoveredNode.Addresses[0].ValueForProtocol(multiaddr.P_P2P)
|
peerIDString, err := discoveredNode.PeerInfo.Addrs[0].ValueForProtocol(multiaddr.P_P2P)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.Addresses[0].String()))
|
w.logger.Warn("multiaddress does not contain peerID", zap.String("multiaddr", discoveredNode.PeerInfo.Addrs[0].String()))
|
||||||
continue // No peer ID available somehow
|
continue // No peer ID available somehow
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue