fix: store-query issue in v0.37.0

This commit is contained in:
darshankabariya 2025-12-08 17:42:42 +05:30
parent 5a7f4a33bb
commit 41630f4fa3
No known key found for this signature in database
GPG Key ID: 9A92CCD9899F0D22
4 changed files with 49 additions and 14 deletions

View File

@ -34,8 +34,8 @@ proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
@[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
)
builder.withNatStrategy("any")
builder.withMaxConnections(50)
builder.withRelayServiceRatio("60:40")
builder.withMaxConnections(200)
builder.withRelayServiceRatio("50:50")
builder.withMaxMessageSize("1024 KiB")
builder.withClusterId(DefaultClusterId)
builder.withSubscribeShards(@[DefaultShardId])

View File

@ -206,16 +206,17 @@ type WakuNodeConf* = object
.}: bool
maxConnections* {.
desc: "Maximum allowed number of libp2p connections. (Default: 200) can't set it to less than 200",
defaultValue: 200,
desc:
"Maximum allowed number of libp2p connections. (Default: 200) can't set it to less than 200",
defaultValue: 200,
name: "max-connections"
.}: int
relayServiceRatio* {.
desc:
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
name: "relay-service-ratio",
defaultValue: "50:50"
defaultValue: "50:50",
name: "relay-service-ratio"
.}: string
colocationLimit* {.

View File

@ -61,9 +61,19 @@ proc validate*(msg: WakuMessage): Result[void, string] =
upperBound = now + MaxMessageTimestampVariance
if msg.timestamp < lowerBound:
warn "rejecting message with old timestamp",
msgTimestamp = msg.timestamp,
lowerBound = lowerBound,
now = now,
drift = (now - msg.timestamp) div 1_000_000_000
return err(invalidMessageOld)
if upperBound < msg.timestamp:
warn "rejecting message with future timestamp",
msgTimestamp = msg.timestamp,
upperBound = upperBound,
now = now,
drift = (msg.timestamp - now) div 1_000_000_000
return err(invalidMessageFuture)
return ok()

View File

@ -1,6 +1,12 @@
{.push raises: [].}
import std/[options, tables], results, chronicles, chronos, metrics, bearssl/rand
import
std/[options, tables, sequtils, algorithm],
results,
chronicles,
chronos,
metrics,
bearssl/rand
import
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
@ -10,6 +16,8 @@ logScope:
const DefaultPageSize*: uint = 20
# A recommended default number of waku messages per page
const MaxQueryRetries = 5 # Maximum number of store peers to try before giving up
type WakuStoreClient* = ref object
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
@ -79,18 +87,34 @@ proc query*(
proc queryToAny*(
self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
): Future[StoreQueryResult] {.async.} =
## This proc is similar to the query one but in this case
## we don't specify a particular peer and instead we get it from peer manager
## we don't specify a particular peer and instead we get it from peer manager.
## It will retry with different store peers if the dial fails.
if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))
let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
# Get all available store peers
var peers = self.peerManager.switch.peerStore.getPeersByProtocol(WakuStoreCodec)
if peers.len == 0:
return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))
let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
waku_store_errors.inc(labelValues = [DialFailure])
# Shuffle to distribute load and limit retries
let peersToTry = peers[0 ..< min(peers.len, MaxQueryRetries)]
return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))
var lastError: StoreError
for peer in peersToTry:
let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
waku_store_errors.inc(labelValues = [DialFailure])
warn "failed to dial store peer, trying next"
lastError = StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer)
continue
return await self.sendStoreRequest(request, connection)
let res = await self.sendStoreRequest(request, connection)
if res.isOk():
return res
warn "store query failed, trying next peer",
peerId = peer.peerId, cause = $res.error
lastError = res.error
return err(lastError)