diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 53053585f..914c7366d 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -87,6 +87,12 @@ proc handleMessage*( ) {.async.} = self.validator(msg).isOkOr: waku_archive_errors.inc(labelValues = [error]) + trace "invalid message", + msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error return let msgHash = computeMessageHash(pubsubTopic, msg) @@ -95,7 +101,7 @@ proc handleMessage*( (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, @@ -103,7 +109,34 @@ proc handleMessage*( return trace "message archived", - hash_hash = msgHash.to0xHex(), + msg_hash = msgHash.to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp + + let insertDuration = getTime().toUnixFloat() - insertStartTime + waku_archive_insert_duration_seconds.observe(insertDuration) + +proc syncMessageIngress*( + self: WakuArchive, + msgHash: WakuMessageHash, + pubsubTopic: PubsubTopic, + msg: WakuMessage, +) {.async.} = + let insertStartTime = getTime().toUnixFloat() + + (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: + waku_archive_errors.inc(labelValues = [insertFailure]) + trace "failed to insert message", + msg_hash = msgHash.to0xHex(), + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp, + error = error + return + + trace "message archived", + msg_hash = msgHash.to0xHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim index 0a5e6e49d..620ff1fa6 100644 --- a/waku/waku_sync/protocol.nim +++ b/waku/waku_sync/protocol.nim @@ -289,7 +289,9 @@ proc createTransferCallback( for kv in response.messages: let handleRes = catch: - await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get()) + await wakuArchive.syncMessageIngress( + kv.messageHash, kv.pubsubTopic.get(), kv.message.get() + ) if handleRes.isErr(): error "message transfer failed", error = handleRes.error.msg