mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 16:25:00 +00:00
fix: add log and archive message ingress for sync (#3133)
This commit is contained in:
parent
8106abb9dd
commit
80c7581ab1
@ -87,6 +87,12 @@ proc handleMessage*(
|
|||||||
) {.async.} =
|
) {.async.} =
|
||||||
self.validator(msg).isOkOr:
|
self.validator(msg).isOkOr:
|
||||||
waku_archive_errors.inc(labelValues = [error])
|
waku_archive_errors.inc(labelValues = [error])
|
||||||
|
trace "invalid message",
|
||||||
|
msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(),
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
error = error
|
||||||
return
|
return
|
||||||
|
|
||||||
let msgHash = computeMessageHash(pubsubTopic, msg)
|
let msgHash = computeMessageHash(pubsubTopic, msg)
|
||||||
@ -95,7 +101,7 @@ proc handleMessage*(
|
|||||||
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
||||||
waku_archive_errors.inc(labelValues = [insertFailure])
|
waku_archive_errors.inc(labelValues = [insertFailure])
|
||||||
trace "failed to insert message",
|
trace "failed to insert message",
|
||||||
hash_hash = msgHash.to0xHex(),
|
msg_hash = msgHash.to0xHex(),
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
timestamp = msg.timestamp,
|
timestamp = msg.timestamp,
|
||||||
@ -103,7 +109,34 @@ proc handleMessage*(
|
|||||||
return
|
return
|
||||||
|
|
||||||
trace "message archived",
|
trace "message archived",
|
||||||
hash_hash = msgHash.to0xHex(),
|
msg_hash = msgHash.to0xHex(),
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp
|
||||||
|
|
||||||
|
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||||
|
waku_archive_insert_duration_seconds.observe(insertDuration)
|
||||||
|
|
||||||
|
proc syncMessageIngress*(
|
||||||
|
self: WakuArchive,
|
||||||
|
msgHash: WakuMessageHash,
|
||||||
|
pubsubTopic: PubsubTopic,
|
||||||
|
msg: WakuMessage,
|
||||||
|
) {.async.} =
|
||||||
|
let insertStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
|
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
||||||
|
waku_archive_errors.inc(labelValues = [insertFailure])
|
||||||
|
trace "failed to insert message",
|
||||||
|
msg_hash = msgHash.to0xHex(),
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
error = error
|
||||||
|
return
|
||||||
|
|
||||||
|
trace "message archived",
|
||||||
|
msg_hash = msgHash.to0xHex(),
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
timestamp = msg.timestamp
|
timestamp = msg.timestamp
|
||||||
|
@ -289,7 +289,9 @@ proc createTransferCallback(
|
|||||||
|
|
||||||
for kv in response.messages:
|
for kv in response.messages:
|
||||||
let handleRes = catch:
|
let handleRes = catch:
|
||||||
await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
|
await wakuArchive.syncMessageIngress(
|
||||||
|
kv.messageHash, kv.pubsubTopic.get(), kv.message.get()
|
||||||
|
)
|
||||||
|
|
||||||
if handleRes.isErr():
|
if handleRes.isErr():
|
||||||
error "message transfer failed", error = handleRes.error.msg
|
error "message transfer failed", error = handleRes.error.msg
|
||||||
|
Loading…
x
Reference in New Issue
Block a user