mirror of https://github.com/waku-org/nwaku.git
fix: lite-protocol-tester receiver exit check (#3187)
* Fix receiver exit criteria, not let it wait forever in some cases, added a timely check from the last arrived message * Extend dial and service usage failure metrics with agent string to reveal service nodes origins * Adjusted infra testing content topic to be unique in the system * Extend error logs with peer's agent string, fix exit criteria * Add informative log for not waiting for more messages * Add unknown as default for empty agent identifier * better explain exit logic of receiver * Address review comment - checking for last message arrival return Optional Moment instead of result - better explains what is happening.
This commit is contained in:
parent
1b532e8ab9
commit
beb21c78f4
|
@ -44,8 +44,9 @@ proc allPeers(pm: PeerManager): string =
|
|||
var allStr: string = ""
|
||||
for idx, peer in pm.wakuPeerStore.peers():
|
||||
allStr.add(
|
||||
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " &
|
||||
$peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n"
|
||||
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " &
|
||||
peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " &
|
||||
$peer.enr.map(getCapabilities) & "\n"
|
||||
)
|
||||
return allStr
|
||||
|
||||
|
|
|
@ -73,7 +73,9 @@ proc maintainSubscription(
|
|||
|
||||
if subscribeRes.isErr():
|
||||
noFailedSubscribes += 1
|
||||
lpt_service_peer_failure_count.inc(labelValues = ["receiver"])
|
||||
lpt_service_peer_failure_count.inc(
|
||||
labelValues = ["receiver", actualFilterPeer.getAgent()]
|
||||
)
|
||||
error "Subscribe request failed.",
|
||||
err = subscribeRes.error,
|
||||
peer = actualFilterPeer,
|
||||
|
@ -150,11 +152,17 @@ proc setupAndSubscribe*(
|
|||
let interval = millis(20000)
|
||||
var printStats: CallbackFunc
|
||||
|
||||
# calculate max wait after the last known message arrived before exiting
|
||||
# 20% of expected messages times the expected interval but capped to 10min
|
||||
let maxWaitForLastMessage: Duration =
|
||||
min(conf.messageInterval.milliseconds * (conf.numMessages div 5), 10.minutes)
|
||||
|
||||
printStats = CallbackFunc(
|
||||
proc(udata: pointer) {.gcsafe.} =
|
||||
stats.echoStats()
|
||||
|
||||
if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived():
|
||||
if conf.numMessages > 0 and
|
||||
waitFor stats.checkIfAllMessagesReceived(maxWaitForLastMessage):
|
||||
waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0])
|
||||
info "All messages received. Exiting."
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ MESSAGE_INTERVAL_MILLIS=1000
|
|||
MIN_MESSAGE_SIZE=15Kb
|
||||
MAX_MESSAGE_SIZE=145Kb
|
||||
PUBSUB=/waku/2/rs/16/32
|
||||
CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
|
||||
CONTENT_TOPIC=/tester/2/light-pubsub-test-at-infra/status-prod
|
||||
CLUSTER_ID=16
|
||||
LIGHTPUSH_BOOTSTRAP=enr:-QEKuED9AJm2HGgrRpVaJY2nj68ao_QiPeUT43sK-aRM7sMJ6R4G11OSDOwnvVacgN1sTw-K7soC5dzHDFZgZkHU0u-XAYJpZIJ2NIJpcISnYxMvim11bHRpYWRkcnO4WgAqNiVib290LTAxLmRvLWFtczMuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfACw2JWJvb3QtMDEuZG8tYW1zMy5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEC3rRtFQSgc24uWewzXaxTY8hDAHB8sgnxr9k8Rjb5GeSDdGNwgnZfg3VkcIIjKIV3YWt1Mg0
|
||||
FILTER_BOOTSTRAP=enr:-QEcuED7ww5vo2rKc1pyBp7fubBUH-8STHEZHo7InjVjLblEVyDGkjdTI9VdqmYQOn95vuQH-Htku17WSTzEufx-Wg4mAYJpZIJ2NIJpcIQihw1Xim11bHRpYWRkcnO4bAAzNi5ib290LTAxLmdjLXVzLWNlbnRyYWwxLWEuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfADU2LmJvb3QtMDEuZ2MtdXMtY2VudHJhbDEtYS5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaECxjqgDQ0WyRSOilYU32DA5k_XNlDis3m1VdXkK9xM6kODdGNwgnZfg3VkcIIjKIV3YWt1Mg0
|
||||
|
|
|
@ -177,7 +177,9 @@ proc publishMessages(
|
|||
continue
|
||||
else:
|
||||
noFailedPush += 1
|
||||
lpt_service_peer_failure_count.inc(labelValues = ["publisher"])
|
||||
lpt_service_peer_failure_count.inc(
|
||||
labelValues = ["publisher", actualServicePeer.getAgent()]
|
||||
)
|
||||
if not preventPeerSwitch and noFailedPush > maxFailedPush:
|
||||
info "Max push failure limit reached, Try switching peer."
|
||||
let peerOpt = selectRandomServicePeer(
|
||||
|
|
|
@ -36,7 +36,7 @@ declarePublicCounter lpt_publisher_failed_messages_count,
|
|||
declarePublicCounter lpt_publisher_sent_bytes, "number of total bytes sent"
|
||||
|
||||
declarePublicCounter lpt_service_peer_failure_count,
|
||||
"number of failure during using service peer [publisher/receiever]", ["role"]
|
||||
"number of failure during using service peer [publisher/receiever]", ["role", "agent"]
|
||||
|
||||
declarePublicCounter lpt_change_service_peer_count,
|
||||
"number of times [publisher/receiver] had to change service peer", ["role"]
|
||||
|
@ -44,6 +44,6 @@ declarePublicCounter lpt_change_service_peer_count,
|
|||
declarePublicGauge lpt_px_peers,
|
||||
"Number of peers PeerExchange discovered and can be dialed"
|
||||
|
||||
declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed"
|
||||
declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed", ["agent"]
|
||||
|
||||
declarePublicGauge lpt_dial_failures, "Number of dial failures by cause"
|
||||
declarePublicGauge lpt_dial_failures, "Number of dial failures by cause", ["agent"]
|
||||
|
|
|
@ -126,21 +126,29 @@ proc tryCallAllPxPeers*(
|
|||
if connOpt.value().isSome():
|
||||
okPeers.add(randomPeer)
|
||||
info "Dialing successful",
|
||||
peer = constructMultiaddrStr(randomPeer), codec = codec
|
||||
lpt_dialed_peers.inc()
|
||||
peer = constructMultiaddrStr(randomPeer),
|
||||
agent = randomPeer.getAgent(),
|
||||
codec = codec
|
||||
lpt_dialed_peers.inc(labelValues = [randomPeer.getAgent()])
|
||||
else:
|
||||
lpt_dial_failures.inc()
|
||||
error "Dialing failed", peer = constructMultiaddrStr(randomPeer), codec = codec
|
||||
lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()])
|
||||
error "Dialing failed",
|
||||
peer = constructMultiaddrStr(randomPeer),
|
||||
agent = randomPeer.getAgent(),
|
||||
codec = codec
|
||||
else:
|
||||
lpt_dial_failures.inc()
|
||||
lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()])
|
||||
error "Timeout dialing service peer",
|
||||
peer = constructMultiaddrStr(randomPeer), codec = codec
|
||||
peer = constructMultiaddrStr(randomPeer),
|
||||
agent = randomPeer.getAgent(),
|
||||
codec = codec
|
||||
|
||||
var okPeersStr: string = ""
|
||||
for idx, peer in okPeers:
|
||||
okPeersStr.add(
|
||||
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " &
|
||||
$peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n"
|
||||
" " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " &
|
||||
peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " &
|
||||
$peer.enr.map(getCapabilities) & "\n"
|
||||
)
|
||||
echo "PX returned peers found callable for " & codec & " / " & $capability & ":\n"
|
||||
echo okPeersStr
|
||||
|
|
|
@ -126,6 +126,11 @@ proc addMessage*(
|
|||
|
||||
lpt_receiver_sender_peer_count.set(value = self.len)
|
||||
|
||||
proc lastMessageArrivedAt*(self: Statistics): Option[Moment] =
|
||||
if self.receivedMessages > 0:
|
||||
return some(self.helper.prevArrivedAt)
|
||||
return none(Moment)
|
||||
|
||||
proc lossCount*(self: Statistics): uint32 =
|
||||
self.helper.maxIndex - self.receivedMessages
|
||||
|
||||
|
@ -274,16 +279,49 @@ proc jsonStats*(self: PerPeerStatistics): string =
|
|||
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
|
||||
"\"}"
|
||||
|
||||
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} =
|
||||
proc lastMessageArrivedAt*(self: PerPeerStatistics): Option[Moment] =
|
||||
var lastArrivedAt = Moment.init(0, Millisecond)
|
||||
for stat in self.values:
|
||||
let lastMsgFromPeerAt = stat.lastMessageArrivedAt().valueOr:
|
||||
continue
|
||||
|
||||
if lastMsgFromPeerAt > lastArrivedAt:
|
||||
lastArrivedAt = lastMsgFromPeerAt
|
||||
|
||||
if lastArrivedAt == Moment.init(0, Millisecond):
|
||||
return none(Moment)
|
||||
|
||||
return some(lastArrivedAt)
|
||||
|
||||
proc checkIfAllMessagesReceived*(
|
||||
self: PerPeerStatistics, maxWaitForLastMessage: Duration
|
||||
): Future[bool] {.async.} =
|
||||
# if there are no peers have sent messages, assume we just have started.
|
||||
if self.len == 0:
|
||||
return false
|
||||
|
||||
# check if numerically all messages are received.
|
||||
# this suggest we received at least one message already from one peer
|
||||
var isAlllMessageReceived = true
|
||||
for stat in self.values:
|
||||
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
|
||||
stat.helper.maxIndex < stat.allMessageCount:
|
||||
isAlllMessageReceived = false
|
||||
break
|
||||
|
||||
if not isAlllMessageReceived:
|
||||
# if not all message received we still need to check if last message arrived within a time frame
|
||||
# to avoid endless waiting while publishers are already quit.
|
||||
let lastMessageAt = self.lastMessageArrivedAt()
|
||||
if lastMessageAt.isNone():
|
||||
return false
|
||||
|
||||
# last message shall arrived within time limit
|
||||
if Moment.now() - lastMessageAt.get() < maxWaitForLastMessage:
|
||||
return false
|
||||
else:
|
||||
info "No message since max wait time", maxWait = $maxWaitForLastMessage
|
||||
|
||||
## Ok, we see last message arrived from all peers,
|
||||
## lets check if all messages are received
|
||||
## and if not let's wait another 20 secs to give chance the system will send them.
|
||||
|
|
|
@ -358,3 +358,10 @@ func hasUdpPort*(peer: RemotePeerInfo): bool =
|
|||
|
||||
let typedEnr = typedEnrRes.get()
|
||||
typedEnr.udp.isSome() or typedEnr.udp6.isSome()
|
||||
|
||||
proc getAgent*(peer: RemotePeerInfo): string =
|
||||
## Returns the agent version of a peer
|
||||
if peer.agent.isEmptyOrWhitespace():
|
||||
return "unknown"
|
||||
|
||||
return peer.agent
|
||||
|
|
Loading…
Reference in New Issue