chore_: bump go-waku to change datatype of `waku2` field in ENR to byte array
This commit is contained in:
parent
d794e43347
commit
e91568ce26
2
go.mod
2
go.mod
|
@ -95,7 +95,7 @@ require (
|
||||||
github.com/schollz/peerdiscovery v1.7.0
|
github.com/schollz/peerdiscovery v1.7.0
|
||||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||||
github.com/urfave/cli/v2 v2.27.2
|
github.com/urfave/cli/v2 v2.27.2
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
|
github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7
|
||||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
||||||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ=
|
github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7 h1:L4fJfQvzmZi7C9oPz8Yr55/ZIpj345YIfciVt+VkO+4=
|
||||||
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
@ -186,6 +187,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DBStore) updateMetrics(ctx context.Context) {
|
func (d *DBStore) updateMetrics(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
defer d.wg.Done()
|
defer d.wg.Done()
|
||||||
|
@ -251,6 +253,7 @@ func (d *DBStore) getDeleteOldRowsQuery() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
|
func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer d.wg.Done()
|
defer d.wg.Done()
|
||||||
|
|
||||||
ticker := time.NewTicker(t)
|
ticker := time.NewTicker(t)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,6 +28,8 @@ func (fc FilterConfig) String() string {
|
||||||
return string(jsonStr)
|
return string(jsonStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const filterSubLoopInterval = 5 * time.Second
|
||||||
|
|
||||||
type Sub struct {
|
type Sub struct {
|
||||||
ContentFilter protocol.ContentFilter
|
ContentFilter protocol.ContentFilter
|
||||||
DataCh chan *protocol.Envelope
|
DataCh chan *protocol.Envelope
|
||||||
|
@ -69,13 +72,7 @@ func defaultOptions() []SubscribeOptions {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe
|
// Subscribe
|
||||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
|
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
|
||||||
optList := append(defaultOptions(), opts...)
|
|
||||||
params := new(subscribeParameters)
|
|
||||||
for _, opt := range optList {
|
|
||||||
opt(params)
|
|
||||||
}
|
|
||||||
|
|
||||||
sub := new(Sub)
|
sub := new(Sub)
|
||||||
sub.id = uuid.NewString()
|
sub.id = uuid.NewString()
|
||||||
sub.wf = wf
|
sub.wf = wf
|
||||||
|
@ -95,12 +92,14 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
|
||||||
sub.multiplex(subs)
|
sub.multiplex(subs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// filter subscription loop is to check if target subscriptions for a filter are active and if not
|
||||||
go sub.subscriptionLoop(params.batchInterval)
|
// trigger resubscribe.
|
||||||
|
go sub.subscriptionLoop(filterSubLoopInterval)
|
||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
|
func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
_, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
|
_, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
|
||||||
//Not reading result unless we want to do specific error handling?
|
//Not reading result unless we want to do specific error handling?
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -109,6 +108,7 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
|
func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ticker := time.NewTicker(batchInterval)
|
ticker := time.NewTicker(batchInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
|
@ -216,12 +216,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
|
||||||
for _, subDetails := range subs {
|
for _, subDetails := range subs {
|
||||||
apiSub.subs[subDetails.ID] = subDetails
|
apiSub.subs[subDetails.ID] = subDetails
|
||||||
go func(subDetails *subscription.SubscriptionDetails) {
|
go func(subDetails *subscription.SubscriptionDetails) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
|
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
|
||||||
for env := range subDetails.C {
|
for env := range subDetails.C {
|
||||||
apiSub.DataCh <- env
|
apiSub.DataCh <- env
|
||||||
}
|
}
|
||||||
}(subDetails)
|
}(subDetails)
|
||||||
go func(subDetails *subscription.SubscriptionDetails) {
|
go func(subDetails *subscription.SubscriptionDetails) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
select {
|
select {
|
||||||
case <-apiSub.ctx.Done():
|
case <-apiSub.ctx.Done():
|
||||||
return
|
return
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Methods on FilterManager just aggregate filters from application and subscribe to them
|
// Methods on FilterManager just aggregate filters from application and subscribe to them
|
||||||
|
@ -31,7 +32,7 @@ type appFilterMap map[string]filterConfig
|
||||||
type FilterManager struct {
|
type FilterManager struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
opts []SubscribeOptions
|
params *subscribeParameters
|
||||||
minPeersPerFilter int
|
minPeersPerFilter int
|
||||||
onlineChecker *onlinechecker.DefaultOnlineChecker
|
onlineChecker *onlinechecker.DefaultOnlineChecker
|
||||||
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
|
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
|
||||||
|
@ -64,7 +65,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
|
||||||
// This fn is being mocked in test
|
// This fn is being mocked in test
|
||||||
mgr := new(FilterManager)
|
mgr := new(FilterManager)
|
||||||
mgr.ctx = ctx
|
mgr.ctx = ctx
|
||||||
mgr.opts = opts
|
|
||||||
mgr.logger = logger
|
mgr.logger = logger
|
||||||
mgr.minPeersPerFilter = minPeersPerFilter
|
mgr.minPeersPerFilter = minPeersPerFilter
|
||||||
mgr.envProcessor = envProcessor
|
mgr.envProcessor = envProcessor
|
||||||
|
@ -72,15 +72,23 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
|
||||||
mgr.node = node
|
mgr.node = node
|
||||||
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
|
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
|
||||||
mgr.node.SetOnlineChecker(mgr.onlineChecker)
|
mgr.node.SetOnlineChecker(mgr.onlineChecker)
|
||||||
mgr.filterSubBatchDuration = 5 * time.Second
|
|
||||||
mgr.incompleteFilterBatch = make(map[string]filterConfig)
|
mgr.incompleteFilterBatch = make(map[string]filterConfig)
|
||||||
mgr.filterConfigs = make(appFilterMap)
|
mgr.filterConfigs = make(appFilterMap)
|
||||||
mgr.waitingToSubQueue = make(chan filterConfig, 100)
|
mgr.waitingToSubQueue = make(chan filterConfig, 100)
|
||||||
|
|
||||||
|
//parsing the subscribe params only to read the batchInterval passed.
|
||||||
|
mgr.params = new(subscribeParameters)
|
||||||
|
opts = append(defaultOptions(), opts...)
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(mgr.params)
|
||||||
|
}
|
||||||
|
mgr.filterSubBatchDuration = mgr.params.batchInterval
|
||||||
go mgr.startFilterSubLoop()
|
go mgr.startFilterSubLoop()
|
||||||
return mgr
|
return mgr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *FilterManager) startFilterSubLoop() {
|
func (mgr *FilterManager) startFilterSubLoop() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ticker := time.NewTicker(mgr.filterSubBatchDuration)
|
ticker := time.NewTicker(mgr.filterSubBatchDuration)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
|
@ -151,9 +159,10 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
|
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ctx, cancel := context.WithCancel(mgr.ctx)
|
ctx, cancel := context.WithCancel(mgr.ctx)
|
||||||
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
|
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
|
||||||
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...)
|
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
|
||||||
mgr.Lock()
|
mgr.Lock()
|
||||||
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
|
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
|
||||||
mgr.Unlock()
|
mgr.Unlock()
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
|
||||||
m.C = c
|
m.C = c
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
t := time.NewTicker(m.params.interval)
|
t := time.NewTicker(m.params.interval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
|
@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
|
||||||
default:
|
default:
|
||||||
semaphore <- struct{}{}
|
semaphore <- struct{}{}
|
||||||
go func(interest criteriaInterest) {
|
go func(interest criteriaInterest) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
m.fetchHistory(c, interest)
|
m.fetchHistory(c, interest)
|
||||||
<-semaphore
|
<-semaphore
|
||||||
}(interest)
|
}(interest)
|
||||||
|
@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(messageHashes []pb.MessageHash) {
|
go func(messageHashes []pb.MessageHash) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wg.Wait()
|
defer wg.Wait()
|
||||||
|
|
||||||
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
|
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -145,6 +146,7 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) {
|
||||||
|
|
||||||
// Start checks if the tracked outgoing messages are stored periodically
|
// Start checks if the tracked outgoing messages are stored periodically
|
||||||
func (m *MessageSentCheck) Start() {
|
func (m *MessageSentCheck) Start() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ticker := time.NewTicker(m.hashQueryInterval)
|
ticker := time.NewTicker(m.hashQueryInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MessagePriority determines the ordering for the message priority queue
|
// MessagePriority determines the ordering for the message priority queue
|
||||||
|
@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope {
|
||||||
ch := make(chan *protocol.Envelope)
|
ch := make(chan *protocol.Envelope)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -172,6 +171,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
|
||||||
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
|
if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
|
||||||
d.WaitGroup().Add(1)
|
d.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer d.WaitGroup().Done()
|
defer d.WaitGroup().Done()
|
||||||
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
|
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
|
||||||
}()
|
}()
|
||||||
|
@ -217,6 +217,7 @@ func (d *DiscoveryV5) start() error {
|
||||||
if d.params.autoFindPeers {
|
if d.params.autoFindPeers {
|
||||||
d.WaitGroup().Add(1)
|
d.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer d.WaitGroup().Done()
|
defer d.WaitGroup().Done()
|
||||||
d.runDiscoveryV5Loop(d.Context())
|
d.runDiscoveryV5Loop(d.Context())
|
||||||
}()
|
}()
|
||||||
|
@ -253,19 +254,13 @@ func (d *DiscoveryV5) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func isWakuNode(node *enode.Node) bool {
|
func isWakuNode(node *enode.Node) bool {
|
||||||
enrField := new(wenr.WakuEnrBitfield)
|
enrField, err := wenr.GetWakuEnrBitField(node)
|
||||||
if err := node.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err != nil {
|
if err != nil {
|
||||||
if !enr.IsNotFound(err) {
|
utils.Logger().Named("discv5").Error("could not retrieve waku2 ENR field for enr ", zap.Error(err))
|
||||||
utils.Logger().Named("discv5").Error("could not retrieve waku2 ENR field for enr ", zap.Any("node", node))
|
|
||||||
}
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if enrField != nil {
|
return enrField != uint8(0) // #RFC 31 requirement
|
||||||
return *enrField != uint8(0) // #RFC 31 requirement
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
|
func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// FilterPredicate is to create a Predicate using a custom function
|
// FilterPredicate is to create a Predicate using a custom function
|
||||||
|
@ -36,16 +35,11 @@ func FilterShard(cluster, index uint16) Predicate {
|
||||||
func FilterCapabilities(flags wenr.WakuEnrBitfield) Predicate {
|
func FilterCapabilities(flags wenr.WakuEnrBitfield) Predicate {
|
||||||
return func(iterator enode.Iterator) enode.Iterator {
|
return func(iterator enode.Iterator) enode.Iterator {
|
||||||
predicate := func(node *enode.Node) bool {
|
predicate := func(node *enode.Node) bool {
|
||||||
enrField := new(wenr.WakuEnrBitfield)
|
enrField, err := wenr.GetWakuEnrBitField(node)
|
||||||
if err := node.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
return enrField&flags == flags
|
||||||
if enrField == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return *enrField&flags == flags
|
|
||||||
}
|
}
|
||||||
return enode.Filter(iterator, predicate)
|
return enode.Filter(iterator, predicate)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestPeerDiscoverer is mock peer discoverer for testing
|
// TestPeerDiscoverer is mock peer discoverer for testing
|
||||||
|
@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer {
|
||||||
// Subscribe is for subscribing to peer discoverer
|
// Subscribe is for subscribing to peer discoverer
|
||||||
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
|
func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerStatis is a map of peer IDs to supported protocols
|
// PeerStatis is a map of peer IDs to supported protocols
|
||||||
|
@ -101,6 +102,7 @@ func (c ConnectionNotifier) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) connectednessListener(ctx context.Context) {
|
func (w *WakuNode) connectednessListener(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
)
|
)
|
||||||
|
@ -40,6 +41,7 @@ func disconnectAllPeers(host host.Host, logger *zap.Logger) {
|
||||||
// This is necessary because TCP connections are automatically closed due to inactivity,
|
// This is necessary because TCP connections are automatically closed due to inactivity,
|
||||||
// and doing a ping will avoid this (with a small bandwidth cost)
|
// and doing a ping will avoid this (with a small bandwidth cost)
|
||||||
func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
|
func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
if !w.opts.enableRelay {
|
if !w.opts.enableRelay {
|
||||||
|
@ -168,6 +170,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
|
func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
logger := w.log.With(logging.HostID("peer", peerID))
|
logger := w.log.With(logging.HostID("peer", peerID))
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -358,6 +359,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer evtRelaySubscribed.Close()
|
defer evtRelaySubscribed.Close()
|
||||||
defer evtRelayUnsubscribed.Close()
|
defer evtRelayUnsubscribed.Close()
|
||||||
|
|
||||||
|
@ -411,6 +413,7 @@ func (w *WakuNode) registerAndMonitorReachability(ctx context.Context) {
|
||||||
}
|
}
|
||||||
w.wg.Add(1)
|
w.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer myEventSub.Close()
|
defer myEventSub.Close()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
|
func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
|
||||||
r := make(chan peer.AddrInfo)
|
r := make(chan peer.AddrInfo)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer close(r)
|
defer close(r)
|
||||||
for ; numPeers != 0; numPeers-- {
|
for ; numPeers != 0; numPeers-- {
|
||||||
select {
|
select {
|
||||||
|
@ -292,7 +293,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
||||||
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
||||||
|
|
||||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)
|
||||||
|
|
||||||
if params.storeFactory != nil {
|
if params.storeFactory != nil {
|
||||||
w.storeFactory = params.storeFactory
|
w.storeFactory = params.storeFactory
|
||||||
|
@ -308,6 +309,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
|
func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
addrsSet := utils.MultiAddrSet(w.ListenAddresses()...)
|
addrsSet := utils.MultiAddrSet(w.ListenAddresses()...)
|
||||||
|
@ -550,6 +552,7 @@ func (w *WakuNode) ID() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) watchENRChanges(ctx context.Context) {
|
func (w *WakuNode) watchENRChanges(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
var prevNodeVal string
|
var prevNodeVal string
|
||||||
|
@ -752,7 +755,9 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
|
||||||
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
|
||||||
err := w.host.Connect(ctx, info)
|
err := w.host.Connect(ctx, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID)
|
if w.peermanager != nil {
|
||||||
|
w.peermanager.HandleDialError(err, info.ID)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -885,6 +890,7 @@ func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) findRelayNodes(ctx context.Context) {
|
func (w *WakuNode) findRelayNodes(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.wg.Done()
|
defer w.wg.Done()
|
||||||
|
|
||||||
// Feed peers more often right after the bootstrap, then backoff
|
// Feed peers more often right after the bootstrap, then backoff
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default UserAgent
|
// Default UserAgent
|
||||||
|
@ -94,6 +95,8 @@ type WakuNodeParameters struct {
|
||||||
enableStore bool
|
enableStore bool
|
||||||
messageProvider legacy_store.MessageProvider
|
messageProvider legacy_store.MessageProvider
|
||||||
|
|
||||||
|
storeRateLimit rate.Limit
|
||||||
|
|
||||||
enableRendezvousPoint bool
|
enableRendezvousPoint bool
|
||||||
rendezvousDB *rendezvous.DB
|
rendezvousDB *rendezvous.DB
|
||||||
|
|
||||||
|
@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
|
||||||
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
||||||
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
||||||
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
||||||
|
WithWakuStoreRateLimit(8), // Value currently set in status.staging
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiAddresses return the list of multiaddresses configured in the node
|
// MultiAddresses return the list of multiaddresses configured in the node
|
||||||
|
@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
|
||||||
|
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
|
||||||
|
// the store protocol from a storenode
|
||||||
|
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.storeRateLimit = value
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
||||||
// be stored or not in a message provider.
|
// be stored or not in a message provider.
|
||||||
func WithWakuStore() WakuNodeOption {
|
func WithWakuStore() WakuNodeOption {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
manet "github.com/multiformats/go-multiaddr/net"
|
manet "github.com/multiformats/go-multiaddr/net"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,6 +78,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason
|
||||||
|
|
||||||
// NotifyDisconnect is called when a connection disconnects.
|
// NotifyDisconnect is called when a connection disconnects.
|
||||||
func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) {
|
func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ip, err := manet.ToIP(addr)
|
ip, err := manet.ToIP(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
|
3
vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go
generated
vendored
3
vendor/github.com/waku-org/go-waku/waku/v2/peermanager/fastest_peer_selector.go
generated
vendored
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,9 +70,11 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic
|
||||||
pinged := make(map[peer.ID]struct{})
|
pinged := make(map[peer.ID]struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
// Ping any peer with no latency recorded
|
// Ping any peer with no latency recorded
|
||||||
for peerToPing := range pingCh {
|
for peerToPing := range pingCh {
|
||||||
go func(p peer.ID) {
|
go func(p peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
rtt := time.Hour
|
rtt := time.Hour
|
||||||
result, err := r.PingPeer(ctx, p)
|
result, err := r.PingPeer(ctx, p)
|
||||||
|
|
|
@ -4,7 +4,6 @@ package peermanager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -19,6 +18,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
@ -104,6 +104,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan servic
|
||||||
// if running start a goroutine to consume the subscription
|
// if running start a goroutine to consume the subscription
|
||||||
c.WaitGroup().Add(1)
|
c.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer c.WaitGroup().Done()
|
defer c.WaitGroup().Done()
|
||||||
c.consumeSubscription(subscription{ctx, ch})
|
c.consumeSubscription(subscription{ctx, ch})
|
||||||
}()
|
}()
|
||||||
|
@ -187,6 +188,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions() {
|
||||||
for _, subs := range c.subscriptions {
|
for _, subs := range c.subscriptions {
|
||||||
c.WaitGroup().Add(1)
|
c.WaitGroup().Add(1)
|
||||||
go func(s subscription) {
|
go func(s subscription) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer c.WaitGroup().Done()
|
defer c.WaitGroup().Done()
|
||||||
c.consumeSubscription(s)
|
c.consumeSubscription(s)
|
||||||
}(subs)
|
}(subs)
|
||||||
|
@ -234,6 +236,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PeerConnectionStrategy) dialPeers() {
|
func (c *PeerConnectionStrategy) dialPeers() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer c.WaitGroup().Done()
|
defer c.WaitGroup().Done()
|
||||||
|
|
||||||
maxGoRoutines := c.pm.OutPeersTarget
|
maxGoRoutines := c.pm.OutPeersTarget
|
||||||
|
@ -273,15 +276,15 @@ func (c *PeerConnectionStrategy) dialPeers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
|
func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer c.WaitGroup().Done()
|
defer c.WaitGroup().Done()
|
||||||
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
|
ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := c.host.Connect(ctx, pi)
|
err := c.host.Connect(ctx, pi)
|
||||||
if err != nil && !errors.Is(err, context.Canceled) {
|
if err != nil {
|
||||||
c.addConnectionBackoff(pi.ID)
|
c.pm.HandleDialError(err, pi.ID)
|
||||||
c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID)
|
} else {
|
||||||
c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
|
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
|
||||||
}
|
}
|
||||||
c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID)
|
|
||||||
<-sem
|
<-sem
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -103,6 +104,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
|
func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
|
shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
|
pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"github.com/libp2p/go-libp2p/core/event"
|
"github.com/libp2p/go-libp2p/core/event"
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
|
@ -23,6 +22,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
|
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -87,6 +87,7 @@ type PeerManager struct {
|
||||||
TopicHealthNotifCh chan<- TopicHealthStatus
|
TopicHealthNotifCh chan<- TopicHealthStatus
|
||||||
rttCache *FastestPeerSelector
|
rttCache *FastestPeerSelector
|
||||||
RelayEnabled bool
|
RelayEnabled bool
|
||||||
|
evtDialError event.Emitter
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerSelection provides various options based on which Peer is selected from a list of peers.
|
// PeerSelection provides various options based on which Peer is selected from a list of peers.
|
||||||
|
@ -249,9 +250,18 @@ func (pm *PeerManager) Start(ctx context.Context) {
|
||||||
go pm.connectivityLoop(ctx)
|
go pm.connectivityLoop(ctx)
|
||||||
}
|
}
|
||||||
go pm.peerStoreLoop(ctx)
|
go pm.peerStoreLoop(ctx)
|
||||||
|
|
||||||
|
if pm.host != nil {
|
||||||
|
var err error
|
||||||
|
pm.evtDialError, err = pm.host.EventBus().Emitter(new(utils.DialError))
|
||||||
|
if err != nil {
|
||||||
|
pm.logger.Error("failed to create dial error emitter", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
t := time.NewTicker(prunePeerStoreInterval)
|
t := time.NewTicker(prunePeerStoreInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
for {
|
for {
|
||||||
|
@ -353,6 +363,7 @@ func (pm *PeerManager) prunePeerStore() {
|
||||||
|
|
||||||
// This is a connectivity loop, which currently checks and prunes inbound connections.
|
// This is a connectivity loop, which currently checks and prunes inbound connections.
|
||||||
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
|
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
pm.connectToPeers()
|
pm.connectToPeers()
|
||||||
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
|
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
@ -535,8 +546,8 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
|
||||||
}
|
}
|
||||||
supportedProtos := []protocol.ID{}
|
supportedProtos := []protocol.ID{}
|
||||||
//Identify and specify protocols supported by the peer based on the discovered peer's ENR
|
//Identify and specify protocols supported by the peer based on the discovered peer's ENR
|
||||||
var enrField wenr.WakuEnrBitfield
|
enrField, err := wenr.GetWakuEnrBitField(p.ENR)
|
||||||
if err := p.ENR.Record().Load(enr.WithEntry(wenr.WakuENRField, &enrField)); err == nil {
|
if err == nil {
|
||||||
for proto, protoENR := range pm.wakuprotoToENRFieldMap {
|
for proto, protoENR := range pm.wakuprotoToENRFieldMap {
|
||||||
protoENRField := protoENR.waku2ENRBitField
|
protoENRField := protoENR.waku2ENRBitField
|
||||||
if protoENRField&enrField != 0 {
|
if protoENRField&enrField != 0 {
|
||||||
|
@ -719,3 +730,22 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
|
||||||
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
|
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
|
||||||
pm.serviceSlots.getPeers(proto).add(peerID)
|
pm.serviceSlots.getPeers(proto).add(peerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
|
||||||
|
if err == nil || errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if pm.peerConnector != nil {
|
||||||
|
pm.peerConnector.addConnectionBackoff(peerID)
|
||||||
|
}
|
||||||
|
if pm.host != nil {
|
||||||
|
pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
|
||||||
|
}
|
||||||
|
pm.logger.Warn("connecting to peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||||
|
if pm.evtDialError != nil {
|
||||||
|
emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
|
||||||
|
if emitterErr != nil {
|
||||||
|
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
)
|
)
|
||||||
|
@ -162,6 +163,7 @@ func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *PeerManager) peerEventLoop(ctx context.Context) {
|
func (pm *PeerManager) peerEventLoop(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer pm.sub.Close()
|
defer pm.sub.Close()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -28,6 +28,23 @@ const ShardingBitVectorEnrField = "rsv"
|
||||||
// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/).
|
// WakuEnrBitfield is a8-bit flag field to indicate Waku capabilities. Only the 4 LSBs are currently defined according to RFC31 (https://rfc.vac.dev/spec/31/).
|
||||||
type WakuEnrBitfield = uint8
|
type WakuEnrBitfield = uint8
|
||||||
|
|
||||||
|
func GetWakuEnrBitField(node *enode.Node) (WakuEnrBitfield, error) {
|
||||||
|
enrField := []byte{}
|
||||||
|
err := node.Record().Load(enr.WithEntry(WakuENRField, &enrField))
|
||||||
|
if err != nil {
|
||||||
|
if enr.IsNotFound(err) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(enrField) == 0 {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return WakuEnrBitfield(enrField[0]), nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node
|
// NewWakuEnrBitfield creates a WakuEnrBitField whose value will depend on which protocols are enabled in the node
|
||||||
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
|
func NewWakuEnrBitfield(lightpush, filter, store, relay bool) WakuEnrBitfield {
|
||||||
var v uint8
|
var v uint8
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
@ -127,6 +128,7 @@ func (wf *WakuFilterLightNode) Stop() {
|
||||||
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
|
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
|
||||||
if wf.subscriptions.Count() > 0 {
|
if wf.subscriptions.Count() > 0 {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = recover()
|
_ = recover()
|
||||||
}()
|
}()
|
||||||
|
@ -245,8 +247,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||||
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
|
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wf.metrics.RecordError(dialFailure)
|
wf.metrics.RecordError(dialFailure)
|
||||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if wf.pm != nil {
|
||||||
ps.AddConnFailure(peerID)
|
wf.pm.HandleDialError(err, peerID)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -414,6 +416,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
|
||||||
for i, peerID := range selectedPeers {
|
for i, peerID := range selectedPeers {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(index int, ID peer.ID) {
|
go func(index int, ID peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
err := wf.request(
|
err := wf.request(
|
||||||
reqCtx,
|
reqCtx,
|
||||||
|
@ -565,6 +568,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
|
||||||
// send unsubscribe request to all the peers
|
// send unsubscribe request to all the peers
|
||||||
for peerID := range peers {
|
for peerID := range peers {
|
||||||
go func(peerID peer.ID) {
|
go func(peerID peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer func() {
|
defer func() {
|
||||||
if params.wg != nil {
|
if params.wg != nil {
|
||||||
params.wg.Done()
|
params.wg.Done()
|
||||||
|
@ -687,6 +691,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
|
||||||
}
|
}
|
||||||
for peerId := range peers {
|
for peerId := range peers {
|
||||||
go func(peerID peer.ID) {
|
go func(peerID peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer func() {
|
defer func() {
|
||||||
if params.wg != nil {
|
if params.wg != nil {
|
||||||
params.wg.Done()
|
params.wg.Done()
|
||||||
|
|
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go
generated
vendored
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go
generated
vendored
|
@ -5,6 +5,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,6 +20,7 @@ func (wf *WakuFilterLightNode) PingPeers() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
|
func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
|
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
err := wf.Ping(ctxWithTimeout, peer)
|
err := wf.Ping(ctxWithTimeout, peer)
|
||||||
|
@ -41,6 +43,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
|
func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wf.WaitGroup().Done()
|
defer wf.WaitGroup().Done()
|
||||||
ticker := time.NewTicker(wf.peerPingInterval)
|
ticker := time.NewTicker(wf.peerPingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
|
@ -14,7 +14,7 @@ import (
|
||||||
"github.com/libp2p/go-msgio/pbio"
|
"github.com/libp2p/go-msgio/pbio"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
|
@ -38,6 +38,7 @@ type (
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
*service.CommonService
|
*service.CommonService
|
||||||
subscriptions *SubscribersMap
|
subscriptions *SubscribersMap
|
||||||
|
pm *peermanager.PeerManager
|
||||||
|
|
||||||
maxSubscriptions int
|
maxSubscriptions int
|
||||||
}
|
}
|
||||||
|
@ -61,6 +62,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
|
||||||
wf.maxSubscriptions = params.MaxSubscribers
|
wf.maxSubscriptions = params.MaxSubscribers
|
||||||
if params.pm != nil {
|
if params.pm != nil {
|
||||||
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
|
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, FilterSubscribeENRField)
|
||||||
|
wf.pm = params.pm
|
||||||
}
|
}
|
||||||
return wf
|
return wf
|
||||||
}
|
}
|
||||||
|
@ -216,6 +218,7 @@ func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, stream network
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
|
func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wf.WaitGroup().Done()
|
defer wf.WaitGroup().Done()
|
||||||
|
|
||||||
// This function is invoked for each message received
|
// This function is invoked for each message received
|
||||||
|
@ -237,6 +240,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
|
||||||
logger.Debug("pushing message to light node")
|
logger.Debug("pushing message to light node")
|
||||||
wf.WaitGroup().Add(1)
|
wf.WaitGroup().Add(1)
|
||||||
go func(subscriber peer.ID) {
|
go func(subscriber peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wf.WaitGroup().Done()
|
defer wf.WaitGroup().Done()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := wf.pushMessage(ctx, logger, subscriber, envelope)
|
err := wf.pushMessage(ctx, logger, subscriber, envelope)
|
||||||
|
@ -274,8 +278,8 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
|
||||||
wf.metrics.RecordError(pushTimeoutFailure)
|
wf.metrics.RecordError(pushTimeoutFailure)
|
||||||
} else {
|
} else {
|
||||||
wf.metrics.RecordError(dialFailure)
|
wf.metrics.RecordError(dialFailure)
|
||||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if wf.pm != nil {
|
||||||
ps.AddConnFailure(peerID)
|
wf.pm.HandleDialError(err, peerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.Error("opening peer stream", zap.Error(err))
|
logger.Error("opening peer stream", zap.Error(err))
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PeerSet map[peer.ID]struct{}
|
type PeerSet map[peer.ID]struct{}
|
||||||
|
@ -188,6 +189,7 @@ func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan
|
||||||
key := getKey(pubsubTopic, contentTopic)
|
key := getKey(pubsubTopic, contentTopic)
|
||||||
|
|
||||||
f := func() {
|
f := func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
sub.RLock()
|
sub.RLock()
|
||||||
defer sub.RUnlock()
|
defer sub.RUnlock()
|
||||||
|
|
||||||
|
@ -236,6 +238,7 @@ func (sub *SubscribersMap) Refresh(peerID peer.ID) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) {
|
func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
t := time.NewTicker(cleanupInterval)
|
t := time.NewTicker(cleanupInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
|
|
|
@ -205,10 +205,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
|
||||||
|
|
||||||
stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
|
stream, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating stream to peer", zap.Error(err))
|
|
||||||
store.metrics.RecordError(dialFailure)
|
store.metrics.RecordError(dialFailure)
|
||||||
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if store.pm != nil {
|
||||||
ps.AddConnFailure(selectedPeer)
|
store.pm.HandleDialError(err, selectedPeer)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) {
|
func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) {
|
||||||
|
@ -159,9 +160,11 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
|
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer store.wg.Done()
|
defer store.wg.Done()
|
||||||
for envelope := range store.MsgC.Ch {
|
for envelope := range store.MsgC.Ch {
|
||||||
go func(env *protocol.Envelope) {
|
go func(env *protocol.Envelope) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
_ = store.storeMessage(env)
|
_ = store.storeMessage(env)
|
||||||
}(envelope)
|
}(envelope)
|
||||||
}
|
}
|
||||||
|
|
6
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
6
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -195,10 +195,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
|
||||||
|
|
||||||
stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
|
stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating stream to peer", zap.Error(err))
|
|
||||||
wakuLP.metrics.RecordError(dialFailure)
|
wakuLP.metrics.RecordError(dialFailure)
|
||||||
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if wakuLP.pm != nil {
|
||||||
ps.AddConnFailure(peerID)
|
wakuLP.pm.HandleDialError(err, peerID)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -335,6 +334,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
|
||||||
for i, peerID := range params.selectedPeers {
|
for i, peerID := range params.selectedPeers {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(index int, id peer.ID) {
|
go func(index int, id peer.ID) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
paramsValue := *params
|
paramsValue := *params
|
||||||
paramsValue.requestID = protocol.GenerateRequestID()
|
paramsValue.requestID = protocol.GenerateRequestID()
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -225,6 +226,7 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) {
|
||||||
// Connected is called when a connection is opened
|
// Connected is called when a connection is opened
|
||||||
func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
|
func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer()))
|
wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer()))
|
||||||
// Metadata verification is done only if a clusterID is specified
|
// Metadata verification is done only if a clusterID is specified
|
||||||
if wakuM.clusterID == 0 {
|
if wakuM.clusterID == 0 {
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -76,8 +77,8 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
||||||
|
|
||||||
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
|
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if wakuPX.pm != nil {
|
||||||
ps.AddConnFailure(params.selectedPeer)
|
wakuPX.pm.HandleDialError(err, params.selectedPeer)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -123,13 +124,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
|
||||||
}
|
}
|
||||||
|
|
||||||
if params.clusterID != 0 {
|
if params.clusterID != 0 {
|
||||||
wakuPX.log.Debug("clusterID is non zero, filtering by shard")
|
|
||||||
rs, err := wenr.RelaySharding(enrRecord)
|
rs, err := wenr.RelaySharding(enrRecord)
|
||||||
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
|
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
|
||||||
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
|
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
|
||||||
|
@ -156,6 +155,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
|
||||||
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers)))
|
wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers)))
|
||||||
wakuPX.WaitGroup().Add(1)
|
wakuPX.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wakuPX.WaitGroup().Done()
|
defer wakuPX.WaitGroup().Done()
|
||||||
|
|
||||||
peerCh := make(chan service.PeerData)
|
peerCh := make(chan service.PeerData)
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
@ -223,6 +224,7 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
|
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer wakuPX.WaitGroup().Done()
|
defer wakuPX.WaitGroup().Done()
|
||||||
|
|
||||||
// Runs a discv5 loop adding new peers to the px peer cache
|
// Runs a discv5 loop adding new peers to the px peer cache
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BroadcasterParameters struct {
|
type BroadcasterParameters struct {
|
||||||
|
@ -174,6 +175,7 @@ func (b *broadcaster) Start(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *broadcaster) run(ctx context.Context) {
|
func (b *broadcaster) run(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -13,6 +13,10 @@ import (
|
||||||
|
|
||||||
var DefaultRelaySubscriptionBufferSize int = 1024
|
var DefaultRelaySubscriptionBufferSize int = 1024
|
||||||
|
|
||||||
|
// trying to match value here https://github.com/vacp2p/nim-libp2p/pull/1077
|
||||||
|
// note that nim-libp2p has 2 peer queues 1 for priority and other non-priority, whereas go-libp2p seems to have single peer-queue
|
||||||
|
var DefaultPeerOutboundQSize int = 1024
|
||||||
|
|
||||||
type RelaySubscribeParameters struct {
|
type RelaySubscribeParameters struct {
|
||||||
dontConsume bool
|
dontConsume bool
|
||||||
cacheSize uint
|
cacheSize uint
|
||||||
|
@ -109,6 +113,7 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option {
|
||||||
pubsub.WithSeenMessagesTTL(2 * time.Minute),
|
pubsub.WithSeenMessagesTTL(2 * time.Minute),
|
||||||
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
|
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
|
||||||
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
|
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
|
||||||
|
pubsub.WithPeerOutboundQueueSize(DefaultPeerOutboundQSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -56,6 +57,7 @@ func newMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics {
|
||||||
// RecordMessage is used to increase the counter for the number of messages received via waku relay
|
// RecordMessage is used to increase the counter for the number of messages received via waku relay
|
||||||
func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
|
func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
payloadSizeInBytes := len(envelope.Message().Payload)
|
payloadSizeInBytes := len(envelope.Message().Payload)
|
||||||
payloadSizeInKb := float64(payloadSizeInBytes) / 1000
|
payloadSizeInKb := float64(payloadSizeInBytes) / 1000
|
||||||
messageSize.Observe(payloadSizeInKb)
|
messageSize.Observe(payloadSizeInKb)
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/libp2p/go-libp2p/core/event"
|
"github.com/libp2p/go-libp2p/core/event"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,6 +52,7 @@ func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.Topi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) {
|
func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.WaitGroup().Done()
|
defer w.WaitGroup().Done()
|
||||||
for {
|
for {
|
||||||
evt, err := handler.NextPeerEvent(w.Context())
|
evt, err := handler.NextPeerEvent(w.Context())
|
||||||
|
|
|
@ -439,6 +439,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
|
||||||
|
|
||||||
subscriptions = append(subscriptions, subscription)
|
subscriptions = append(subscriptions, subscription)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
subscription.Unsubscribe()
|
subscription.Unsubscribe()
|
||||||
}()
|
}()
|
||||||
|
@ -533,6 +534,7 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) pubsubTopicMsgHandler(sub *pubsub.Subscription) {
|
func (w *WakuRelay) pubsubTopicMsgHandler(sub *pubsub.Subscription) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer w.WaitGroup().Done()
|
defer w.WaitGroup().Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/rln/web3"
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/web3"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"github.com/waku-org/go-zerokit-rln/rln"
|
"github.com/waku-org/go-zerokit-rln/rln"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -120,6 +121,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) {
|
func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
defer mf.wg.Done()
|
defer mf.wg.Done()
|
||||||
|
|
||||||
// Watch for new events
|
// Watch for new events
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"github.com/waku-org/go-zerokit-rln/rln"
|
"github.com/waku-org/go-zerokit-rln/rln"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -89,6 +90,7 @@ func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) {
|
||||||
|
|
||||||
// cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it
|
// cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it
|
||||||
func (n *NullifierLog) cleanup(ctx context.Context) {
|
func (n *NullifierLog) cleanup(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
t := time.NewTicker(1 * time.Minute) // TODO: tune this
|
t := time.NewTicker(1 * time.Minute) // TODO: tune this
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,14 +70,19 @@ type WakuStore struct {
|
||||||
timesource timesource.Timesource
|
timesource timesource.Timesource
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
|
|
||||||
|
defaultRatelimit rate.Limit
|
||||||
|
rateLimiters map[peer.ID]*rate.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuStore is used to instantiate a StoreV3 client
|
// NewWakuStore is used to instantiate a StoreV3 client
|
||||||
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
|
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
|
||||||
s := new(WakuStore)
|
s := new(WakuStore)
|
||||||
s.log = log.Named("store-client")
|
s.log = log.Named("store-client")
|
||||||
s.timesource = timesource
|
s.timesource = timesource
|
||||||
s.pm = pm
|
s.pm = pm
|
||||||
|
s.defaultRatelimit = defaultRatelimit
|
||||||
|
s.rateLimiters = make(map[peer.ID]*rate.Limiter)
|
||||||
|
|
||||||
if pm != nil {
|
if pm != nil {
|
||||||
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
|
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
|
||||||
|
@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
|
response, err := s.queryFrom(ctx, storeRequest, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
|
||||||
return len(result.messages) != 0, nil
|
return len(result.messages) != 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
|
||||||
if r.IsComplete() {
|
if r.IsComplete() {
|
||||||
return &Result{
|
return &Result{
|
||||||
store: s,
|
store: s,
|
||||||
|
@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params := new(Parameters)
|
||||||
|
params.selectedPeer = r.PeerID()
|
||||||
|
optList := DefaultOptions()
|
||||||
|
optList = append(optList, opts...)
|
||||||
|
for _, opt := range optList {
|
||||||
|
err := opt(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
||||||
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
storeRequest.PaginationCursor = r.Cursor()
|
storeRequest.PaginationCursor = r.Cursor()
|
||||||
|
|
||||||
response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
|
response, err := s.queryFrom(ctx, storeRequest, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -245,16 +262,27 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
|
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
|
||||||
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
|
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
|
||||||
|
|
||||||
logger.Debug("sending store request")
|
logger.Debug("sending store request")
|
||||||
|
|
||||||
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
|
if !params.skipRatelimit {
|
||||||
|
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
|
||||||
|
if !ok {
|
||||||
|
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
|
||||||
|
s.rateLimiters[params.selectedPeer] = rateLimiter
|
||||||
|
}
|
||||||
|
err := rateLimiter.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating stream to peer", zap.Error(err))
|
if s.pm != nil {
|
||||||
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
s.pm.HandleDialError(err, params.selectedPeer)
|
||||||
ps.AddConnFailure(selectedPeer)
|
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ type Parameters struct {
|
||||||
pageLimit uint64
|
pageLimit uint64
|
||||||
forward bool
|
forward bool
|
||||||
includeData bool
|
includeData bool
|
||||||
|
skipRatelimit bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestOption func(*Parameters) error
|
type RequestOption func(*Parameters) error
|
||||||
|
@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
|
||||||
|
func SkipRateLimit() RequestOption {
|
||||||
|
return func(params *Parameters) error {
|
||||||
|
params.skipRatelimit = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Default options to be used when querying a store node for results
|
// Default options to be used when querying a store node for results
|
||||||
func DefaultOptions() []RequestOption {
|
func DefaultOptions() []RequestOption {
|
||||||
return []RequestOption{
|
return []RequestOption{
|
||||||
|
|
|
@ -2,6 +2,7 @@ package pb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxContentTopics is the maximum number of allowed contenttopics in a query
|
// MaxContentTopics is the maximum number of allowed contenttopics in a query
|
||||||
|
@ -10,7 +11,6 @@ const MaxContentTopics = 10
|
||||||
var (
|
var (
|
||||||
errMissingRequestID = errors.New("missing RequestId field")
|
errMissingRequestID = errors.New("missing RequestId field")
|
||||||
errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic")
|
errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic")
|
||||||
errRequestIDMismatch = errors.New("requestID in response does not match request")
|
|
||||||
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
|
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
|
||||||
errEmptyContentTopic = errors.New("one or more content topics specified is empty")
|
errEmptyContentTopic = errors.New("one or more content topics specified is empty")
|
||||||
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
|
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
|
||||||
|
@ -57,8 +57,8 @@ func (x *StoreQueryRequest) Validate() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *StoreQueryResponse) Validate(requestID string) error {
|
func (x *StoreQueryResponse) Validate(requestID string) error {
|
||||||
if x.RequestId != "" && x.RequestId != requestID {
|
if x.RequestId != "" && x.RequestId != "N/A" && x.RequestId != requestID {
|
||||||
return errRequestIDMismatch
|
return fmt.Errorf("requestID %s in response does not match requestID in request %s", x.RequestId, requestID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if x.StatusCode == nil {
|
if x.StatusCode == nil {
|
||||||
|
|
|
@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
|
||||||
return r.storeResponse
|
return r.storeResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Result) Next(ctx context.Context) error {
|
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
|
||||||
if r.cursor == nil {
|
if r.cursor == nil {
|
||||||
r.done = true
|
r.done = true
|
||||||
r.messages = nil
|
r.messages = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
newResult, err := r.store.next(ctx, r)
|
newResult, err := r.store.next(ctx, r, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
dbi "github.com/waku-org/go-libp2p-rendezvous/db"
|
dbi "github.com/waku-org/go-libp2p-rendezvous/db"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -315,6 +316,7 @@ func (db *DB) ValidCookie(ns string, cookie []byte) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) background(ctx context.Context) {
|
func (db *DB) background(ctx context.Context) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
for {
|
for {
|
||||||
db.cleanupExpired()
|
db.cleanupExpired()
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/service"
|
"github.com/waku-org/go-waku/waku/v2/service"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -142,6 +143,7 @@ func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*Rendezvou
|
||||||
|
|
||||||
// RegisterShard registers the node in the rendezvous points using a shard as namespace
|
// RegisterShard registers the node in the rendezvous points using a shard as namespace
|
||||||
func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) {
|
func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
namespace := ShardToNamespace(cluster, shard)
|
namespace := ShardToNamespace(cluster, shard)
|
||||||
r.RegisterWithNamespace(ctx, namespace, rendezvousPoints)
|
r.RegisterWithNamespace(ctx, namespace, rendezvousPoints)
|
||||||
}
|
}
|
||||||
|
@ -158,6 +160,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
|
||||||
for _, m := range rendezvousPoints {
|
for _, m := range rendezvousPoints {
|
||||||
r.WaitGroup().Add(1)
|
r.WaitGroup().Add(1)
|
||||||
go func(m *RendezvousPoint) {
|
go func(m *RendezvousPoint) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
r.WaitGroup().Done()
|
r.WaitGroup().Done()
|
||||||
|
|
||||||
rendezvousClient := rvs.NewRendezvousClient(r.host, m.id)
|
rendezvousClient := rvs.NewRendezvousClient(r.host, m.id)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerData contains information about a peer useful in establishing connections with it.
|
// PeerData contains information about a peer useful in establishing connections with it.
|
||||||
|
@ -58,6 +59,7 @@ func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
|
||||||
return sp.channel
|
return sp.channel
|
||||||
}
|
}
|
||||||
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
|
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
if err := sp.ErrOnNotRunning(); err != nil {
|
if err := sp.ErrOnNotRunning(); err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/beevik/ntp"
|
"github.com/beevik/ntp"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,6 +70,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t
|
||||||
responses := make(chan queryResponse, len(servers))
|
responses := make(chan queryResponse, len(servers))
|
||||||
for _, server := range servers {
|
for _, server := range servers {
|
||||||
go func(server string) {
|
go func(server string) {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
response, err := timeQuery(server, ntp.QueryOptions{
|
response, err := timeQuery(server, ntp.QueryOptions{
|
||||||
Timeout: DefaultRPCTimeout,
|
Timeout: DefaultRPCTimeout,
|
||||||
})
|
})
|
||||||
|
@ -172,6 +174,7 @@ func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) er
|
||||||
// we try to do it synchronously so that user can have reliable messages right away
|
// we try to do it synchronously so that user can have reliable messages right away
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer utils.LogOnPanic()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(period):
|
case <-time.After(period):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
@ -81,3 +82,10 @@ func InitLogger(encoding string, output string, name string, level zapcore.Level
|
||||||
|
|
||||||
log = logging.Logger(name).Desugar()
|
log = logging.Logger(name).Desugar()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func LogOnPanic() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
Logger().Error("panic in goroutine", zap.Any("error", err), zap.String("stacktrace", string(debug.Stack())))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,11 @@ import (
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type DialError struct {
|
||||||
|
Err error
|
||||||
|
PeerID peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
// GetPeerID is used to extract the peerID from a multiaddress
|
// GetPeerID is used to extract the peerID from a multiaddress
|
||||||
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
|
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
|
||||||
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
|
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)
|
||||||
|
|
|
@ -1007,7 +1007,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
||||||
github.com/waku-org/go-libp2p-rendezvous
|
github.com/waku-org/go-libp2p-rendezvous
|
||||||
github.com/waku-org/go-libp2p-rendezvous/db
|
github.com/waku-org/go-libp2p-rendezvous/db
|
||||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||||
# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
|
# github.com/waku-org/go-waku v0.8.1-0.20240925210455-69ce0c676ce7
|
||||||
## explicit; go 1.21
|
## explicit; go 1.21
|
||||||
github.com/waku-org/go-waku/logging
|
github.com/waku-org/go-waku/logging
|
||||||
github.com/waku-org/go-waku/tests
|
github.com/waku-org/go-waku/tests
|
||||||
|
|
Loading…
Reference in New Issue