diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim index 9f255764f..788f83c68 100644 --- a/apps/liteprotocoltester/diagnose_connections.nim +++ b/apps/liteprotocoltester/diagnose_connections.nim @@ -44,8 +44,9 @@ proc allPeers(pm: PeerManager): string = var allStr: string = "" for idx, peer in pm.wakuPeerStore.peers(): allStr.add( - " " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " & - $peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n" + " " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " & + peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " & + $peer.enr.map(getCapabilities) & "\n" ) return allStr diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim index be9b1497e..143e0ca80 100644 --- a/apps/liteprotocoltester/filter_subscriber.nim +++ b/apps/liteprotocoltester/filter_subscriber.nim @@ -73,7 +73,9 @@ proc maintainSubscription( if subscribeRes.isErr(): noFailedSubscribes += 1 - lpt_service_peer_failure_count.inc(labelValues = ["receiver"]) + lpt_service_peer_failure_count.inc( + labelValues = ["receiver", actualFilterPeer.getAgent()] + ) error "Subscribe request failed.", err = subscribeRes.error, peer = actualFilterPeer, @@ -150,11 +152,17 @@ proc setupAndSubscribe*( let interval = millis(20000) var printStats: CallbackFunc + # calculate max wait after the last known message arrived before exiting + # 20% of expected messages times the expected interval but capped to 10min + let maxWaitForLastMessage: Duration = + min(conf.messageInterval.milliseconds * (conf.numMessages div 5), 10.minutes) + printStats = CallbackFunc( proc(udata: pointer) {.gcsafe.} = stats.echoStats() - if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived(): + if conf.numMessages > 0 and + waitFor stats.checkIfAllMessagesReceived(maxWaitForLastMessage): waitFor unsubscribe(wakuNode, conf.pubsubTopics[0], conf.contentTopics[0]) info "All messages received. Exiting." diff --git a/apps/liteprotocoltester/infra.env b/apps/liteprotocoltester/infra.env index 3ead4dc50..6d4542eca 100644 --- a/apps/liteprotocoltester/infra.env +++ b/apps/liteprotocoltester/infra.env @@ -5,7 +5,7 @@ MESSAGE_INTERVAL_MILLIS=1000 MIN_MESSAGE_SIZE=15Kb MAX_MESSAGE_SIZE=145Kb PUBSUB=/waku/2/rs/16/32 -CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet +CONTENT_TOPIC=/tester/2/light-pubsub-test-at-infra/status-prod CLUSTER_ID=16 LIGHTPUSH_BOOTSTRAP=enr:-QEKuED9AJm2HGgrRpVaJY2nj68ao_QiPeUT43sK-aRM7sMJ6R4G11OSDOwnvVacgN1sTw-K7soC5dzHDFZgZkHU0u-XAYJpZIJ2NIJpcISnYxMvim11bHRpYWRkcnO4WgAqNiVib290LTAxLmRvLWFtczMuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfACw2JWJvb3QtMDEuZG8tYW1zMy5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaEC3rRtFQSgc24uWewzXaxTY8hDAHB8sgnxr9k8Rjb5GeSDdGNwgnZfg3VkcIIjKIV3YWt1Mg0 FILTER_BOOTSTRAP=enr:-QEcuED7ww5vo2rKc1pyBp7fubBUH-8STHEZHo7InjVjLblEVyDGkjdTI9VdqmYQOn95vuQH-Htku17WSTzEufx-Wg4mAYJpZIJ2NIJpcIQihw1Xim11bHRpYWRkcnO4bAAzNi5ib290LTAxLmdjLXVzLWNlbnRyYWwxLWEuc3RhdHVzLnByb2Quc3RhdHVzLmltBnZfADU2LmJvb3QtMDEuZ2MtdXMtY2VudHJhbDEtYS5zdGF0dXMucHJvZC5zdGF0dXMuaW0GAbveA4Jyc40AEAUAAQAgAEAAgAEAiXNlY3AyNTZrMaECxjqgDQ0WyRSOilYU32DA5k_XNlDis3m1VdXkK9xM6kODdGNwgnZfg3VkcIIjKIV3YWt1Mg0 diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim index 4f32f7f03..2d48348b2 100644 --- a/apps/liteprotocoltester/lightpush_publisher.nim +++ b/apps/liteprotocoltester/lightpush_publisher.nim @@ -177,7 +177,9 @@ proc publishMessages( continue else: noFailedPush += 1 - lpt_service_peer_failure_count.inc(labelValues = ["publisher"]) + lpt_service_peer_failure_count.inc( + labelValues = ["publisher", actualServicePeer.getAgent()] + ) if not preventPeerSwitch and noFailedPush > maxFailedPush: info "Max push failure limit reached, Try switching peer." let peerOpt = selectRandomServicePeer( diff --git a/apps/liteprotocoltester/lpt_metrics.nim b/apps/liteprotocoltester/lpt_metrics.nim index 61666fb14..e68164d13 100644 --- a/apps/liteprotocoltester/lpt_metrics.nim +++ b/apps/liteprotocoltester/lpt_metrics.nim @@ -36,7 +36,7 @@ declarePublicCounter lpt_publisher_failed_messages_count, declarePublicCounter lpt_publisher_sent_bytes, "number of total bytes sent" declarePublicCounter lpt_service_peer_failure_count, - "number of failure during using service peer [publisher/receiever]", ["role"] + "number of failure during using service peer [publisher/receiever]", ["role", "agent"] declarePublicCounter lpt_change_service_peer_count, "number of times [publisher/receiver] had to change service peer", ["role"] @@ -44,6 +44,6 @@ declarePublicCounter lpt_change_service_peer_count, declarePublicGauge lpt_px_peers, "Number of peers PeerExchange discovered and can be dialed" -declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed" +declarePublicGauge lpt_dialed_peers, "Number of peers successfully dialed", ["agent"] -declarePublicGauge lpt_dial_failures, "Number of dial failures by cause" +declarePublicGauge lpt_dial_failures, "Number of dial failures by cause", ["agent"] diff --git a/apps/liteprotocoltester/service_peer_management.nim b/apps/liteprotocoltester/service_peer_management.nim index 6286b7913..8fd6de973 100644 --- a/apps/liteprotocoltester/service_peer_management.nim +++ b/apps/liteprotocoltester/service_peer_management.nim @@ -126,21 +126,29 @@ proc tryCallAllPxPeers*( if connOpt.value().isSome(): okPeers.add(randomPeer) info "Dialing successful", - peer = constructMultiaddrStr(randomPeer), codec = codec - lpt_dialed_peers.inc() + peer = constructMultiaddrStr(randomPeer), + agent = randomPeer.getAgent(), + codec = codec + lpt_dialed_peers.inc(labelValues = [randomPeer.getAgent()]) else: - lpt_dial_failures.inc() - error "Dialing failed", peer = constructMultiaddrStr(randomPeer), codec = codec + lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()]) + error "Dialing failed", + peer = constructMultiaddrStr(randomPeer), + agent = randomPeer.getAgent(), + codec = codec else: - lpt_dial_failures.inc() + lpt_dial_failures.inc(labelValues = [randomPeer.getAgent()]) error "Timeout dialing service peer", - peer = constructMultiaddrStr(randomPeer), codec = codec + peer = constructMultiaddrStr(randomPeer), + agent = randomPeer.getAgent(), + codec = codec var okPeersStr: string = "" for idx, peer in okPeers: okPeersStr.add( - " " & $idx & ". | " & constructMultiaddrStr(peer) & " | protos: " & - $peer.protocols & " | caps: " & $peer.enr.map(getCapabilities) & "\n" + " " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " & + peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " & + $peer.enr.map(getCapabilities) & "\n" ) echo "PX returned peers found callable for " & codec & " / " & $capability & ":\n" echo okPeersStr diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim index be85d9bdb..8322edd8f 100644 --- a/apps/liteprotocoltester/statistics.nim +++ b/apps/liteprotocoltester/statistics.nim @@ -126,6 +126,11 @@ proc addMessage*( lpt_receiver_sender_peer_count.set(value = self.len) +proc lastMessageArrivedAt*(self: Statistics): Option[Moment] = + if self.receivedMessages > 0: + return some(self.helper.prevArrivedAt) + return none(Moment) + proc lossCount*(self: Statistics): uint32 = self.helper.maxIndex - self.receivedMessages @@ -274,16 +279,49 @@ proc jsonStats*(self: PerPeerStatistics): string = "{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() & "\"}" -proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} = +proc lastMessageArrivedAt*(self: PerPeerStatistics): Option[Moment] = + var lastArrivedAt = Moment.init(0, Millisecond) + for stat in self.values: + let lastMsgFromPeerAt = stat.lastMessageArrivedAt().valueOr: + continue + + if lastMsgFromPeerAt > lastArrivedAt: + lastArrivedAt = lastMsgFromPeerAt + + if lastArrivedAt == Moment.init(0, Millisecond): + return none(Moment) + + return some(lastArrivedAt) + +proc checkIfAllMessagesReceived*( + self: PerPeerStatistics, maxWaitForLastMessage: Duration +): Future[bool] {.async.} = # if there are no peers have sent messages, assume we just have started. if self.len == 0: return false + # check if numerically all messages are received. + # this suggest we received at least one message already from one peer + var isAlllMessageReceived = true for stat in self.values: if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or stat.helper.maxIndex < stat.allMessageCount: + isAlllMessageReceived = false + break + + if not isAlllMessageReceived: + # if not all message received we still need to check if last message arrived within a time frame + # to avoid endless waiting while publishers are already quit. + let lastMessageAt = self.lastMessageArrivedAt() + if lastMessageAt.isNone(): return false + # last message shall arrived within time limit + if Moment.now() - lastMessageAt.get() < maxWaitForLastMessage: + return false + else: + info "No message since max wait time", maxWait = $maxWaitForLastMessage + ## Ok, we see last message arrived from all peers, ## lets check if all messages are received ## and if not let's wait another 20 secs to give chance the system will send them. diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 77ebc9abe..a821a0474 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -358,3 +358,10 @@ func hasUdpPort*(peer: RemotePeerInfo): bool = let typedEnr = typedEnrRes.get() typedEnr.udp.isSome() or typedEnr.udp6.isSome() + +proc getAgent*(peer: RemotePeerInfo): string = + ## Returns the agent version of a peer + if peer.agent.isEmptyOrWhitespace(): + return "unknown" + + return peer.agent