From 66d8d3763d70de3bb71ae1379a0282ef6947ebb1 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Wed, 4 Jun 2025 14:19:14 +0100 Subject: [PATCH] fix: misc sync fixes, added debug logging (#3411) --- waku/waku_store_sync/reconciliation.nim | 20 +++++++++++++++----- waku/waku_store_sync/transfer.nim | 13 ++++++++++++- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index cb5c1bc3d..01000935b 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -65,6 +65,8 @@ type SyncReconciliation* = ref object of LPProtocol proc messageIngress*( self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage ) = + trace "message ingress", pubsub_topic = pubsubTopic, msg = msg + if msg.ephemeral: return @@ -78,6 +80,8 @@ proc messageIngress*( proc messageIngress*( self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage ) = + trace "message ingress", msg_hash = msgHash.toHex(), msg = msg + if msg.ephemeral: return @@ -87,6 +91,8 @@ proc messageIngress*( error "failed to insert new message", msg_hash = msgHash.toHex(), err = error proc messageIngress*(self: SyncReconciliation, id: SyncID) = + trace "message ingress", id = id + self.storage.insert(id).isOkOr: error "failed to insert new message", msg_hash = id.hash.toHex(), err = error @@ -116,7 +122,7 @@ proc processRequest( roundTrips.inc() - trace "sync payload received", + debug "sync payload received", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = recvPayload @@ -135,6 +141,9 @@ proc processRequest( recvPayload.shards.toPackedSet() == self.shards: sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) + debug "sync payload processed", + hash_to_send = hashToSend, hash_to_recv = hashToRecv + sendPayload.cluster = self.cluster sendPayload.shards = self.shards.toSeq() @@ -157,7 +166,7 @@ proc processRequest( return err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg) - trace "sync payload sent", + debug "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, payload = sendPayload @@ -208,7 +217,7 @@ proc initiate( "remote " & $connection.peerId & " connection write error: " & writeRes.error.msg ) - trace "sync payload sent", + debug "sync payload sent", local = self.peerManager.switch.peerInfo.peerId, remote = connection.peerId, payload = initPayload @@ -265,7 +274,7 @@ proc initFillStorage( debug "initial storage filling started" - var ids = newSeq[SyncID](DefaultStorageCap) + var ids = newSeqOfCap[SyncID](DefaultStorageCap) # we assume IDs are in order @@ -332,7 +341,8 @@ proc new*( sync.handler = handler sync.codec = WakuReconciliationCodec - info "Store Reconciliation protocol initialized" + info "Store Reconciliation protocol initialized", + sync_range = syncRange, sync_interval = syncInterval, relay_jitter = relayJitter return ok(sync) diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 78b83c601..f17fe944b 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -97,7 +97,13 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = while true: # infinite loop let (peerId, fingerprint) = await self.remoteNeedsRx.popFirst() - if not self.outSessions.hasKey(peerId): + if (not self.outSessions.hasKey(peerId)) or self.outSessions[peerId].closed() or + ## sanity check, should not be possible + self.outSessions[peerId].isClosedRemotely: + ## quite possibly remote end has closed the connection, believing transfer to be done + debug "opening transfer connection to remote peer", + my_peer_id = self.peerManager.switch.peerInfo.peerId, remote_peer_id = peerId + let connection = (await self.openConnection(peerId)).valueOr: error "failed to establish transfer connection", error = error continue @@ -121,6 +127,11 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = let msg = WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0]) + trace "sending transfer message", + my_peer_id = self.peerManager.switch.peerInfo.peerId, + remote_peer_id = peerId, + msg = msg + (await sendMessage(connection, msg)).isOkOr: self.outSessions.del(peerId) await connection.close()