mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 08:53:06 +00:00
feat: enhance Waku Sync logs and metrics (#3370)
This commit is contained in:
parent
6ebb49a14b
commit
f6c680a46a
@ -10,6 +10,10 @@ declarePublicHistogram reconciliation_roundtrips,
|
|||||||
"the nubmer of roundtrips for each reconciliation",
|
"the nubmer of roundtrips for each reconciliation",
|
||||||
buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf]
|
buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf]
|
||||||
|
|
||||||
|
declarePublicHistogram reconciliation_differences,
|
||||||
|
"the nubmer of differences for each reconciliation",
|
||||||
|
buckets = [0.0, 10.0, 50.0, 100.0, 500.0, 1000.0, 10000.0, Inf]
|
||||||
|
|
||||||
declarePublicSummary total_bytes_exchanged,
|
declarePublicSummary total_bytes_exchanged,
|
||||||
"the number of bytes sent and received by the protocols", ["protocol", "direction"]
|
"the number of bytes sent and received by the protocols", ["protocol", "direction"]
|
||||||
|
|
||||||
|
|||||||
@ -96,19 +96,23 @@ proc messageIngress*(self: SyncReconciliation, id: SyncID) =
|
|||||||
proc processRequest(
|
proc processRequest(
|
||||||
self: SyncReconciliation, conn: Connection
|
self: SyncReconciliation, conn: Connection
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
var roundTrips = 0
|
var
|
||||||
|
roundTrips = 0
|
||||||
|
diffs = 0
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
let readRes = catch:
|
let readRes = catch:
|
||||||
await conn.readLp(int.high)
|
await conn.readLp(int.high)
|
||||||
|
|
||||||
let buffer: seq[byte] = readRes.valueOr:
|
let buffer: seq[byte] = readRes.valueOr:
|
||||||
return err("connection read error: " & error.msg)
|
await conn.close()
|
||||||
|
return err("remote " & $conn.peerId & " connection read error: " & error.msg)
|
||||||
|
|
||||||
total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving])
|
total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving])
|
||||||
|
|
||||||
let recvPayload = RangesData.deltaDecode(buffer).valueOr:
|
let recvPayload = RangesData.deltaDecode(buffer).valueOr:
|
||||||
return err("payload decoding error: " & error)
|
await conn.close()
|
||||||
|
return err("remote " & $conn.peerId & " payload decoding error: " & error)
|
||||||
|
|
||||||
roundTrips.inc()
|
roundTrips.inc()
|
||||||
|
|
||||||
@ -136,9 +140,11 @@ proc processRequest(
|
|||||||
|
|
||||||
for hash in hashToSend:
|
for hash in hashToSend:
|
||||||
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
|
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
|
||||||
|
diffs.inc()
|
||||||
|
|
||||||
for hash in hashToRecv:
|
for hash in hashToRecv:
|
||||||
self.localWantsTx.addLastNoWait((conn.peerId, hash))
|
self.localWantsTx.addLastNoWait((conn.peerId, hash))
|
||||||
|
diffs.inc()
|
||||||
|
|
||||||
rawPayload = sendPayload.deltaEncode()
|
rawPayload = sendPayload.deltaEncode()
|
||||||
|
|
||||||
@ -150,7 +156,9 @@ proc processRequest(
|
|||||||
await conn.writeLP(rawPayload)
|
await conn.writeLP(rawPayload)
|
||||||
|
|
||||||
if writeRes.isErr():
|
if writeRes.isErr():
|
||||||
return err("connection write error: " & writeRes.error.msg)
|
await conn.close()
|
||||||
|
return
|
||||||
|
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)
|
||||||
|
|
||||||
trace "sync payload sent",
|
trace "sync payload sent",
|
||||||
local = self.peerManager.switch.peerInfo.peerId,
|
local = self.peerManager.switch.peerInfo.peerId,
|
||||||
@ -163,6 +171,7 @@ proc processRequest(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
reconciliation_roundtrips.observe(roundTrips)
|
reconciliation_roundtrips.observe(roundTrips)
|
||||||
|
reconciliation_differences.observe(diffs)
|
||||||
|
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
@ -196,12 +205,15 @@ proc initiate(
|
|||||||
await connection.writeLP(sendPayload)
|
await connection.writeLP(sendPayload)
|
||||||
|
|
||||||
if writeRes.isErr():
|
if writeRes.isErr():
|
||||||
return err("connection write error: " & writeRes.error.msg)
|
await connection.close()
|
||||||
|
return err(
|
||||||
|
"remote " & $connection.peerId & " connection write error: " & writeRes.error.msg
|
||||||
|
)
|
||||||
|
|
||||||
trace "sync payload sent",
|
trace "sync payload sent",
|
||||||
local = self.peerManager.switch.peerInfo.peerId,
|
local = self.peerManager.switch.peerInfo.peerId,
|
||||||
remote = connection.peerId,
|
remote = connection.peerId,
|
||||||
payload = sendPayload
|
payload = initPayload
|
||||||
|
|
||||||
?await self.processRequest(connection)
|
?await self.processRequest(connection)
|
||||||
|
|
||||||
@ -217,7 +229,7 @@ proc storeSynchronization*(
|
|||||||
let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec)
|
let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec)
|
||||||
|
|
||||||
let conn: Connection = connOpt.valueOr:
|
let conn: Connection = connOpt.valueOr:
|
||||||
return err("cannot establish sync connection")
|
return err("fail to dial remote " & $peer.peerId)
|
||||||
|
|
||||||
debug "sync session initialized",
|
debug "sync session initialized",
|
||||||
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
||||||
|
|||||||
@ -57,7 +57,8 @@ proc sendMessage(
|
|||||||
await conn.writeLP(rawPayload)
|
await conn.writeLP(rawPayload)
|
||||||
|
|
||||||
if writeRes.isErr():
|
if writeRes.isErr():
|
||||||
return err("connection write error: " & writeRes.error.msg)
|
return
|
||||||
|
err("remote " & $conn.peerId & " connection write error: " & writeRes.error.msg)
|
||||||
|
|
||||||
total_transfer_messages_exchanged.inc(labelValues = [Sending])
|
total_transfer_messages_exchanged.inc(labelValues = [Sending])
|
||||||
|
|
||||||
@ -69,7 +70,7 @@ proc openConnection(
|
|||||||
let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec)
|
let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec)
|
||||||
|
|
||||||
let conn: Connection = connOpt.valueOr:
|
let conn: Connection = connOpt.valueOr:
|
||||||
return err("Cannot establish transfer connection")
|
return err("fail to dial remote " & $peerId)
|
||||||
|
|
||||||
debug "transfer session initialized",
|
debug "transfer session initialized",
|
||||||
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
||||||
@ -126,6 +127,8 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
|
|||||||
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])
|
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])
|
||||||
|
|
||||||
(await sendMessage(connection, msg)).isOkOr:
|
(await sendMessage(connection, msg)).isOkOr:
|
||||||
|
self.outSessions.del(peerId)
|
||||||
|
await connection.close()
|
||||||
error "failed to send message", error = error
|
error "failed to send message", error = error
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -158,17 +161,16 @@ proc initProtocolHandler(self: SyncTransfer) =
|
|||||||
if value[].missingOrExcl(hash):
|
if value[].missingOrExcl(hash):
|
||||||
error "unwanted hash received, disconnecting"
|
error "unwanted hash received, disconnecting"
|
||||||
self.inSessions.del(conn.peerId)
|
self.inSessions.del(conn.peerId)
|
||||||
await conn.close()
|
|
||||||
break
|
break
|
||||||
do:
|
do:
|
||||||
error "unwanted hash received, disconnecting"
|
error "unwanted hash received, disconnecting"
|
||||||
self.inSessions.del(conn.peerId)
|
self.inSessions.del(conn.peerId)
|
||||||
await conn.close()
|
|
||||||
break
|
break
|
||||||
|
|
||||||
#TODO verify msg RLN proof...
|
#TODO verify msg RLN proof...
|
||||||
|
|
||||||
(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
|
(await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr:
|
||||||
|
error "failed to archive message", error = $error
|
||||||
continue
|
continue
|
||||||
|
|
||||||
let id = SyncID(time: msg.timestamp, hash: hash)
|
let id = SyncID(time: msg.timestamp, hash: hash)
|
||||||
@ -176,6 +178,8 @@ proc initProtocolHandler(self: SyncTransfer) =
|
|||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
debug "transfer session ended",
|
debug "transfer session ended",
|
||||||
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user