chore_: bump go-waku

Richard Ramos 2024-08-19 15:08:44 -04:00 committed by richΛrd
parent 258652b0dd
commit 61e20dd2e8
9 changed files with 97 additions and 41 deletions

go.mod

@@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
-github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816
+github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum

@@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816 h1:OfmTCkidLxTenZQ5cQvohO0i2sKozxQ7Sm6AlgFGVwA=
-github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
+github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d h1:YVTBJpd6vZZzu8X0515bK0D21fgGcrAlYZelgbIdBD4=
+github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=


@@ -157,3 +157,7 @@ func Uint64(key string, value uint64) zap.Field {
valueStr := fmt.Sprintf("%v", value)
return zap.String(key, valueStr)
}
+
+func UTCTime(key string, t time.Time) zap.Field {
+	return zap.Time(key, t.UTC())
+}


@@ -20,6 +20,7 @@ import (
)
const maxContentTopicsPerRequest = 10
+const maxMsgHashesPerRequest = 50
// MessageTracker should keep track of messages it has seen before and
// provide a way to determine whether a message exists or not. This
@@ -247,38 +248,55 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
 		return nil
 	}
 
-	result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
-		return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100))
-	}, logger, "retrieving missing messages")
-	if err != nil {
-		if !errors.Is(err, context.Canceled) {
-			logger.Error("storenode not available", zap.Error(err))
-		}
-		return err
-	}
-
-	for !result.IsComplete() {
-		for _, mkv := range result.Messages() {
-			select {
-			case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
-			default:
-				m.logger.Warn("subscriber is too slow!")
-			}
-		}
-
-		result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
-			if err = result.Next(ctx); err != nil {
-				return nil, err
-			}
-			return result, nil
-		}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
-		if err != nil {
-			if !errors.Is(err, context.Canceled) {
-				logger.Error("storenode not available", zap.Error(err))
-			}
-			return err
-		}
-	}
+	wg := sync.WaitGroup{}
+
+	// Split into batches
+	for i := 0; i < len(missingHashes); i += maxMsgHashesPerRequest {
+		j := i + maxMsgHashesPerRequest
+		if j > len(missingHashes) {
+			j = len(missingHashes)
+		}
+
+		wg.Add(1)
+		go func(messageHashes []pb.MessageHash) {
+			defer wg.Done()
+
+			result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+				return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
+			}, logger, "retrieving missing messages")
+			if err != nil {
+				if !errors.Is(err, context.Canceled) {
+					logger.Error("storenode not available", zap.Error(err))
+				}
+				return
+			}
+
+			for !result.IsComplete() {
+				for _, mkv := range result.Messages() {
+					select {
+					case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
+					default:
+						m.logger.Warn("subscriber is too slow!")
+					}
+				}
+
+				result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
+					if err = result.Next(ctx); err != nil {
+						return nil, err
+					}
+					return result, nil
+				}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
+				if err != nil {
+					if !errors.Is(err, context.Canceled) {
+						logger.Error("storenode not available", zap.Error(err))
+					}
+					return
+				}
+			}
+		}(missingHashes[i:j])
+	}
+
+	wg.Wait()
 
 	return nil
 }
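The hunk above replaces the single QueryByHash call with one goroutine per batch of at most maxMsgHashesPerRequest hashes. As a standalone illustration of that pattern (chunk a slice, process the chunks concurrently behind a WaitGroup), here is a minimal sketch; the helper name queryBatches and the use of string hashes are illustrative only, not part of the commit:

package main

import (
	"fmt"
	"sync"
)

const maxMsgHashesPerRequest = 50

// queryBatches is a hypothetical helper: it splits hashes into chunks of at
// most maxMsgHashesPerRequest, runs query on each chunk concurrently, and
// returns only once every chunk has been processed.
func queryBatches(hashes []string, query func(batch []string)) {
	var wg sync.WaitGroup
	for i := 0; i < len(hashes); i += maxMsgHashesPerRequest {
		j := i + maxMsgHashesPerRequest
		if j > len(hashes) {
			j = len(hashes)
		}
		wg.Add(1)
		go func(batch []string) {
			defer wg.Done() // signal completion of this chunk
			query(batch)
		}(hashes[i:j])
	}
	wg.Wait() // block until all chunks are done
}

func main() {
	hashes := make([]string, 120)
	for i := range hashes {
		hashes[i] = fmt.Sprintf("hash-%03d", i)
	}
	queryBatches(hashes, func(batch []string) {
		fmt.Printf("querying %d hashes starting at %s\n", len(batch), batch[0])
	})
}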


@@ -16,7 +16,7 @@ import (
"go.uber.org/zap"
)
-const DefaultMaxHashQueryLength = 100
+const DefaultMaxHashQueryLength = 50
const DefaultHashQueryInterval = 3 * time.Second
const DefaultMessageSentPeriod = 3 // in seconds
const DefaultMessageExpiredPerid = 10 // in seconds
@@ -216,7 +216,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
}
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes))
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
result, err := m.store.QueryByHash(ctx, messageHashes, opts...)
if err != nil {
@@ -248,8 +248,8 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
}
}
m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes))
m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes))
m.logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes))
m.logger.Debug("missed message hashes", zap.Stringers("missedHashes", missedHashes))
return append(ackHashes, missedHashes...)
}
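The switch from zap.Any to zap.Stringers means the hash slices are rendered through their String() methods rather than through reflection on raw byte arrays. A small hedged sketch of the difference, with a made-up hash type standing in for the real message-hash type:

package main

import (
	"encoding/hex"

	"go.uber.org/zap"
)

// hash is an illustrative stand-in for a message hash type with a String method.
type hash [4]byte

func (h hash) String() string { return hex.EncodeToString(h[:]) }

func main() {
	logger, _ := zap.NewDevelopment()
	ackHashes := []hash{{0xde, 0xad, 0xbe, 0xef}, {0x01, 0x02, 0x03, 0x04}}

	// zap.Stringers logs each element via String(); zap.Any would fall back
	// to reflection-based encoding of the underlying byte arrays.
	logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes))
}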


@@ -207,11 +207,11 @@ func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
now := time.Now()
if now.Before(tv.nextTry) {
c.logger.Debug("Skipping connecting to peer due to backoff strategy",
zap.Time("currentTime", now), zap.Time("until", tv.nextTry))
logging.UTCTime("currentTime", now), logging.UTCTime("until", tv.nextTry))
return false
}
c.logger.Debug("Proceeding with connecting to peer",
zap.Time("currentTime", now), zap.Time("nextTry", tv.nextTry))
logging.UTCTime("currentTime", now), logging.UTCTime("nextTry", tv.nextTry))
}
return true
}
@@ -228,7 +228,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
cachedPeer = &connCacheData{strat: c.backoff()}
cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
c.logger.Debug("Initializing connectionCache for peer ",
logging.HostID("peerID", peerID), zap.Time("until", cachedPeer.nextTry))
logging.HostID("peerID", peerID), logging.UTCTime("until", cachedPeer.nextTry))
c.cache.Add(peerID, cachedPeer)
}
}


@@ -7,6 +7,7 @@ import (
"math"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
@@ -17,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
@@ -155,8 +157,38 @@ func (wakuPX *WakuPeerExchange) Stop() {
})
}
+
+func (wakuPX *WakuPeerExchange) DefaultPredicate() discv5.Predicate {
+	return discv5.FilterPredicate(func(n *enode.Node) bool {
+		localRS, err := wenr.RelaySharding(wakuPX.disc.Node().Record())
+		if err != nil {
+			return false
+		}
+
+		if localRS == nil { // No shard registered, so no need to check for shards
+			return true
+		}
+
+		nodeRS, err := wenr.RelaySharding(n.Record())
+		if err != nil {
+			wakuPX.log.Debug("failed to get relay shards from node record", logging.ENode("node", n), zap.Error(err))
+			return false
+		}
+
+		if nodeRS == nil {
+			// Node has no shards registered.
+			return false
+		}
+
+		if nodeRS.ClusterID != localRS.ClusterID {
+			return false
+		}
+
+		return true
+	})
+}
+
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
-iterator, err := wakuPX.disc.PeerIterator()
+iterator, err := wakuPX.disc.PeerIterator(wakuPX.DefaultPredicate())
if err != nil {
return fmt.Errorf("obtaining iterator: %w", err)
}
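In effect, the new predicate only admits discovered nodes whose ENR advertises the same cluster ID as the local node, and admits everything when the local node has no shards registered. A hedged restatement of those rules as a standalone function, using a simplified relayShards type in place of the record structure decoded by wenr.RelaySharding:

package main

import "fmt"

// relayShards mirrors only the field the predicate compares; the real code
// decodes this information from the node's ENR.
type relayShards struct {
	ClusterID uint16
}

// acceptNode spells out the accept/reject rules of DefaultPredicate:
// nil local shards -> accept all; nil node shards -> reject; otherwise the
// cluster IDs must match.
func acceptNode(localRS, nodeRS *relayShards) bool {
	if localRS == nil {
		return true
	}
	if nodeRS == nil {
		return false
	}
	return nodeRS.ClusterID == localRS.ClusterID
}

func main() {
	local := &relayShards{ClusterID: 16}
	fmt.Println(acceptNode(nil, nil))                           // true: local node has no shards
	fmt.Println(acceptNode(local, nil))                         // false: node advertises no shards
	fmt.Println(acceptNode(local, &relayShards{ClusterID: 16})) // true: same cluster
	fmt.Println(acceptNode(local, &relayShards{ClusterID: 1}))  // false: different cluster
}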


@@ -6,6 +6,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var log *zap.Logger
@@ -19,7 +20,7 @@ func Logger(name ...string) *zap.Logger {
}
if log == nil {
InitLogger("console", "stdout", loggerName)
InitLogger("console", "stdout", loggerName, zapcore.InfoLevel)
}
return log
}
@@ -39,8 +40,9 @@ func MessagesLogger(prefix string) *zap.Logger {
}
// InitLogger initializes a global logger using a specific encoding
-func InitLogger(encoding string, output string, name string) {
+func InitLogger(encoding string, output string, name string, level zapcore.Level) {
cfg := logging.GetConfig()
+cfg.Level = logging.LogLevel(level)
if encoding == "json" {
cfg.Format = logging.JSONOutput
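
Callers of InitLogger now pass the minimum log level explicitly instead of relying on the default. A hedged example of what a call site could look like after this change; the utils import path and the chosen level are assumptions for illustration and are not shown in the diff:

package main

import (
	"github.com/waku-org/go-waku/waku/v2/utils"
	"go.uber.org/zap/zapcore"
)

func main() {
	// Encoding, output and logger name as before, plus the new level argument.
	utils.InitLogger("console", "stdout", "gowaku.example", zapcore.DebugLevel)
	utils.Logger().Debug("logger initialized at debug level")
}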

vendor/modules.txt

@@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816
+# github.com/waku-org/go-waku v0.8.1-0.20240819221706-d3b51130599d
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests