mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-23 04:18:30 +00:00
archive enhance logging (#3258)
This commit is contained in:
parent
998f040fdb
commit
b406b7b2ac
@ -85,65 +85,82 @@ proc new*(
|
|||||||
proc handleMessage*(
|
proc handleMessage*(
|
||||||
self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
|
self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
|
||||||
) {.async.} =
|
) {.async.} =
|
||||||
|
let msgHash = computeMessageHash(pubsubTopic, msg)
|
||||||
|
let msgHashHex = msgHash.to0xHex()
|
||||||
|
|
||||||
|
trace "handling message",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
msgTimestamp = msg.timestamp
|
||||||
|
|
||||||
self.validator(msg).isOkOr:
|
self.validator(msg).isOkOr:
|
||||||
waku_archive_errors.inc(labelValues = [error])
|
waku_archive_errors.inc(labelValues = [error])
|
||||||
trace "invalid message",
|
trace "invalid message",
|
||||||
msg_hash = computeMessageHash(pubsubTopic, msg).to0xHex(),
|
msg_hash = msgHashHex,
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
timestamp = msg.timestamp,
|
timestamp = msg.timestamp,
|
||||||
error = error
|
error = error
|
||||||
return
|
return
|
||||||
|
|
||||||
let msgHash = computeMessageHash(pubsubTopic, msg)
|
|
||||||
let insertStartTime = getTime().toUnixFloat()
|
let insertStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
||||||
waku_archive_errors.inc(labelValues = [insertFailure])
|
waku_archive_errors.inc(labelValues = [insertFailure])
|
||||||
trace "failed to insert message",
|
trace "failed to insert message",
|
||||||
msg_hash = msgHash.to0xHex(),
|
msg_hash = msgHashHex,
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
timestamp = msg.timestamp,
|
timestamp = msg.timestamp,
|
||||||
error = error
|
error = error
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "message archived",
|
|
||||||
msg_hash = msgHash.to0xHex(),
|
|
||||||
pubsubTopic = pubsubTopic,
|
|
||||||
contentTopic = msg.contentTopic,
|
|
||||||
timestamp = msg.timestamp
|
|
||||||
|
|
||||||
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||||
waku_archive_insert_duration_seconds.observe(insertDuration)
|
waku_archive_insert_duration_seconds.observe(insertDuration)
|
||||||
|
|
||||||
|
trace "message archived",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
insertDuration = insertDuration
|
||||||
|
|
||||||
proc syncMessageIngress*(
|
proc syncMessageIngress*(
|
||||||
self: WakuArchive,
|
self: WakuArchive,
|
||||||
msgHash: WakuMessageHash,
|
msgHash: WakuMessageHash,
|
||||||
pubsubTopic: PubsubTopic,
|
pubsubTopic: PubsubTopic,
|
||||||
msg: WakuMessage,
|
msg: WakuMessage,
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
let insertStartTime = getTime().toUnixFloat()
|
let msgHashHex = msgHash.to0xHex()
|
||||||
|
|
||||||
|
trace "handling message in syncMessageIngress",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp
|
||||||
|
|
||||||
|
let insertStartTime = getTime().toUnixFloat()
|
||||||
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
|
||||||
waku_archive_errors.inc(labelValues = [insertFailure])
|
waku_archive_errors.inc(labelValues = [insertFailure])
|
||||||
trace "failed to insert message",
|
trace "failed to insert message in in syncMessageIngress",
|
||||||
msg_hash = msgHash.toHex(),
|
msg_hash = msgHashHex,
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
timestamp = msg.timestamp,
|
timestamp = msg.timestamp,
|
||||||
error = $error
|
error = $error
|
||||||
return err(error)
|
return err(error)
|
||||||
|
|
||||||
trace "message archived",
|
|
||||||
msg_hash = msgHash.to0xHex(),
|
|
||||||
pubsubTopic = pubsubTopic,
|
|
||||||
contentTopic = msg.contentTopic,
|
|
||||||
timestamp = msg.timestamp
|
|
||||||
|
|
||||||
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||||
waku_archive_insert_duration_seconds.observe(insertDuration)
|
waku_archive_insert_duration_seconds.observe(insertDuration)
|
||||||
|
|
||||||
|
trace "message archived in syncMessageIngress",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
insertDuration = insertDuration
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc validateTimeRange(
|
proc validateTimeRange(
|
||||||
|
@ -79,10 +79,6 @@ proc new*(
|
|||||||
proc handleMessage*(
|
proc handleMessage*(
|
||||||
self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
|
self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
|
||||||
) {.async.} =
|
) {.async.} =
|
||||||
self.validator(msg).isOkOr:
|
|
||||||
waku_legacy_archive_errors.inc(labelValues = [error])
|
|
||||||
return
|
|
||||||
|
|
||||||
let
|
let
|
||||||
msgDigest = computeDigest(msg)
|
msgDigest = computeDigest(msg)
|
||||||
msgDigestHex = msgDigest.data.to0xHex()
|
msgDigestHex = msgDigest.data.to0xHex()
|
||||||
@ -99,26 +95,40 @@ proc handleMessage*(
|
|||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
msgTimestamp = msg.timestamp,
|
msgTimestamp = msg.timestamp,
|
||||||
usedTimestamp = msgTimestamp,
|
|
||||||
digest = msgDigestHex
|
digest = msgDigestHex
|
||||||
|
|
||||||
|
self.validator(msg).isOkOr:
|
||||||
|
waku_legacy_archive_errors.inc(labelValues = [error])
|
||||||
|
trace "invalid message",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
error = error
|
||||||
|
return
|
||||||
|
|
||||||
let insertStartTime = getTime().toUnixFloat()
|
let insertStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
|
(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
|
||||||
waku_legacy_archive_errors.inc(labelValues = [insertFailure])
|
waku_legacy_archive_errors.inc(labelValues = [insertFailure])
|
||||||
error "failed to insert message", error = error
|
error "failed to insert message",
|
||||||
|
msg_hash = msgHashHex,
|
||||||
|
pubsubTopic = pubsubTopic,
|
||||||
|
contentTopic = msg.contentTopic,
|
||||||
|
timestamp = msg.timestamp,
|
||||||
|
error = error
|
||||||
return
|
return
|
||||||
|
|
||||||
|
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||||
|
waku_legacy_archive_insert_duration_seconds.observe(insertDuration)
|
||||||
|
|
||||||
debug "message archived",
|
debug "message archived",
|
||||||
msg_hash = msgHashHex,
|
msg_hash = msgHashHex,
|
||||||
pubsubTopic = pubsubTopic,
|
pubsubTopic = pubsubTopic,
|
||||||
contentTopic = msg.contentTopic,
|
contentTopic = msg.contentTopic,
|
||||||
msgTimestamp = msg.timestamp,
|
msgTimestamp = msg.timestamp,
|
||||||
usedTimestamp = msgTimestamp,
|
digest = msgDigestHex,
|
||||||
digest = msgDigestHex
|
insertDuration = insertDuration
|
||||||
|
|
||||||
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
|
||||||
waku_legacy_archive_insert_duration_seconds.observe(insertDuration)
|
|
||||||
|
|
||||||
proc findMessages*(
|
proc findMessages*(
|
||||||
self: WakuArchive, query: ArchiveQuery
|
self: WakuArchive, query: ArchiveQuery
|
||||||
|
Loading…
x
Reference in New Issue
Block a user