mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 15:00:57 +00:00
feature/accounting (#29)
* adds accounting experimentally * updated * sent * updated * print metrics * var * indent * fix
This commit is contained in:
parent
e5051b2251
commit
aafb865755
@ -116,6 +116,20 @@ proc run(config: WakuNodeConf) =
|
|||||||
setupWakuSimRPC(node, rpcServer)
|
setupWakuSimRPC(node, rpcServer)
|
||||||
rpcServer.start()
|
rpcServer.start()
|
||||||
|
|
||||||
|
proc logPeerAccounting(udata: pointer) {.closure, gcsafe.} =
|
||||||
|
{.gcsafe.}:
|
||||||
|
|
||||||
|
for peer in node.peerPool.peers:
|
||||||
|
let
|
||||||
|
sent = peer.state(Waku).accounting.sent
|
||||||
|
received = peer.state(Waku).accounting.received
|
||||||
|
id = peer.remote.id
|
||||||
|
info "Peer Metrics", id, sent, received
|
||||||
|
peer.state(Waku).accounting = Accounting(sent: 0, received: 0)
|
||||||
|
|
||||||
|
addTimer(Moment.fromNow(2.seconds), logPeerAccounting)
|
||||||
|
addTimer(Moment.fromNow(2.seconds), logPeerAccounting)
|
||||||
|
|
||||||
when defined(insecure):
|
when defined(insecure):
|
||||||
if config.metricsServer:
|
if config.metricsServer:
|
||||||
let
|
let
|
||||||
|
@ -71,6 +71,10 @@ type
|
|||||||
rateLimits*: Option[RateLimits]
|
rateLimits*: Option[RateLimits]
|
||||||
topics*: Option[seq[Topic]]
|
topics*: Option[seq[Topic]]
|
||||||
|
|
||||||
|
Accounting* = ref object
|
||||||
|
sent*: uint
|
||||||
|
received*: uint
|
||||||
|
|
||||||
WakuPeer = ref object
|
WakuPeer = ref object
|
||||||
initialized: bool # when successfully completed the handshake
|
initialized: bool # when successfully completed the handshake
|
||||||
powRequirement*: float64
|
powRequirement*: float64
|
||||||
@ -79,6 +83,7 @@ type
|
|||||||
trusted*: bool
|
trusted*: bool
|
||||||
topics*: Option[seq[Topic]]
|
topics*: Option[seq[Topic]]
|
||||||
received: HashSet[Hash]
|
received: HashSet[Hash]
|
||||||
|
accounting*: Accounting
|
||||||
|
|
||||||
P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
|
P2PRequestHandler* = proc(peer: Peer, envelope: Envelope) {.gcsafe.}
|
||||||
|
|
||||||
@ -260,6 +265,7 @@ p2pProtocol Waku(version = wakuVersion,
|
|||||||
|
|
||||||
wakuPeer.received.init()
|
wakuPeer.received.init()
|
||||||
wakuPeer.trusted = false
|
wakuPeer.trusted = false
|
||||||
|
wakuPeer.accounting = Accounting(sent: 0, received: 0)
|
||||||
wakuPeer.initialized = true
|
wakuPeer.initialized = true
|
||||||
|
|
||||||
# No timer based queue processing for a light node.
|
# No timer based queue processing for a light node.
|
||||||
@ -284,6 +290,8 @@ p2pProtocol Waku(version = wakuVersion,
|
|||||||
# await peer.disconnect(SubprotocolReason)
|
# await peer.disconnect(SubprotocolReason)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
peer.state.accounting.received += 1
|
||||||
|
|
||||||
let msg = initMessage(envelope)
|
let msg = initMessage(envelope)
|
||||||
if not msg.allowed(peer.networkState.config):
|
if not msg.allowed(peer.networkState.config):
|
||||||
# disconnect from peers sending bad envelopes
|
# disconnect from peers sending bad envelopes
|
||||||
@ -403,6 +411,7 @@ proc processQueue(peer: Peer) =
|
|||||||
|
|
||||||
trace "Adding envelope"
|
trace "Adding envelope"
|
||||||
envelopes.add(message.env)
|
envelopes.add(message.env)
|
||||||
|
wakuPeer.accounting.sent += 1
|
||||||
wakuPeer.received.incl(message.hash)
|
wakuPeer.received.incl(message.hash)
|
||||||
|
|
||||||
if envelopes.len() > 0:
|
if envelopes.len() > 0:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user