mirror of https://github.com/waku-org/nwaku.git
chore: adding observer on message sent
This commit is contained in:
parent
94947a8504
commit
47c5454832
|
@ -392,6 +392,61 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||||
|
|
||||||
info "relay started successfully"
|
info "relay started successfully"
|
||||||
|
|
||||||
|
proc generateRelayObserver(node: WakuNode): PubSubObserver =
|
||||||
|
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
|
||||||
|
for msg in msgs.messages:
|
||||||
|
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
|
||||||
|
warn "Error generating message id",
|
||||||
|
my_peer_id = node.peerId,
|
||||||
|
from_peer_id = peer.peerId,
|
||||||
|
topic = msg.topic,
|
||||||
|
error = $error
|
||||||
|
continue
|
||||||
|
|
||||||
|
let msg_id_short = shortLog(msg_id)
|
||||||
|
|
||||||
|
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
|
||||||
|
warn "Error decoding to Waku Message",
|
||||||
|
my_peer_id = node.peerId,
|
||||||
|
msg_id = msg_id_short,
|
||||||
|
from_peer_id = peer.peerId,
|
||||||
|
topic = msg.topic,
|
||||||
|
error = $error
|
||||||
|
continue
|
||||||
|
|
||||||
|
let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
|
||||||
|
|
||||||
|
if onRecv:
|
||||||
|
notice "received relay message",
|
||||||
|
my_peer_id = node.peerId,
|
||||||
|
msg_hash = msg_hash,
|
||||||
|
msg_id = msg_id_short,
|
||||||
|
from_peer_id = peer.peerId,
|
||||||
|
topic = msg.topic,
|
||||||
|
receivedTime = getNowInNanosecondTime(),
|
||||||
|
payloadSizeBytes = wakuMessage.payload.len
|
||||||
|
|
||||||
|
let msgSizeKB = wakuMessage.payload.len / 1000
|
||||||
|
waku_node_messages.inc(labelValues = ["relay"])
|
||||||
|
waku_histogram_message_size.observe(msgSizeKB)
|
||||||
|
else:
|
||||||
|
notice "sent relay message",
|
||||||
|
my_peer_id = node.peerId,
|
||||||
|
msg_hash = msg_hash,
|
||||||
|
msg_id = msg_id_short,
|
||||||
|
to_peer_id = peer.peerId,
|
||||||
|
topic = msg.topic,
|
||||||
|
sentTime = getNowInNanosecondTime(),
|
||||||
|
payloadSizeBytes = wakuMessage.payload.len
|
||||||
|
|
||||||
|
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||||
|
logMessageInfo(peer, msgs, onRecv = false)
|
||||||
|
|
||||||
|
return PubSubObserver(onRecv: onRecv, onSend: onSend)
|
||||||
|
|
||||||
proc mountRelay*(
|
proc mountRelay*(
|
||||||
node: WakuNode,
|
node: WakuNode,
|
||||||
pubsubTopics: seq[string] = @[],
|
pubsubTopics: seq[string] = @[],
|
||||||
|
@ -412,6 +467,11 @@ proc mountRelay*(
|
||||||
|
|
||||||
node.wakuRelay = initRes.value
|
node.wakuRelay = initRes.value
|
||||||
|
|
||||||
|
# register relay observers for logging
|
||||||
|
debug "Registering Relay observers"
|
||||||
|
let observerLogger = node.generateRelayObserver()
|
||||||
|
node.wakuRelay.addObserver(observerLogger)
|
||||||
|
|
||||||
## Add peer exchange handler
|
## Add peer exchange handler
|
||||||
if peerExchangeHandler.isSome():
|
if peerExchangeHandler.isSome():
|
||||||
node.wakuRelay.parameters.enablePX = true
|
node.wakuRelay.parameters.enablePX = true
|
||||||
|
|
|
@ -180,6 +180,9 @@ proc addValidator*(
|
||||||
) {.gcsafe.} =
|
) {.gcsafe.} =
|
||||||
w.wakuValidators.add((handler, errorMessage))
|
w.wakuValidators.add((handler, errorMessage))
|
||||||
|
|
||||||
|
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
|
||||||
|
procCall GossipSub(w).addObserver(observer)
|
||||||
|
|
||||||
method start*(w: WakuRelay) {.async, base.} =
|
method start*(w: WakuRelay) {.async, base.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
await procCall GossipSub(w).start()
|
await procCall GossipSub(w).start()
|
||||||
|
|
Loading…
Reference in New Issue