From f6c680a46a1f3e93a168008ff99d2f78dc74cd59 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Wed, 16 Apr 2025 09:24:05 -0400 Subject: [PATCH] feat: enhance Waku Sync logs and metrics (#3370) --- waku/waku_store_sync/protocols_metrics.nim | 4 ++++ waku/waku_store_sync/reconciliation.nim | 26 ++++++++++++++++------ waku/waku_store_sync/transfer.nim | 12 ++++++---- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim index 2d2776674..bb22f11c7 100644 --- a/waku/waku_store_sync/protocols_metrics.nim +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -10,6 +10,10 @@ declarePublicHistogram reconciliation_roundtrips, "the nubmer of roundtrips for each reconciliation", buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf] +declarePublicHistogram reconciliation_differences, + "the nubmer of differences for each reconciliation", + buckets = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 10000.0, Inf] + declarePublicSummary total_bytes_exchanged, "the number of bytes sent and received by the protocols", ["protocol", "direction"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 80c025140..f7c13d42c 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -96,19 +96,23 @@ proc messageIngress*(self: SyncReconciliation, id: SyncID) = proc processRequest( self: SyncReconciliation, conn: Connection ): Future[Result[void, string]] {.async.} = - var roundTrips = 0 + var + roundTrips = 0 + diffs = 0 while true: let readRes = catch: await conn.readLp(int.high) let buffer: seq[byte] = readRes.valueOr: - return err("connection read error: " & error.msg) + await conn.close() + return err("remote " & $conn.peerId & " connection read error: " & error.msg) total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving]) let recvPayload = RangesData.deltaDecode(buffer).valueOr: - return err("payload decoding error: " & error) + await conn.close() + return err("remote " & $conn.peerId & " payload decoding error: " & error) roundTrips.inc() @@ -136,9 +140,11 @@ proc processRequest( for hash in hashToSend: self.remoteNeedsTx.addLastNoWait((conn.peerId, hash)) + diffs.inc() for hash in hashToRecv: self.localWantsTx.addLastNoWait((conn.peerId, hash)) + diffs.inc() rawPayload = sendPayload.deltaEncode() @@ -150,7 +156,9 @@ proc processRequest( await conn.writeLP(rawPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + await conn.close() + return + err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, @@ -163,6 +171,7 @@ proc processRequest( continue reconciliation_roundtrips.observe(roundTrips) + reconciliation_differences.observe(diffs) await conn.close() @@ -196,12 +205,15 @@ proc initiate( await connection.writeLP(sendPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + await connection.close() + return err( + "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg + ) trace "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, - payload = sendPayload + payload = initPayload ?await self.processRequest(connection) @@ -217,7 +229,7 @@ proc storeSynchronization*( let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec) let conn: Connection = connOpt.valueOr: - return err("cannot establish sync connection") + return err("fail to dial remote " & $peer.peerId) debug "sync session initialized", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 0ac959de0..81bed5ece 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -57,7 +57,8 @@ proc sendMessage( await conn.writeLP(rawPayload) if writeRes.isErr(): - return err("connection write error: " & writeRes.error.msg) + return + err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) total_transfer_messages_exchanged.inc(labelValues = [Sending]) @@ -69,7 +70,7 @@ proc openConnection( let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec) let conn: Connection = connOpt.valueOr: - return err("Cannot establish transfer connection") + return err("fail to dial remote " & $peerId) debug "transfer session initialized", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId @@ -126,6 +127,8 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0]) (await sendMessage(connection, msg)).isOkOr: + self.outSessions.del(peerId) + await connection.close() error "failed to send message", error = error continue @@ -158,17 +161,16 @@ proc initProtocolHandler(self: SyncTransfer) = if value[].missingOrExcl(hash): error "unwanted hash received, disconnecting" self.inSessions.del(conn.peerId) - await conn.close() break do: error "unwanted hash received, disconnecting" self.inSessions.del(conn.peerId) - await conn.close() break #TODO verify msg RLN proof... (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: + error "failed to archive message", error = $error continue let id = SyncID(time: msg.timestamp, hash: hash) @@ -176,6 +178,8 @@ proc initProtocolHandler(self: SyncTransfer) = continue + await conn.close() + debug "transfer session ended", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId