Merge 98493ff605ad9f0254c2696ffca02d56c652c464 into 96196ab8bc05f31b09dac2403f9d5de3bc05f31b
Commit db0f42642a
@@ -34,8 +34,8 @@ proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
     @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")]
   )
   builder.withNatStrategy("any")
-  builder.withMaxConnections(50)
-  builder.withRelayServiceRatio("60:40")
+  builder.withMaxConnections(150)
+  builder.withRelayServiceRatio("50:50")
   builder.withMaxMessageSize("1024 KiB")
   builder.withClusterId(DefaultClusterId)
   builder.withSubscribeShards(@[DefaultShardId])
@@ -206,22 +206,17 @@ type WakuNodeConf* = object
   .}: bool

   maxConnections* {.
-    desc: "Maximum allowed number of libp2p connections.",
-    defaultValue: 50,
+    desc:
+      "Maximum allowed number of libp2p connections. (Default: 150, which is the recommended value for better connectivity)",
+    defaultValue: 150,
     name: "max-connections"
   .}: int

-  maxRelayPeers* {.
-    desc:
-      "Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
-    name: "max-relay-peers"
-  .}: Option[int]
-
   relayServiceRatio* {.
     desc:
       "This percentage ratio represents the relay peers to service peers. For example, 60:40 means that 60% of max-connections will be used for the relay protocol and the other 40% will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
-    name: "relay-service-ratio",
-    defaultValue: "60:40" # 60:40 ratio of relay to service peers
+    defaultValue: "50:50",
+    name: "relay-service-ratio"
   .}: string

   colocationLimit* {.
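To make the ratio semantics concrete, here is a minimal Nim sketch of how a "relay:service" string splits max-connections into relay and service capacity. The splitByRatio helper below is hypothetical, written only for illustration; it is not the parser used in the codebase.

import std/strutils

# Hypothetical helper: split maxConnections according to a "relay:service" ratio.
proc splitByRatio(maxConnections: int, ratio: string): tuple[relay, service: int] =
  let parts = ratio.split(':')
  assert parts.len == 2, "expected a ratio like \"50:50\""
  let relayPct = parseInt(parts[0])
  assert relayPct + parseInt(parts[1]) == 100, "percentages must sum to 100"
  result.relay = maxConnections * relayPct div 100
  result.service = maxConnections - result.relay

# New defaults (150 connections, "50:50") give 75 relay and 75 service slots;
# the old "60:40" ratio on 150 connections would give 90 and 60.
assert splitByRatio(150, "50:50") == (relay: 75, service: 75)
assert splitByRatio(150, "60:40") == (relay: 90, service: 60)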
@@ -957,9 +952,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
   b.withExtMultiAddrsOnly(n.extMultiAddrsOnly)
   b.withMaxConnections(n.maxConnections)

-  if n.maxRelayPeers.isSome():
-    b.withMaxRelayPeers(n.maxRelayPeers.get())
-
   if n.relayServiceRatio != "":
     b.withRelayServiceRatio(n.relayServiceRatio)
   b.withColocationLimit(n.colocationLimit)
@@ -248,9 +248,6 @@ proc withAgentString*(b: var WakuConfBuilder, agentString: string) =
 proc withColocationLimit*(b: var WakuConfBuilder, colocationLimit: int) =
   b.colocationLimit = some(colocationLimit)

-proc withMaxRelayPeers*(b: var WakuConfBuilder, maxRelayPeers: int) =
-  b.maxRelayPeers = some(maxRelayPeers)
-
 proc withRelayServiceRatio*(b: var WakuConfBuilder, relayServiceRatio: string) =
   b.relayServiceRatio = some(relayServiceRatio)
@@ -588,12 +585,10 @@ proc build*(
       warn "Peer persistence not specified, defaulting to false"
       false

-  let maxConnections =
-    if builder.maxConnections.isSome():
-      builder.maxConnections.get()
-    else:
-      warn "Max Connections was not specified, defaulting to 300"
-      300
+  let maxConnections = builder.maxConnections.get()
+  if maxConnections < 150:
+    warn "max-connections less than 150; we suggest using 150 or more for better connectivity",
+      provided = maxConnections

   # TODO: Do the git version thing here
   let agentString = builder.agentString.get("nwaku")
@@ -663,7 +658,7 @@ proc build*(
     agentString: agentString,
     colocationLimit: colocationLimit,
     maxRelayPeers: builder.maxRelayPeers,
-    relayServiceRatio: builder.relayServiceRatio.get("60:40"),
+    relayServiceRatio: builder.relayServiceRatio.get("50:50"),
     rateLimit: rateLimit,
     circuitRelayClient: builder.circuitRelayClient.get(false),
     staticNodes: builder.staticNodes,
@@ -1041,7 +1041,7 @@ proc new*(
     wakuMetadata: WakuMetadata = nil,
     maxRelayPeers: Option[int] = none(int),
     maxServicePeers: Option[int] = none(int),
-    relayServiceRatio: string = "60:40",
+    relayServiceRatio: string = "50:50",
     storage: PeerStorage = nil,
     initialBackoffInSec = InitialBackoffInSec,
     backoffFactor = BackoffFactor,
@@ -61,9 +61,19 @@ proc validate*(msg: WakuMessage): Result[void, string] =
     upperBound = now + MaxMessageTimestampVariance

   if msg.timestamp < lowerBound:
+    warn "rejecting message with old timestamp",
+      msgTimestamp = msg.timestamp,
+      lowerBound = lowerBound,
+      now = now,
+      drift = (now - msg.timestamp) div 1_000_000_000
     return err(invalidMessageOld)

   if upperBound < msg.timestamp:
+    warn "rejecting message with future timestamp",
+      msgTimestamp = msg.timestamp,
+      upperBound = upperBound,
+      now = now,
+      drift = (msg.timestamp - now) div 1_000_000_000
     return err(invalidMessageFuture)

   return ok()
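The drift field logged above converts the nanosecond gap between the clocks into whole seconds. A minimal worked example of that arithmetic, assuming nanosecond-resolution Unix timestamps; the values below are made up for illustration:

# Minimal sketch of the drift computation; all values hypothetical.
let
  now = 1_700_000_000_000_000_000'i64      # current time in nanoseconds
  msgTimestamp = now - 90_000_000_000'i64  # message stamped 90 s in the past
  driftSeconds = (now - msgTimestamp) div 1_000_000_000
assert driftSeconds == 90  # reported as `drift` in the rejection log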
@@ -1,6 +1,12 @@
 {.push raises: [].}

-import std/[options, tables], results, chronicles, chronos, metrics, bearssl/rand
+import
+  std/[options, tables, sequtils, algorithm],
+  results,
+  chronicles,
+  chronos,
+  metrics,
+  bearssl/rand
 import
   ../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
@@ -10,6 +16,8 @@ logScope:
 const DefaultPageSize*: uint = 20
   # A recommended default number of waku messages per page

+const MaxQueryRetries = 5 # Maximum number of store peers to try before giving up
+
 type WakuStoreClient* = ref object
   peerManager: PeerManager
   rng: ref rand.HmacDrbgContext
@@ -79,18 +87,33 @@ proc query*(
 proc queryToAny*(
     self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
 ): Future[StoreQueryResult] {.async.} =
   ## This proc is similar to the query one but in this case
-  ## we don't specify a particular peer and instead we get it from peer manager
+  ## we don't specify a particular peer and instead we get it from peer manager.
+  ## It will retry with different store peers if the dial fails.

   if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
     return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

-  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
+  # Get all available store peers
+  var peers = self.peerManager.switch.peerStore.getPeersByProtocol(WakuStoreCodec)
+  if peers.len == 0:
     return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))

-  let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
-    waku_store_errors.inc(labelValues = [DialFailure])
-    return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))
+  # Shuffle to distribute load and limit retries
+  let peersToTry = peers[0 ..< min(peers.len, MaxQueryRetries)]

-  return await self.sendStoreRequest(request, connection)
+  var lastError: StoreError
+  for peer in peersToTry:
+    let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
+      waku_store_errors.inc(labelValues = [DialFailure])
+      warn "failed to dial store peer, trying next"
+      lastError = StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer)
+      continue
+
+    let response = (await self.sendStoreRequest(request, connection)).valueOr:
+      warn "store query failed, trying next peer", peerId = peer.peerId, error = $error
+      lastError = error
+      continue
+
+    return ok(response)
+
+  return err(lastError)
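The loop above leans on the valueOr template from the results library, whose error branch binds `error` and may exit with `continue` to fall through to the next candidate peer. A self-contained sketch of the same retry pattern follows; attempt and tryAll are hypothetical stand-ins for fallible calls like dialPeer and sendStoreRequest, not part of this change:

import results

# Hypothetical stand-in for a fallible call: fails twice, then succeeds.
proc attempt(i: int): Result[string, string] =
  if i < 2:
    return err("attempt " & $i & " failed")
  ok("success on attempt " & $i)

proc tryAll(maxRetries: int): Result[string, string] =
  var lastError: string
  for i in 0 ..< maxRetries:
    let value = attempt(i).valueOr:
      lastError = error  # `error` is bound by valueOr on the err branch
      continue           # move on to the next candidate
    return ok(value)
  err(lastError)  # every candidate failed; surface the last error

let res = tryAll(5)
doAssert res.isOk() and res.value == "success on attempt 2"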