fix: misc sync fixes, added debug logging (#3411)

This commit is contained in:
Hanno Cornelius 2025-06-04 14:19:14 +01:00 committed by GitHub
parent 336fbf8b64
commit 66d8d3763d
2 changed files with 27 additions and 6 deletions

View File

@ -65,6 +65,8 @@ type SyncReconciliation* = ref object of LPProtocol
proc messageIngress*(
self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage
) =
trace "message ingress", pubsub_topic = pubsubTopic, msg = msg
if msg.ephemeral:
return
@ -78,6 +80,8 @@ proc messageIngress*(
proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
) =
trace "message ingress", msg_hash = msgHash.toHex(), msg = msg
if msg.ephemeral:
return
@ -87,6 +91,8 @@ proc messageIngress*(
error "failed to insert new message", msg_hash = msgHash.toHex(), err = error
proc messageIngress*(self: SyncReconciliation, id: SyncID) =
trace "message ingress", id = id
self.storage.insert(id).isOkOr:
error "failed to insert new message", msg_hash = id.hash.toHex(), err = error
@ -116,7 +122,7 @@ proc processRequest(
roundTrips.inc()
trace "sync payload received",
debug "sync payload received",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = recvPayload
@ -135,6 +141,9 @@ proc processRequest(
recvPayload.shards.toPackedSet() == self.shards:
sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv)
debug "sync payload processed",
hash_to_send = hashToSend, hash_to_recv = hashToRecv
sendPayload.cluster = self.cluster
sendPayload.shards = self.shards.toSeq()
@ -157,7 +166,7 @@ proc processRequest(
return
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)
trace "sync payload sent",
debug "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = conn.peerId,
payload = sendPayload
@ -208,7 +217,7 @@ proc initiate(
"remote " & $connection.peerId & " connection write error: " & writeRes.error.msg
)
trace "sync payload sent",
debug "sync payload sent",
local = self.peerManager.switch.peerInfo.peerId,
remote = connection.peerId,
payload = initPayload
@ -265,7 +274,7 @@ proc initFillStorage(
debug "initial storage filling started"
var ids = newSeq[SyncID](DefaultStorageCap)
var ids = newSeqOfCap[SyncID](DefaultStorageCap)
# we assume IDs are in order
@ -332,7 +341,8 @@ proc new*(
sync.handler = handler
sync.codec = WakuReconciliationCodec
info "Store Reconciliation protocol initialized"
info "Store Reconciliation protocol initialized",
sync_range = syncRange, sync_interval = syncInterval, relay_jitter = relayJitter
return ok(sync)

View File

@ -97,7 +97,13 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
while true: # infinite loop
let (peerId, fingerprint) = await self.remoteNeedsRx.popFirst()
if not self.outSessions.hasKey(peerId):
if (not self.outSessions.hasKey(peerId)) or self.outSessions[peerId].closed() or
## sanity check, should not be possible
self.outSessions[peerId].isClosedRemotely:
## quite possibly remote end has closed the connection, believing transfer to be done
debug "opening transfer connection to remote peer",
my_peer_id = self.peerManager.switch.peerInfo.peerId, remote_peer_id = peerId
let connection = (await self.openConnection(peerId)).valueOr:
error "failed to establish transfer connection", error = error
continue
@ -121,6 +127,11 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
let msg =
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])
trace "sending transfer message",
my_peer_id = self.peerManager.switch.peerInfo.peerId,
remote_peer_id = peerId,
msg = msg
(await sendMessage(connection, msg)).isOkOr:
self.outSessions.del(peerId)
await connection.close()