fix: go-waku update discv5 ENR on succesful NAT

This commit is contained in:
Richard Ramos 2022-11-22 18:05:54 -04:00 committed by RichΛrd
parent eaced1c1e9
commit ea89a41d96
85 changed files with 1801 additions and 151 deletions

View File

@ -6,8 +6,8 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/eth-node/types"

2
go.mod
View File

@ -51,7 +51,6 @@ require (
github.com/prometheus/client_golang v1.13.0
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
github.com/status-im/doubleratchet v3.0.0+incompatible
github.com/status-im/go-waku v0.2.3-0.20221107152343-98c3ca8dc60f
github.com/status-im/markdown v0.0.0-20220622180305-7ee4aa8bbc3f
github.com/status-im/migrate/v4 v4.6.2-status.2
github.com/status-im/rendezvous v1.3.6
@ -81,6 +80,7 @@ require github.com/fogleman/gg v1.3.0
require (
github.com/gorilla/sessions v1.2.1
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8
github.com/waku-org/go-waku v0.2.3-0.20221122234656-5d7d05ca16a5
)
require (

4
go.sum
View File

@ -1958,8 +1958,6 @@ github.com/status-im/go-ethereum v1.10.25-status.3 h1:otbW2SCLmNNXAT8uqtrhxq0jRl
github.com/status-im/go-ethereum v1.10.25-status.3/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ=
github.com/status-im/go-multiaddr-ethv4 v1.2.4 h1:7fw0Y48TJXEqx4fOHlDOUiM/uBq9zG5w4x975Mjh4E0=
github.com/status-im/go-multiaddr-ethv4 v1.2.4/go.mod h1:PDh4D7h5CvecPIy0ji0rLNwTnzzEcyz9uTPHD42VyH4=
github.com/status-im/go-waku v0.2.3-0.20221107152343-98c3ca8dc60f h1:VrZZBXVciz38WdhKUbuE4gK9MJ1iWZ6tH8mylkEYGYE=
github.com/status-im/go-waku v0.2.3-0.20221107152343-98c3ca8dc60f/go.mod h1:jILOqhyOtm3srE79ob53Ifi+dY+KRpzG5FuD2ZQ2IzU=
github.com/status-im/go-watchdog v1.2.0-ios-nolibproc h1:BJwZEF7OVKaXc2zErBUAolFSGzwrTBbWnN8e/6MER5E=
github.com/status-im/go-watchdog v1.2.0-ios-nolibproc/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI=
github.com/status-im/gomoji v1.1.3-0.20220213022530-e5ac4a8732d4 h1:CtobZoiNdHpx+xurFxnuJ1xsGm3oKMfcZkB3vmomJmA=
@ -2067,6 +2065,8 @@ github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f h1:YHIrSqs8Aot1exhwx0+uwdshCp3RfZu5OY6Hvt3Hk8g=
github.com/waku-org/go-discover v0.0.0-20221027130446-2f43d5f6c73f/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-waku v0.2.3-0.20221122234656-5d7d05ca16a5 h1:fsDf8CcQZOIwgJ9OhEdX7FkYXP+JYlcYcOMWlY/id7w=
github.com/waku-org/go-waku v0.2.3-0.20221122234656-5d7d05ca16a5/go.mod h1:QqlddKC5Q8ucWSnKQrHClWAkm6HszsQ1bUOXZYvt2UA=
github.com/waku-org/go-zerokit-rln v0.1.6 h1:r8B6S83WJIioxEj1wSruhx+eg47HpSrIwuhi6yaRvy0=
github.com/waku-org/go-zerokit-rln v0.1.6/go.mod h1:T1wLR/VuTcxLkDv0O7JvR0N/9y7GHM2IeU7LjnWZxek=
github.com/waku-org/noise v1.0.2 h1:7WmlhpJ0eliBzwzKz6SoTqQznaEU2IuebHF3oCekqqs=

View File

@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -24,8 +24,8 @@ import (
type byteArr [][]byte
// HexArray creates a field with an array of bytes that will be shown as a hexadecimal string in logs
func HexArray(key string, byteVal ...[]byte) zapcore.Field {
return zap.Array(key, byteArr(byteVal))
func HexArray(key string, byteVal byteArr) zapcore.Field {
return zap.Array(key, byteVal)
}
func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
@ -37,7 +37,7 @@ func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
type hexByte []byte
func HexString(key string, byteVal []byte) zapcore.Field {
func HexString(key string, byteVal hexByte) zapcore.Field {
return zap.Stringer(key, hexByte(byteVal))
}

View File

@ -8,10 +8,10 @@ import (
"sync"
"time"
"github.com/status-im/go-waku/waku/persistence/migrations"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/persistence/migrations"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

View File

@ -1,7 +1,7 @@
package v2
import (
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120

View File

@ -14,14 +14,14 @@ import (
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-discover/discover"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type DiscoveryV5 struct {
sync.Mutex
sync.RWMutex
discovery.Discovery
@ -33,6 +33,7 @@ type DiscoveryV5 struct {
localnode *enode.LocalNode
NAT nat.Interface
quit chan struct{}
started bool
log *zap.Logger
@ -43,13 +44,14 @@ type DiscoveryV5 struct {
type peerCache struct {
sync.RWMutex
recs map[peer.ID]peerRecord
recs map[peer.ID]PeerRecord
rng *rand.Rand
}
type peerRecord struct {
type PeerRecord struct {
expire int64
peer peer.AddrInfo
Peer peer.AddrInfo
Node enode.Node
}
type discV5Parameters struct {
@ -115,7 +117,7 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
wg: &sync.WaitGroup{},
peerCache: peerCache{
rng: rand.New(rand.NewSource(rand.Int63())),
recs: make(map[peer.ID]peerRecord),
recs: make(map[peer.ID]PeerRecord),
},
localnode: localnode,
config: discover.Config{
@ -137,6 +139,10 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc
}, nil
}
func (d *DiscoveryV5) Node() *enode.Node {
return d.localnode.Node()
}
func (d *DiscoveryV5) listen() error {
conn, err := net.ListenUDP("udp", d.udpAddr)
if err != nil {
@ -174,9 +180,14 @@ func (d *DiscoveryV5) Start() error {
d.Lock()
defer d.Unlock()
if d.started {
return nil
}
d.wg.Wait() // Waiting for other go routines to stop
d.quit = make(chan struct{}, 1)
d.started = true
err := d.listen()
if err != nil {
@ -190,10 +201,15 @@ func (d *DiscoveryV5) Stop() {
d.Lock()
defer d.Unlock()
if !d.started {
return
}
close(d.quit)
d.listener.Close()
d.listener = nil
d.started = false
d.log.Info("stopped Discovery V5")
@ -293,9 +309,10 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi
}
for _, p := range peerAddrs {
d.peerCache.recs[p.ID] = peerRecord{
d.peerCache.recs[p.ID] = PeerRecord{
expire: time.Now().Unix() + 3600, // Expires in 1hr
peer: p,
Peer: p,
Node: *iterator.Node(),
}
}
@ -320,7 +337,7 @@ func (d *DiscoveryV5) removeExpiredPeers() int {
return newCacheSize
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
func (d *DiscoveryV5) FindNodes(ctx context.Context, topic string, opts ...discovery.Option) ([]PeerRecord, error) {
// Get options
var options discovery.Options
err := options.Apply(opts...)
@ -328,7 +345,7 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
return nil, err
}
const maxLimit = 100
const maxLimit = 600
limit := options.Limit
if limit == 0 || limit > maxLimit {
limit = maxLimit
@ -368,29 +385,43 @@ func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...disco
count = limit
}
chPeer := make(chan peer.AddrInfo, count)
perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
sendLst := make([]*peer.AddrInfo, count)
sendLst := make([]PeerRecord, count)
iter := 0
for k := range d.peerCache.recs {
if sendIndex, ok := permSet[iter]; ok {
peerInfo := d.peerCache.recs[k].peer
sendLst[sendIndex] = &peerInfo
sendLst[sendIndex] = d.peerCache.recs[k]
}
iter++
}
for _, send := range sendLst {
chPeer <- *send
return sendLst, err
}
func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
records, err := d.FindNodes(ctx, topic, opts...)
if err != nil {
return nil, err
}
chPeer := make(chan peer.AddrInfo, len(records))
for _, r := range records {
chPeer <- r.Peer
}
close(chPeer)
return chPeer, err
}
func (d *DiscoveryV5) IsStarted() bool {
d.RLock()
defer d.RUnlock()
return d.started
}

View File

@ -6,7 +6,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/utils"
ma "github.com/multiformats/go-multiaddr"
)

View File

@ -3,7 +3,7 @@ package metrics
import (
"context"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
@ -18,6 +18,7 @@ var (
FilterSubscriptions = stats.Int64("filter_subscriptions", "Number of filter subscriptions", stats.UnitDimensionless)
StoreErrors = stats.Int64("errors", "Number of errors in store protocol", stats.UnitDimensionless)
LightpushErrors = stats.Int64("errors", "Number of errors in lightpush protocol", stats.UnitDimensionless)
PeerExchangeError = stats.Int64("errors", "Number of errors in peer exchange protocol", stats.UnitDimensionless)
)
var (
@ -79,6 +80,12 @@ func RecordLightpushError(ctx context.Context, tagType string) {
}
}
func RecordPeerExchangeError(ctx context.Context, tagType string) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, PeerExchangeError.M(1)); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))
}
}
func RecordMessage(ctx context.Context, tagType string, len int) {
if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil {
utils.Logger().Error("failed to record with tags", zap.Error(err))

View File

@ -7,12 +7,12 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.opencensus.io/stats"
"go.uber.org/zap"
)

View File

@ -7,7 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/status-im/go-waku/logging"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)

View File

@ -12,8 +12,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -23,6 +23,16 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, i
return nil, err
}
localnode := enode.NewLocalNode(db, priv)
err = w.updateLocalNode(localnode, priv, wsAddr, ipAddr, udpPort, wakuFlags, advertiseAddr, log)
if err != nil {
return nil, err
}
return localnode, nil
}
func (w *WakuNode) updateLocalNode(localnode *enode.LocalNode, priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) error {
localnode.SetFallbackUDP(udpPort)
localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags))
localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
@ -48,12 +58,12 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, i
for _, addr := range wsAddr {
p2p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
return nil, err
return err
}
p2pAddr, err := ma.NewMultiaddr("/p2p/" + p2p)
if err != nil {
return nil, fmt.Errorf("could not create p2p addr: %w", err)
return fmt.Errorf("could not create p2p addr: %w", err)
}
maRaw := addr.Decapsulate(p2pAddr).Bytes()
@ -68,7 +78,7 @@ func (w *WakuNode) newLocalnode(priv *ecdsa.PrivateKey, wsAddr []ma.Multiaddr, i
localnode.Set(enr.WithEntry(utils.MultiaddrENRField, fieldRaw))
}
return localnode, nil
return nil
}
func isPrivate(addr candidateAddr) bool {
@ -220,12 +230,30 @@ func (w *WakuNode) setupENR(addrs []ma.Multiaddr) error {
// TODO: make this optional depending on DNS Disc being enabled
if w.opts.privKey != nil {
if w.localNode != nil {
err := w.updateLocalNode(w.localNode, w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
// Restarting DiscV5
if w.discoveryV5 != nil && w.discoveryV5.IsStarted() {
w.log.Info("restarting discv5")
w.discoveryV5.Stop()
err = w.discoveryV5.Start()
if err != nil {
w.log.Error("could not restart discv5", zap.Error(err))
return err
}
}
}
} else {
localNode, err := w.newLocalnode(w.opts.privKey, wsAddresses, ipAddr, w.opts.udpPort, w.wakuFlag, w.opts.advertiseAddr, w.log)
if err != nil {
w.log.Error("obtaining ENR record from multiaddress", logging.MultiAddrs("multiaddr", extAddr), zap.Error(err))
return err
} else {
if w.localNode == nil || w.localNode.Node().String() != localNode.Node().String() {
w.localNode = localNode
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
}

View File

@ -0,0 +1,113 @@
//go:build gowaku_rln
// +build gowaku_rln
package node
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
const RLN_CREDENTIALS_FILENAME = "rlnCredentials.txt"
func WriteRLNMembershipCredentialsToFile(keyPair *rln.MembershipKeyPair, idx rln.MembershipIndex, contractAddress common.Address, path string, passwd []byte) error {
if path == "" {
return nil // we dont want to use a credentials file
}
if keyPair == nil {
return nil // no credentials to store
}
credentialsJSON, err := json.Marshal(MembershipCredentials{
Keypair: keyPair,
Index: idx,
Contract: contractAddress,
})
if err != nil {
return err
}
encryptedCredentials, err := keystore.EncryptDataV3(credentialsJSON, passwd, keystore.StandardScryptN, keystore.StandardScryptP)
if err != nil {
return err
}
output, err := json.Marshal(encryptedCredentials)
if err != nil {
return err
}
path = filepath.Join(path, RLN_CREDENTIALS_FILENAME)
return ioutil.WriteFile(path, output, 0600)
}
func loadMembershipCredentialsFromFile(credentialsFilePath string, passwd string) (MembershipCredentials, error) {
src, err := ioutil.ReadFile(credentialsFilePath)
if err != nil {
return MembershipCredentials{}, err
}
var encryptedK keystore.CryptoJSON
err = json.Unmarshal(src, &encryptedK)
if err != nil {
return MembershipCredentials{}, err
}
credentialsBytes, err := keystore.DecryptDataV3(encryptedK, passwd)
if err != nil {
return MembershipCredentials{}, err
}
var credentials MembershipCredentials
err = json.Unmarshal(credentialsBytes, &credentials)
return credentials, err
}
func GetMembershipCredentials(logger *zap.Logger, credentialsPath string, password string, membershipContract common.Address, membershipIndex uint) (credentials MembershipCredentials, err error) {
if credentialsPath == "" { // Not using a file
return MembershipCredentials{
Contract: membershipContract,
}, nil
}
credentialsFilePath := filepath.Join(credentialsPath, RLN_CREDENTIALS_FILENAME)
if _, err = os.Stat(credentialsFilePath); err == nil {
if credentials, err := loadMembershipCredentialsFromFile(credentialsFilePath, password); err != nil {
return MembershipCredentials{}, fmt.Errorf("could not read membership credentials file: %w", err)
} else {
logger.Info("loaded rln credentials", zap.String("filepath", credentialsFilePath))
if (bytes.Equal(credentials.Contract.Bytes(), common.Address{}.Bytes())) {
credentials.Contract = membershipContract
}
if (bytes.Equal(membershipContract.Bytes(), common.Address{}.Bytes())) {
return MembershipCredentials{}, errors.New("no contract address specified")
}
return credentials, nil
}
}
if os.IsNotExist(err) {
return MembershipCredentials{
Keypair: nil,
Index: membershipIndex,
Contract: membershipContract,
}, nil
}
return MembershipCredentials{}, fmt.Errorf("could not read membership credentials file: %w", err)
}

View File

@ -14,8 +14,8 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/ecies"
"github.com/status-im/go-waku/waku/v2/noise"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/noise"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
// KeyKind indicates the type of encryption to apply

View File

@ -24,19 +24,20 @@ import (
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/try"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/discv5"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/try"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type Peer struct {
@ -79,6 +80,7 @@ type WakuNode struct {
addrChan chan ma.Multiaddr
discoveryV5 *discv5.DiscoveryV5
peerExchange *peer_exchange.WakuPeerExchange
bcaster v2.Broadcaster
@ -291,6 +293,13 @@ func (w *WakuNode) Start() error {
}
}
if w.opts.enablePeerExchange {
err := w.mountPeerExchange()
if err != nil {
return err
}
}
if w.opts.enableDiscV5 {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...))
}
@ -345,6 +354,14 @@ func (w *WakuNode) Stop() {
w.filter.Stop()
}
if w.peerExchange != nil {
w.peerExchange.Stop()
}
if w.discoveryV5 != nil {
w.discoveryV5.Stop()
}
w.relay.Stop()
w.lightPush.Stop()
w.store.Stop()
@ -405,6 +422,11 @@ func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 {
return w.discoveryV5
}
// PeerExchange is used to access any operation related to Peer Exchange
func (w *WakuNode) PeerExchange() *peer_exchange.WakuPeerExchange {
return w.peerExchange
}
// Broadcaster is used to access the message broadcaster that is used to push
// messages to different protocols
func (w *WakuNode) Broadcaster() v2.Broadcaster {
@ -474,6 +496,11 @@ func (w *WakuNode) mountDiscV5() error {
return err
}
func (w *WakuNode) mountPeerExchange() error {
w.peerExchange = peer_exchange.NewWakuPeerExchange(w.ctx, w.host, w.discoveryV5, w.log)
return w.peerExchange.Start()
}
func (w *WakuNode) startStore() {
w.store.Start(w.ctx)

View File

@ -8,7 +8,7 @@ import (
"encoding/hex"
"errors"
"github.com/status-im/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)

View File

@ -23,10 +23,10 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -77,6 +77,8 @@ type WakuNodeParameters struct {
discV5Opts []pubsub.DiscoverOpt
discV5autoUpdate bool
enablePeerExchange bool
enableRLN bool
rlnRelayMemIndex uint
rlnRelayPubsubTopic string
@ -277,6 +279,14 @@ func WithDiscoveryV5(udpPort int, bootnodes []*enode.Node, autoUpdate bool, disc
}
}
// WithPeerExchange is a WakuOption used to enable Peer Exchange
func WithPeerExchange() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enablePeerExchange = true
return nil
}
}
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
@ -394,6 +404,7 @@ func WithSecureWebsockets(address string, port int, certPath string, keyPath str
}
params.tlsConfig = &tls.Config{
Certificates: []tls.Certificate{certificate},
MinVersion: tls.VersionTLS12,
}
return nil

View File

@ -7,7 +7,7 @@ import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/go-waku/waku/v2/protocol/rln"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
r "github.com/waku-org/go-zerokit-rln/rln"
)

View File

@ -3,7 +3,7 @@ package protocol
import (
"crypto/sha256"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
// Envelope contains information about the pubsub topic of a WakuMessage

View File

@ -3,9 +3,9 @@ package filter
import (
"sync"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type FilterMap struct {

View File

@ -5,7 +5,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
type Subscriber struct {

View File

@ -12,10 +12,10 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"

View File

@ -6,7 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

View File

@ -10,11 +10,11 @@ import (
"github.com/libp2p/go-libp2p/core/network"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)

View File

@ -5,8 +5,8 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

View File

@ -5,3 +5,4 @@ package pb
//go:generate protoc -I. --gofast_out=. ./waku_message.proto
//go:generate protoc -I. --gofast_out=. ./waku_store.proto
//go:generate protoc -I. --gofast_out=. ./waku_swap.proto
//go:generate protoc -I. --gofast_out=. ./waku_peer_exchange.proto

View File

@ -0,0 +1,938 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: waku_peer_exchange.proto
package pb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type PeerInfo struct {
ENR []byte `protobuf:"bytes,1,opt,name=ENR,proto3" json:"ENR,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PeerInfo) Reset() { *m = PeerInfo{} }
func (m *PeerInfo) String() string { return proto.CompactTextString(m) }
func (*PeerInfo) ProtoMessage() {}
func (*PeerInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_ce50192ba54b780f, []int{0}
}
func (m *PeerInfo) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PeerInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PeerInfo.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PeerInfo) XXX_Merge(src proto.Message) {
xxx_messageInfo_PeerInfo.Merge(m, src)
}
func (m *PeerInfo) XXX_Size() int {
return m.Size()
}
func (m *PeerInfo) XXX_DiscardUnknown() {
xxx_messageInfo_PeerInfo.DiscardUnknown(m)
}
var xxx_messageInfo_PeerInfo proto.InternalMessageInfo
func (m *PeerInfo) GetENR() []byte {
if m != nil {
return m.ENR
}
return nil
}
type PeerExchangeQuery struct {
NumPeers uint64 `protobuf:"varint,1,opt,name=numPeers,proto3" json:"numPeers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PeerExchangeQuery) Reset() { *m = PeerExchangeQuery{} }
func (m *PeerExchangeQuery) String() string { return proto.CompactTextString(m) }
func (*PeerExchangeQuery) ProtoMessage() {}
func (*PeerExchangeQuery) Descriptor() ([]byte, []int) {
return fileDescriptor_ce50192ba54b780f, []int{1}
}
func (m *PeerExchangeQuery) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PeerExchangeQuery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PeerExchangeQuery.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PeerExchangeQuery) XXX_Merge(src proto.Message) {
xxx_messageInfo_PeerExchangeQuery.Merge(m, src)
}
func (m *PeerExchangeQuery) XXX_Size() int {
return m.Size()
}
func (m *PeerExchangeQuery) XXX_DiscardUnknown() {
xxx_messageInfo_PeerExchangeQuery.DiscardUnknown(m)
}
var xxx_messageInfo_PeerExchangeQuery proto.InternalMessageInfo
func (m *PeerExchangeQuery) GetNumPeers() uint64 {
if m != nil {
return m.NumPeers
}
return 0
}
type PeerExchangeResponse struct {
PeerInfos []*PeerInfo `protobuf:"bytes,1,rep,name=peerInfos,proto3" json:"peerInfos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PeerExchangeResponse) Reset() { *m = PeerExchangeResponse{} }
func (m *PeerExchangeResponse) String() string { return proto.CompactTextString(m) }
func (*PeerExchangeResponse) ProtoMessage() {}
func (*PeerExchangeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_ce50192ba54b780f, []int{2}
}
func (m *PeerExchangeResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PeerExchangeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PeerExchangeResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PeerExchangeResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_PeerExchangeResponse.Merge(m, src)
}
func (m *PeerExchangeResponse) XXX_Size() int {
return m.Size()
}
func (m *PeerExchangeResponse) XXX_DiscardUnknown() {
xxx_messageInfo_PeerExchangeResponse.DiscardUnknown(m)
}
var xxx_messageInfo_PeerExchangeResponse proto.InternalMessageInfo
func (m *PeerExchangeResponse) GetPeerInfos() []*PeerInfo {
if m != nil {
return m.PeerInfos
}
return nil
}
type PeerExchangeRPC struct {
Query *PeerExchangeQuery `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
Response *PeerExchangeResponse `protobuf:"bytes,2,opt,name=response,proto3" json:"response,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PeerExchangeRPC) Reset() { *m = PeerExchangeRPC{} }
func (m *PeerExchangeRPC) String() string { return proto.CompactTextString(m) }
func (*PeerExchangeRPC) ProtoMessage() {}
func (*PeerExchangeRPC) Descriptor() ([]byte, []int) {
return fileDescriptor_ce50192ba54b780f, []int{3}
}
func (m *PeerExchangeRPC) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PeerExchangeRPC) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PeerExchangeRPC.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PeerExchangeRPC) XXX_Merge(src proto.Message) {
xxx_messageInfo_PeerExchangeRPC.Merge(m, src)
}
func (m *PeerExchangeRPC) XXX_Size() int {
return m.Size()
}
func (m *PeerExchangeRPC) XXX_DiscardUnknown() {
xxx_messageInfo_PeerExchangeRPC.DiscardUnknown(m)
}
var xxx_messageInfo_PeerExchangeRPC proto.InternalMessageInfo
func (m *PeerExchangeRPC) GetQuery() *PeerExchangeQuery {
if m != nil {
return m.Query
}
return nil
}
func (m *PeerExchangeRPC) GetResponse() *PeerExchangeResponse {
if m != nil {
return m.Response
}
return nil
}
func init() {
proto.RegisterType((*PeerInfo)(nil), "PeerInfo")
proto.RegisterType((*PeerExchangeQuery)(nil), "PeerExchangeQuery")
proto.RegisterType((*PeerExchangeResponse)(nil), "PeerExchangeResponse")
proto.RegisterType((*PeerExchangeRPC)(nil), "PeerExchangeRPC")
}
func init() { proto.RegisterFile("waku_peer_exchange.proto", fileDescriptor_ce50192ba54b780f) }
var fileDescriptor_ce50192ba54b780f = []byte{
// 211 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x28, 0x4f, 0xcc, 0x2e,
0x8d, 0x2f, 0x48, 0x4d, 0x2d, 0x8a, 0x4f, 0xad, 0x48, 0xce, 0x48, 0xcc, 0x4b, 0x4f, 0xd5, 0x2b,
0x28, 0xca, 0x2f, 0xc9, 0x57, 0x92, 0xe1, 0xe2, 0x08, 0x48, 0x4d, 0x2d, 0xf2, 0xcc, 0x4b, 0xcb,
0x17, 0x12, 0xe0, 0x62, 0x76, 0xf5, 0x0b, 0x92, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x09, 0x02, 0x31,
0x95, 0xf4, 0xb9, 0x04, 0x41, 0xb2, 0xae, 0x50, 0x3d, 0x81, 0xa5, 0xa9, 0x45, 0x95, 0x42, 0x52,
0x5c, 0x1c, 0x79, 0xa5, 0xb9, 0x20, 0xf1, 0x62, 0xb0, 0x5a, 0x96, 0x20, 0x38, 0x5f, 0xc9, 0x9e,
0x4b, 0x04, 0x59, 0x43, 0x50, 0x6a, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x90, 0x3a, 0x17, 0x67,
0x01, 0xd4, 0x1a, 0x90, 0x26, 0x66, 0x0d, 0x6e, 0x23, 0x4e, 0x3d, 0x98, 0xc5, 0x41, 0x08, 0x39,
0xa5, 0x3c, 0x2e, 0x7e, 0x14, 0x03, 0x02, 0x9c, 0x85, 0x34, 0xb8, 0x58, 0x0b, 0x41, 0x16, 0x83,
0x2d, 0xe3, 0x36, 0x12, 0xd2, 0xc3, 0x70, 0x52, 0x10, 0x44, 0x81, 0x90, 0x21, 0x17, 0x47, 0x11,
0xd4, 0x46, 0x09, 0x26, 0xb0, 0x62, 0x51, 0x3d, 0x6c, 0xce, 0x09, 0x82, 0x2b, 0x73, 0x12, 0x38,
0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x67, 0x3c, 0x96, 0x63,
0x48, 0x62, 0x03, 0x07, 0x8c, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x99, 0xd5, 0xbb, 0xb6, 0x34,
0x01, 0x00, 0x00,
}
func (m *PeerInfo) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PeerInfo) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PeerInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.ENR) > 0 {
i -= len(m.ENR)
copy(dAtA[i:], m.ENR)
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(len(m.ENR)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *PeerExchangeQuery) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PeerExchangeQuery) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PeerExchangeQuery) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.NumPeers != 0 {
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(m.NumPeers))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *PeerExchangeResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PeerExchangeResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PeerExchangeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.PeerInfos) > 0 {
for iNdEx := len(m.PeerInfos) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.PeerInfos[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *PeerExchangeRPC) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PeerExchangeRPC) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *PeerExchangeRPC) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Response != nil {
{
size, err := m.Response.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
if m.Query != nil {
{
size, err := m.Query.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintWakuPeerExchange(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintWakuPeerExchange(dAtA []byte, offset int, v uint64) int {
offset -= sovWakuPeerExchange(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *PeerInfo) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.ENR)
if l > 0 {
n += 1 + l + sovWakuPeerExchange(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *PeerExchangeQuery) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.NumPeers != 0 {
n += 1 + sovWakuPeerExchange(uint64(m.NumPeers))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *PeerExchangeResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.PeerInfos) > 0 {
for _, e := range m.PeerInfos {
l = e.Size()
n += 1 + l + sovWakuPeerExchange(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *PeerExchangeRPC) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Query != nil {
l = m.Query.Size()
n += 1 + l + sovWakuPeerExchange(uint64(l))
}
if m.Response != nil {
l = m.Response.Size()
n += 1 + l + sovWakuPeerExchange(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovWakuPeerExchange(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozWakuPeerExchange(x uint64) (n int) {
return sovWakuPeerExchange(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *PeerInfo) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PeerInfo: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PeerInfo: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ENR", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthWakuPeerExchange
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ENR = append(m.ENR[:0], dAtA[iNdEx:postIndex]...)
if m.ENR == nil {
m.ENR = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PeerExchangeQuery) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PeerExchangeQuery: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PeerExchangeQuery: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NumPeers", wireType)
}
m.NumPeers = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.NumPeers |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PeerExchangeResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PeerExchangeResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PeerExchangeResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field PeerInfos", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuPeerExchange
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.PeerInfos = append(m.PeerInfos, &PeerInfo{})
if err := m.PeerInfos[len(m.PeerInfos)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PeerExchangeRPC) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PeerExchangeRPC: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PeerExchangeRPC: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuPeerExchange
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Query == nil {
m.Query = &PeerExchangeQuery{}
}
if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Response", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthWakuPeerExchange
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Response == nil {
m.Response = &PeerExchangeResponse{}
}
if err := m.Response.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipWakuPeerExchange(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthWakuPeerExchange
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipWakuPeerExchange(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowWakuPeerExchange
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthWakuPeerExchange
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupWakuPeerExchange
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthWakuPeerExchange
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthWakuPeerExchange = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowWakuPeerExchange = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupWakuPeerExchange = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,18 @@
syntax = "proto3";
message PeerInfo {
bytes ENR = 1;
}
message PeerExchangeQuery {
uint64 numPeers = 1; // number of peers requested
}
message PeerExchangeResponse {
repeated PeerInfo peerInfos = 1;
}
message PeerExchangeRPC {
PeerExchangeQuery query = 1;
PeerExchangeResponse response = 2;
}

View File

@ -0,0 +1,364 @@
package peer_exchange
import (
"bufio"
"bytes"
"context"
"errors"
"math"
"math/rand"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1")
const MaxCacheSize = 1000
const CacheCleanWindow = 200
const dialTimeout = 7 * time.Second
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
ErrInvalidId = errors.New("invalid request id")
)
type peerRecord struct {
node enode.Node
idx int
}
type WakuPeerExchange struct {
h host.Host
ctx context.Context
disc *discv5.DiscoveryV5
log *zap.Logger
quit chan struct{}
wg sync.WaitGroup
enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
enrCacheMutex sync.RWMutex
rng *rand.Rand
started bool
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(ctx context.Context, h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange {
wakuPX := new(WakuPeerExchange)
wakuPX.ctx = ctx
wakuPX.h = h
wakuPX.disc = disc
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = make(map[enode.ID]peerRecord)
wakuPX.rng = rand.New(rand.NewSource(rand.Int63()))
return wakuPX
}
// Start inits the peer exchange protocol
func (wakuPX *WakuPeerExchange) Start() error {
wakuPX.h.SetStreamHandlerMatch(PeerExchangeID_v20alpha1, protocol.PrefixTextMatch(string(PeerExchangeID_v20alpha1)), wakuPX.onRequest)
wakuPX.log.Info("Peer exchange protocol started")
wakuPX.started = true
wakuPX.quit = make(chan struct{}, 1)
wakuPX.wg.Add(1)
go wakuPX.runPeerExchangeDiscv5Loop()
return nil
}
func (wakuPX *WakuPeerExchange) handleResponse(response *pb.PeerExchangeResponse) error {
var peers []peer.AddrInfo
for _, p := range response.PeerInfos {
enrRecord := &enr.Record{}
buf := bytes.NewBuffer(p.ENR)
err := enrRecord.DecodeRLP(rlp.NewStream(buf, uint64(len(p.ENR))))
if err != nil {
wakuPX.log.Error("converting bytes to enr", zap.Error(err))
return err
}
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
if err != nil {
wakuPX.log.Error("creating enode record", zap.Error(err))
return err
}
peerInfo, err := utils.EnodeToPeerInfo(enodeRecord)
if err != nil {
return err
}
if wakuPX.h.Network().Connectedness(peerInfo.ID) != network.Connected {
peers = append(peers, *peerInfo)
}
}
if len(peers) != 0 {
log.Info("connecting to newly discovered peers", zap.Int("count", len(peers)))
for _, p := range peers {
func(p peer.AddrInfo) {
ctx, cancel := context.WithTimeout(wakuPX.ctx, dialTimeout)
defer cancel()
err := wakuPX.h.Connect(ctx, p)
if err != nil {
log.Info("connecting to peer", zap.String("peer", p.ID.Pretty()), zap.Error(err))
}
}(p)
}
}
return nil
}
func (wakuPX *WakuPeerExchange) onRequest(s network.Stream) {
defer s.Close()
logger := wakuPX.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
requestRPC := &pb.PeerExchangeRPC{}
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
err := reader.ReadMsg(requestRPC)
if err != nil {
logger.Error("reading request", zap.Error(err))
metrics.RecordPeerExchangeError(wakuPX.ctx, "decodeRpcFailure")
return
}
if requestRPC.Query != nil {
logger.Info("request received")
err := wakuPX.respond(requestRPC.Query.NumPeers, s.Conn().RemotePeer())
if err != nil {
logger.Error("responding", zap.Error(err))
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
return
}
}
if requestRPC.Response != nil {
logger.Info("response received")
err := wakuPX.handleResponse(requestRPC.Response)
if err != nil {
logger.Error("handling response", zap.Error(err))
metrics.RecordPeerExchangeError(wakuPX.ctx, "pxFailure")
return
}
}
}
func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts ...PeerExchangeOption) error {
params := new(PeerExchangeParameters)
params.host = wakuPX.h
params.log = wakuPX.log
optList := DefaultOptions(wakuPX.h)
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
metrics.RecordPeerExchangeError(wakuPX.ctx, "dialError")
return ErrNoPeersAvailable
}
requestRPC := &pb.PeerExchangeRPC{
Query: &pb.PeerExchangeQuery{
NumPeers: uint64(numPeers),
},
}
return wakuPX.sendPeerExchangeRPCToPeer(requestRPC, params.selectedPeer)
}
// IsStarted returns if the peer exchange protocol has been mounted or not
func (wakuPX *WakuPeerExchange) IsStarted() bool {
return wakuPX.started
}
// Stop unmounts the peer exchange protocol
func (wakuPX *WakuPeerExchange) Stop() {
if wakuPX.started {
wakuPX.h.RemoveStreamHandler(PeerExchangeID_v20alpha1)
wakuPX.started = false
close(wakuPX.quit)
wakuPX.wg.Wait()
}
}
func (wakuPX *WakuPeerExchange) sendPeerExchangeRPCToPeer(rpc *pb.PeerExchangeRPC, peerID peer.ID) error {
logger := wakuPX.log.With(logging.HostID("peer", peerID))
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wakuPX.h.Connect(wakuPX.ctx, wakuPX.h.Peerstore().PeerInfo(peerID))
if err != nil {
logger.Error("connecting peer", zap.Error(err))
return err
}
connOpt, err := wakuPX.h.NewStream(wakuPX.ctx, peerID, PeerExchangeID_v20alpha1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
return err
}
defer connOpt.Close()
writer := protoio.NewDelimitedWriter(connOpt)
err = writer.WriteMsg(rpc)
if err != nil {
logger.Error("writing response", zap.Error(err))
return err
}
return nil
}
func (wakuPX *WakuPeerExchange) respond(numPeers uint64, peerID peer.ID) error {
records, err := wakuPX.getENRsFromCache(numPeers)
if err != nil {
return err
}
responseRPC := &pb.PeerExchangeRPC{}
responseRPC.Response = new(pb.PeerExchangeResponse)
responseRPC.Response.PeerInfos = records
return wakuPX.sendPeerExchangeRPCToPeer(responseRPC, peerID)
}
func (wakuPX *WakuPeerExchange) getENRsFromCache(numPeers uint64) ([]*pb.PeerInfo, error) {
wakuPX.enrCacheMutex.Lock()
defer wakuPX.enrCacheMutex.Unlock()
if len(wakuPX.enrCache) == 0 {
return nil, nil
}
numItems := int(numPeers)
if len(wakuPX.enrCache) < int(numPeers) {
numItems = len(wakuPX.enrCache)
}
perm := wakuPX.rng.Perm(len(wakuPX.enrCache))[0:numItems]
permSet := make(map[int]int)
for i, v := range perm {
permSet[v] = i
}
var result []*pb.PeerInfo
iter := 0
for k := range wakuPX.enrCache {
if _, ok := permSet[iter]; ok {
var b bytes.Buffer
writer := bufio.NewWriter(&b)
enode := wakuPX.enrCache[k]
err := enode.node.Record().EncodeRLP(writer)
if err != nil {
return nil, err
}
writer.Flush()
result = append(result, &pb.PeerInfo{
ENR: b.Bytes(),
})
}
iter++
}
return result, nil
}
func (wakuPX *WakuPeerExchange) cleanCache() {
if len(wakuPX.enrCache) < MaxCacheSize {
return
}
r := make(map[enode.ID]peerRecord)
for k, v := range wakuPX.enrCache {
if v.idx > CacheCleanWindow {
v.idx -= CacheCleanWindow
r[k] = v
}
}
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) findPeers() {
if !wakuPX.disc.IsStarted() {
return
}
ctx, cancel := context.WithTimeout(wakuPX.ctx, 2*time.Second)
defer cancel()
peerRecords, err := wakuPX.disc.FindNodes(ctx, "")
if err != nil {
wakuPX.log.Error("finding peers", zap.Error(err))
}
cnt := 0
wakuPX.enrCacheMutex.Lock()
for _, p := range peerRecords {
cnt++
wakuPX.enrCache[p.Node.ID()] = peerRecord{
idx: len(wakuPX.enrCache),
node: p.Node,
}
}
wakuPX.enrCacheMutex.Unlock()
wakuPX.log.Info("discovered px peers via discv5", zap.Int("count", cnt))
wakuPX.cleanCache()
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop() {
defer wakuPX.wg.Done()
// Runs a discv5 loop adding new peers to the px peer cache
if wakuPX.disc == nil {
wakuPX.log.Warn("trying to run discovery v5 (for PX) while it's disabled")
return
}
wakuPX.log.Info("starting peer exchange discovery v5 loop")
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
// This loop "competes" with the loop in wakunode2
// For the purpose of collecting px peers, 30 sec intervals should be enough
wakuPX.findPeers()
for {
select {
case <-wakuPX.quit:
return
case <-ticker.C:
wakuPX.findPeers()
}
}
}

View File

@ -0,0 +1,58 @@
package peer_exchange
import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type PeerExchangeParameters struct {
host host.Host
selectedPeer peer.ID
log *zap.Logger
}
type PeerExchangeOption func(*PeerExchangeParameters)
// WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to push a waku message to
func WithAutomaticPeerSelection() PeerExchangeOption {
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), params.log)
if err == nil {
params.selectedPeer = *p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping
func WithFastestPeerSelection(ctx context.Context) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), params.log)
if err == nil {
params.selectedPeer = *p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
}
}
// DefaultOptions are the default options to be used when using the lightpush protocol
func DefaultOptions(host host.Host) []PeerExchangeOption {
return []PeerExchangeOption{
WithAutomaticPeerSelection(),
}
}

View File

@ -3,7 +3,7 @@ package relay
import (
"sync"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Subscription handles the subscrition to a particular pubsub topic

View File

@ -17,12 +17,12 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/status-im/go-waku/logging"
v2 "github.com/status-im/go-waku/waku/v2"
"github.com/status-im/go-waku/waku/v2/metrics"
waku_proto "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")

View File

@ -5,7 +5,7 @@ import (
"sync"
"github.com/cruxic/go-hmac-drbg/hmacdrbg"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

View File

@ -6,7 +6,7 @@ import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)

View File

@ -15,9 +15,9 @@ import (
proto "github.com/golang/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)

View File

@ -13,7 +13,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/go-waku/waku/v2/protocol/rln/contracts"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
@ -195,9 +195,8 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi
}
errCh <- err
subs.Unsubscribe()
}
rln.log.Error("subscribing to rln events", zap.Error(err))
}
return subs, err
})

View File

@ -15,13 +15,13 @@ import (
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/status-im/go-waku/logging"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/swap"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/swap"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// StoreID_v20beta4 is the current Waku Store protocol identifier
@ -30,10 +30,17 @@ const StoreID_v20beta4 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta4")
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
// MaxContentFilters is the maximum number of allowed content filters in a query
const MaxContentFilters = 10
// MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const MaxTimeVariance = time.Duration(20) * time.Second
var (
// ErrMaxContentFilters is returned when the number of content topics in the query
// exceeds the limit
ErrMaxContentFilters = errors.New("exceeds the maximum number of content filters allowed")
// ErrNoPeersAvailable is returned when there are no store peers in the peer store
// that could be used to retrieve message history
ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -62,6 +69,10 @@ func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*pb.Wa
query.PagingInfo.PageSize = MaxPageSize
}
if len(query.ContentFilters) > MaxContentFilters {
return nil, nil, ErrMaxContentFilters
}
cursor, queryResult, err := msgProvider.Query(query)
if err != nil {
return nil, nil, err
@ -158,9 +169,12 @@ type WakuStore struct {
swap *swap.WakuSwap
}
type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
type Store interface {
Start(ctx context.Context)
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope
@ -436,6 +450,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
}
if len(q.ContentFilters) > MaxContentFilters {
return nil, ErrMaxContentFilters
}
params := new(HistoryRequestParameters)
params.s = store
@ -479,7 +497,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
messageID, _, _ := m.Hash()
messageIDs = append(messageIDs, messageID)
}
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs...))
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs))
result := &Result{
Messages: response.Messages,
@ -491,6 +509,42 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
return result, nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) {
if cb == nil {
return nil, errors.New("callback can't be null")
}
result, err := store.Query(ctx, query, opts...)
if err != nil {
return nil, err
}
for {
for _, m := range result.Messages {
found, err := cb(m)
if err != nil {
return nil, err
}
if found {
return m, nil
}
}
if result.IsComplete() {
break
}
result, err = store.Next(ctx, result)
if err != nil {
return nil, err
}
}
return nil, nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually

View File

@ -10,7 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/status-im/go-waku/logging"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)

45
vendor/modules.txt vendored
View File

@ -900,28 +900,6 @@ github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.4
## explicit; go 1.18
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/go-waku v0.2.3-0.20221107152343-98c3ca8dc60f
## explicit; go 1.18
github.com/status-im/go-waku/logging
github.com/status-im/go-waku/waku/persistence
github.com/status-im/go-waku/waku/persistence/migrations
github.com/status-im/go-waku/waku/try
github.com/status-im/go-waku/waku/v2
github.com/status-im/go-waku/waku/v2/discv5
github.com/status-im/go-waku/waku/v2/dnsdisc
github.com/status-im/go-waku/waku/v2/metrics
github.com/status-im/go-waku/waku/v2/node
github.com/status-im/go-waku/waku/v2/noise
github.com/status-im/go-waku/waku/v2/protocol
github.com/status-im/go-waku/waku/v2/protocol/filter
github.com/status-im/go-waku/waku/v2/protocol/lightpush
github.com/status-im/go-waku/waku/v2/protocol/pb
github.com/status-im/go-waku/waku/v2/protocol/relay
github.com/status-im/go-waku/waku/v2/protocol/rln
github.com/status-im/go-waku/waku/v2/protocol/rln/contracts
github.com/status-im/go-waku/waku/v2/protocol/store
github.com/status-im/go-waku/waku/v2/protocol/swap
github.com/status-im/go-waku/waku/v2/utils
# github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969
## explicit
github.com/status-im/keycard-go/derivationpath
@ -1012,6 +990,29 @@ github.com/vacp2p/mvds/transport
github.com/waku-org/go-discover/discover
github.com/waku-org/go-discover/discover/v4wire
github.com/waku-org/go-discover/discover/v5wire
# github.com/waku-org/go-waku v0.2.3-0.20221122234656-5d7d05ca16a5
## explicit; go 1.18
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/waku/persistence
github.com/waku-org/go-waku/waku/persistence/migrations
github.com/waku-org/go-waku/waku/try
github.com/waku-org/go-waku/waku/v2
github.com/waku-org/go-waku/waku/v2/discv5
github.com/waku-org/go-waku/waku/v2/dnsdisc
github.com/waku-org/go-waku/waku/v2/metrics
github.com/waku-org/go-waku/waku/v2/node
github.com/waku-org/go-waku/waku/v2/noise
github.com/waku-org/go-waku/waku/v2/protocol
github.com/waku-org/go-waku/waku/v2/protocol/filter
github.com/waku-org/go-waku/waku/v2/protocol/lightpush
github.com/waku-org/go-waku/waku/v2/protocol/pb
github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange
github.com/waku-org/go-waku/waku/v2/protocol/relay
github.com/waku-org/go-waku/waku/v2/protocol/rln
github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts
github.com/waku-org/go-waku/waku/v2/protocol/store
github.com/waku-org/go-waku/waku/v2/protocol/swap
github.com/waku-org/go-waku/waku/v2/utils
# github.com/waku-org/go-zerokit-rln v0.1.6
## explicit; go 1.17
github.com/waku-org/go-zerokit-rln/rln

View File

@ -26,9 +26,9 @@ import (
"sync"
"time"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/status-im/status-go/wakuv2/common"

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"

View File

@ -8,10 +8,10 @@ import (
"sync"
"time"
gowakuPersistence "github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
gowakuPersistence "github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

View File

@ -52,10 +52,10 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/status-im/go-waku/waku/v2/dnsdisc"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/signal"
@ -63,9 +63,9 @@ import (
"github.com/status-im/status-go/wakuv2/persistence"
"github.com/libp2p/go-libp2p/core/discovery"
node "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
)
const messageQueueLimit = 1024