mirror of https://github.com/waku-org/nwaku.git
This commit is contained in:
parent
6eec201e8c
commit
a7d2dfd0aa
|
@ -225,6 +225,21 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
|
||||||
if node.wakuRelay.isSubscribed(topic):
|
if node.wakuRelay.isSubscribed(topic):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||||
|
let msg_hash = topic.computeMessageHash(msg).to0xHex()
|
||||||
|
|
||||||
|
notice "waku.relay received",
|
||||||
|
my_peer_id = node.peerId,
|
||||||
|
pubsubTopic = topic,
|
||||||
|
msg_hash = msg_hash,
|
||||||
|
receivedTime = getNowInNanosecondTime(),
|
||||||
|
payloadSizeBytes = msg.payload.len
|
||||||
|
|
||||||
|
let msgSizeKB = msg.payload.len / 1000
|
||||||
|
|
||||||
|
waku_node_messages.inc(labelValues = ["relay"])
|
||||||
|
waku_histogram_message_size.observe(msgSizeKB)
|
||||||
|
|
||||||
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
||||||
if node.wakuFilter.isNil():
|
if node.wakuFilter.isNil():
|
||||||
return
|
return
|
||||||
|
@ -240,6 +255,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
|
||||||
let defaultHandler = proc(
|
let defaultHandler = proc(
|
||||||
topic: PubsubTopic, msg: WakuMessage
|
topic: PubsubTopic, msg: WakuMessage
|
||||||
): Future[void] {.async, gcsafe.} =
|
): Future[void] {.async, gcsafe.} =
|
||||||
|
await traceHandler(topic, msg)
|
||||||
await filterHandler(topic, msg)
|
await filterHandler(topic, msg)
|
||||||
await archiveHandler(topic, msg)
|
await archiveHandler(topic, msg)
|
||||||
|
|
||||||
|
@ -376,61 +392,6 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||||
|
|
||||||
info "relay started successfully"
|
info "relay started successfully"
|
||||||
|
|
||||||
proc generateRelayObserver(node: WakuNode): PubSubObserver =
|
|
||||||
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
|
|
||||||
for msg in msgs.messages:
|
|
||||||
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
|
|
||||||
warn "Error generating message id",
|
|
||||||
my_peer_id = node.peerId,
|
|
||||||
from_peer_id = peer.peerId,
|
|
||||||
topic = msg.topic,
|
|
||||||
error = $error
|
|
||||||
continue
|
|
||||||
|
|
||||||
let msg_id_short = shortLog(msg_id)
|
|
||||||
|
|
||||||
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
|
|
||||||
warn "Error decoding to Waku Message",
|
|
||||||
my_peer_id = node.peerId,
|
|
||||||
msg_id = msg_id_short,
|
|
||||||
from_peer_id = peer.peerId,
|
|
||||||
topic = msg.topic,
|
|
||||||
error = $error
|
|
||||||
continue
|
|
||||||
|
|
||||||
let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
|
|
||||||
|
|
||||||
if onRecv:
|
|
||||||
notice "received relay message",
|
|
||||||
my_peer_id = node.peerId,
|
|
||||||
msg_hash = msg_hash,
|
|
||||||
msg_id = msg_id_short,
|
|
||||||
from_peer_id = peer.peerId,
|
|
||||||
topic = msg.topic,
|
|
||||||
receivedTime = getNowInNanosecondTime(),
|
|
||||||
payloadSizeBytes = wakuMessage.payload.len
|
|
||||||
|
|
||||||
let msgSizeKB = wakuMessage.payload.len / 1000
|
|
||||||
waku_node_messages.inc(labelValues = ["relay"])
|
|
||||||
waku_histogram_message_size.observe(msgSizeKB)
|
|
||||||
else:
|
|
||||||
notice "sent relay message",
|
|
||||||
my_peer_id = node.peerId,
|
|
||||||
msg_hash = msg_hash,
|
|
||||||
msg_id = msg_id_short,
|
|
||||||
to_peer_id = peer.peerId,
|
|
||||||
topic = msg.topic,
|
|
||||||
sentTime = getNowInNanosecondTime(),
|
|
||||||
payloadSizeBytes = wakuMessage.payload.len
|
|
||||||
|
|
||||||
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
|
||||||
logMessageInfo(peer, msgs, onRecv = true)
|
|
||||||
|
|
||||||
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
|
|
||||||
discard
|
|
||||||
|
|
||||||
return PubSubObserver(onRecv: onRecv, onSend: onSend)
|
|
||||||
|
|
||||||
proc mountRelay*(
|
proc mountRelay*(
|
||||||
node: WakuNode,
|
node: WakuNode,
|
||||||
pubsubTopics: seq[string] = @[],
|
pubsubTopics: seq[string] = @[],
|
||||||
|
@ -451,11 +412,6 @@ proc mountRelay*(
|
||||||
|
|
||||||
node.wakuRelay = initRes.value
|
node.wakuRelay = initRes.value
|
||||||
|
|
||||||
# register relay observers for logging
|
|
||||||
debug "Registering Relay observers"
|
|
||||||
let observerLogger = node.generateRelayObserver()
|
|
||||||
node.wakuRelay.addObserver(observerLogger)
|
|
||||||
|
|
||||||
## Add peer exchange handler
|
## Add peer exchange handler
|
||||||
if peerExchangeHandler.isSome():
|
if peerExchangeHandler.isSome():
|
||||||
node.wakuRelay.parameters.enablePX = true
|
node.wakuRelay.parameters.enablePX = true
|
||||||
|
|
|
@ -180,9 +180,6 @@ proc addValidator*(
|
||||||
) {.gcsafe.} =
|
) {.gcsafe.} =
|
||||||
w.wakuValidators.add((handler, errorMessage))
|
w.wakuValidators.add((handler, errorMessage))
|
||||||
|
|
||||||
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
|
|
||||||
procCall GossipSub(w).addObserver(observer)
|
|
||||||
|
|
||||||
method start*(w: WakuRelay) {.async, base.} =
|
method start*(w: WakuRelay) {.async, base.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
await procCall GossipSub(w).start()
|
await procCall GossipSub(w).start()
|
||||||
|
|
Loading…
Reference in New Issue