chore_: bump go-waku (#5660)
This commit is contained in:
parent
c04e0dab4a
commit
cea857ed00
2
go.mod
2
go.mod
|
@ -96,7 +96,7 @@ require (
|
|||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
|||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d h1:thwE3nxBaINnQllutuZdMdyLMUZLKwX7TRufs1IrlQc=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815 h1:arQLN7mhSJNBJLhnxiV9WqMEC5rYZZS7oHnuYBPEct8=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
|
|
@ -382,7 +382,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error {
|
|||
}
|
||||
|
||||
if len(rs) == 1 {
|
||||
w.log.Info("updating advertised relay shards in ENR")
|
||||
w.log.Info("updating advertised relay shards in ENR", zap.Any("newShardInfo", rs[0]))
|
||||
if len(rs[0].ShardIDs) != len(topics) {
|
||||
w.log.Warn("A mix of named and static shards found. ENR shard will contain only the following shards", zap.Any("shards", rs[0]))
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto
|
|||
for _, shardInfo := range shardsInfo {
|
||||
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
|
||||
if err != nil {
|
||||
pm.logger.Error("failed to discover and connect to peers", zap.Error(err))
|
||||
pm.logger.Warn("failed to discover and connect to peers", zap.Error(err))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -309,17 +309,15 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
|
|||
defer pm.topicMutex.RUnlock()
|
||||
for topicStr, topicInst := range pm.subRelayTopics {
|
||||
|
||||
// @cammellos reported that ListPeers returned an invalid number of
|
||||
// peers. This will ensure that the peers returned by this function
|
||||
// match those peers that are currently connected
|
||||
meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
|
||||
curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()
|
||||
|
||||
curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
|
||||
if curPeerLen < pm.OutPeersTarget {
|
||||
if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget {
|
||||
pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
|
||||
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
|
||||
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen),
|
||||
zap.Int("targetPeers", pm.OutPeersTarget))
|
||||
//Find not connected peers.
|
||||
notConnectedPeers := pm.getNotConnectedPers(topicStr)
|
||||
notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
|
||||
if notConnectedPeers.Len() == 0 {
|
||||
pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
|
||||
go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
|
||||
|
@ -327,12 +325,13 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
|
|||
}
|
||||
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
|
||||
//Connect to eligible peers.
|
||||
numPeersToConnect := pm.OutPeersTarget - curPeerLen
|
||||
|
||||
if numPeersToConnect > notConnectedPeers.Len() {
|
||||
numPeersToConnect = notConnectedPeers.Len()
|
||||
numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen
|
||||
if numPeersToConnect > 0 {
|
||||
if numPeersToConnect > notConnectedPeers.Len() {
|
||||
numPeersToConnect = notConnectedPeers.Len()
|
||||
}
|
||||
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
|
||||
}
|
||||
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -372,8 +371,8 @@ func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
|
|||
}
|
||||
}
|
||||
|
||||
// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
|
||||
func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
|
||||
// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected/not-connected based on status passed.
|
||||
func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) {
|
||||
var peerList peer.IDSlice
|
||||
if pubsubTopic == "" {
|
||||
peerList = pm.host.Peerstore().Peers()
|
||||
|
@ -381,8 +380,8 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer
|
|||
peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
|
||||
}
|
||||
for _, peerID := range peerList {
|
||||
if pm.host.Network().Connectedness(peerID) != network.Connected {
|
||||
notConnectedPeers = append(notConnectedPeers, peerID)
|
||||
if pm.host.Network().Connectedness(peerID) == connected {
|
||||
filteredPeers = append(filteredPeers, peerID)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
|
|
@ -242,7 +242,7 @@ func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID,
|
|||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peer peer.ID) error {
|
||||
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter, peerID peer.ID) error {
|
||||
request := &pb.FilterSubscribeRequest{
|
||||
RequestId: hex.EncodeToString(requestID),
|
||||
FilterSubscribeType: reqType,
|
||||
|
@ -255,11 +255,14 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
|||
return err
|
||||
}
|
||||
|
||||
logger := wf.log.With(logging.HostID("peerID", peer))
|
||||
logger := wf.log.With(logging.HostID("peerID", peerID))
|
||||
|
||||
stream, err := wf.h.NewStream(ctx, peer, FilterSubscribeID_v20beta1)
|
||||
stream, err := wf.h.NewStream(ctx, peerID, FilterSubscribeID_v20beta1)
|
||||
if err != nil {
|
||||
wf.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
|
@ -273,6 +274,9 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge
|
|||
wf.metrics.RecordError(pushTimeoutFailure)
|
||||
} else {
|
||||
wf.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
|
||||
}
|
||||
}
|
||||
logger.Error("opening peer stream", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -207,6 +207,9 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor
|
|||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
store.metrics.RecordError(dialFailure)
|
||||
if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
9
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
9
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -188,14 +188,17 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
|
|||
}
|
||||
|
||||
// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peerID peer.ID) (*pb.PushResponse, error) {
|
||||
|
||||
logger := wakuLP.log.With(logging.HostID("peer", peer))
|
||||
logger := wakuLP.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1)
|
||||
stream, err := wakuLP.h.NewStream(ctx, peerID, LightPushID_v20beta1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
wakuLP.metrics.RecordError(dialFailure)
|
||||
if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
pushRequestRPC := &pb.PushRpc{RequestId: hex.EncodeToString(params.requestID), Request: req}
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb"
|
||||
|
@ -103,6 +104,9 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
|
|||
stream, err := wakuM.h.NewStream(ctx, peerID, MetadataID_v1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: peerID})
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -335,7 +339,7 @@ func (wakuM *WakuMetadata) DisconnectPeerOnShardMismatch(ctx context.Context, pe
|
|||
return err
|
||||
}
|
||||
|
||||
if !rs.ContainsAnyShard(rs.ClusterID, peerShards) {
|
||||
if rs != nil && !rs.ContainsAnyShard(rs.ClusterID, peerShards) {
|
||||
wakuM.log.Info("shard mismatch", logging.HostID("peerID", peerID), zap.Uint16("clusterID", rs.ClusterID), zap.Uint16s("ourShardIDs", rs.ShardIDs), zap.Uint16s("theirShardIDs", peerShards))
|
||||
wakuM.disconnect(peerID)
|
||||
return errors.New("shard mismatch")
|
||||
|
|
|
@ -76,6 +76,9 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
|||
|
||||
stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -98,7 +98,7 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
|
|||
|
||||
if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
|
||||
wakuPX.metrics.RecordError(rateLimitFailure)
|
||||
wakuPX.log.Error("exceeds the rate limit")
|
||||
wakuPX.log.Info("exceeds the rate limit")
|
||||
// TODO: peer exchange protocol should contain an err field
|
||||
if err := stream.Reset(); err != nil {
|
||||
wakuPX.log.Error("resetting connection", zap.Error(err))
|
||||
|
|
|
@ -253,6 +253,9 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
|
|||
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||
ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer})
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
|||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240716173432-8f3332d1a08d
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240806174951-6e1ff9818815
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
|
Loading…
Reference in New Issue