mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-10 14:26:26 +00:00
Add peers monitoring.
Add broadcasters monitoring. Resolve broadcasters to ids/addresses. Add ability to switch between FloodSub and GossipSub.
This commit is contained in:
parent
0794337ba6
commit
97878566b8
@ -4,17 +4,17 @@
|
||||
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
import strutils, times, os
|
||||
import strutils, os, tables
|
||||
import confutils, chronicles, chronos, libp2p/daemon/daemonapi,
|
||||
libp2p/multiaddress
|
||||
import stew/byteutils as bu
|
||||
import spec/[datatypes, network]
|
||||
import spec/network
|
||||
|
||||
const
|
||||
InspectorName* = "Beacon-Chain Network Inspector"
|
||||
InspectorMajor*: int = 0
|
||||
InspectorMinor*: int = 0
|
||||
InspectorPatch*: int = 1
|
||||
InspectorPatch*: int = 2
|
||||
InspectorVersion* = $InspectorMajor & "." & $InspectorMinor & "." &
|
||||
$InspectorPatch
|
||||
InspectorIdent* = "Inspector/$1 ($2/$3)" % [InspectorVersion,
|
||||
@ -37,8 +37,17 @@ type
|
||||
longform: "verbosity", shortform: "v",
|
||||
defaultValue: LogLevel.TRACE.}: LogLevel
|
||||
fullPeerId* {.desc: "Sets the inspector full PeerID output",
|
||||
longform: "fullpeerid", shortform: "pid",
|
||||
longform: "fullpeerid", shortform: "p",
|
||||
defaultValue: false.}: bool
|
||||
floodSub* {.desc: "Sets inspector engine to FloodSub",
|
||||
longform: "floodsub", shortform: "f",
|
||||
defaultValue: true.}: bool
|
||||
gossipSub* {.desc: "Sets inspector engine to GossipSub",
|
||||
longform: "gossipsub", shortform: "g",
|
||||
defaultValue: false.}: bool
|
||||
signFlag* {.desc: "Sets the inspector's to send/verify signatures in " &
|
||||
"pubsub messages",
|
||||
longform: "sign", shortform: "s", defaultValue: false.}: bool
|
||||
topics* {.desc: "Sets monitored topics, where `*` - all, " &
|
||||
"[a]ttestations, [b]locks, [e]xits, " &
|
||||
"[ps]roposer slashings, [as]ttester slashings",
|
||||
@ -48,7 +57,7 @@ type
|
||||
bootstrapFile* {.
|
||||
desc: "Specifies file which holds bootstrap nodes multiaddresses " &
|
||||
"delimeted by CRLF",
|
||||
longform: "bootfile", shortform: "bf", defaultValue: "".}: string
|
||||
longform: "bootfile", shortform: "l", defaultValue: "".}: string
|
||||
bootstrapNodes* {.
|
||||
desc: "Specifies one or more bootstrap nodes" &
|
||||
" to use when connecting to the network",
|
||||
@ -85,22 +94,64 @@ proc run(conf: InspectorConf) {.async.} =
|
||||
bootnodes: seq[string]
|
||||
api: DaemonApi
|
||||
identity: PeerInfo
|
||||
pubsubPeers: Table[PeerID, PeerInfo]
|
||||
peerQueue: AsyncQueue[PeerID]
|
||||
subs: seq[tuple[ticket: PubsubTicket, future: Future[void]]]
|
||||
topics: set[TopicFilter] = {}
|
||||
|
||||
pubsubPeers = initTable[PeerID, PeerInfo]()
|
||||
peerQueue = newAsyncQueue[PeerID]()
|
||||
|
||||
proc dumpPeers(api: DaemonAPI) {.async.} =
|
||||
while true:
|
||||
var peers = await api.listPeers()
|
||||
info "Connected peers information", peers_connected = len(peers)
|
||||
for item in peers:
|
||||
info "Connected peer", peer = getPeerId(item.peer, conf),
|
||||
addresses = item.addresses
|
||||
for key, value in pubsubPeers.pairs():
|
||||
info "Pubsub peer", peer = getPeerId(value.peer, conf),
|
||||
addresses = value.addresses
|
||||
await sleepAsync(10.seconds)
|
||||
|
||||
proc resolvePeers(api: DaemonAPI) {.async.} =
|
||||
var counter = 0
|
||||
while true:
|
||||
var peer = await peerQueue.popFirst()
|
||||
var info = await api.dhtFindPeer(peer)
|
||||
inc(counter)
|
||||
info "Peer resolved", peer = getPeerId(peer, conf),
|
||||
addresses = info.addresses, count = counter
|
||||
pubsubPeers[peer] = info
|
||||
|
||||
proc pubsubLogger(api: DaemonAPI,
|
||||
ticket: PubsubTicket,
|
||||
message: PubSubMessage): Future[bool] {.async.} =
|
||||
# We must return ``false`` only if we are not going to continue monitoring
|
||||
# of specific topic.
|
||||
var sig = if len(message.signature.data) > 0:
|
||||
$message.signature
|
||||
else:
|
||||
"<no signature>"
|
||||
var key = if len(message.signature.data) > 0:
|
||||
$message.key
|
||||
else:
|
||||
"<no public key>"
|
||||
|
||||
var pinfo = pubsubPeers.getOrDefault(message.peer)
|
||||
if len(pinfo.peer) == 0:
|
||||
pubsubPeers[message.peer] = PeerInfo(peer: message.peer)
|
||||
peerQueue.addLastNoWait(message.peer)
|
||||
|
||||
info "Received message", peerID = getPeerId(message.peer, conf),
|
||||
size = len(message.data),
|
||||
topic = ticket.topic,
|
||||
seqno = bu.toHex(message.seqno),
|
||||
signature = $message.signature,
|
||||
pubkey = $message.key,
|
||||
signature = sig,
|
||||
pubkey = key,
|
||||
mtopics = $message.topics,
|
||||
message = bu.toHex(message.data)
|
||||
message = bu.toHex(message.data),
|
||||
zpeers = len(pubsubPeers)
|
||||
result = true
|
||||
|
||||
if len(conf.topics) > 0:
|
||||
@ -159,13 +210,22 @@ proc run(conf: InspectorConf) {.async.} =
|
||||
info InspectorIdent & " starting", bootnodes = bootnodes,
|
||||
topic_filters = topics
|
||||
|
||||
var flags = {DHTClient, PSGossipSub, WaitBootstrap}
|
||||
var flags = {DHTClient, PSNoSign, WaitBootstrap}
|
||||
if conf.signFlag:
|
||||
flags.excl(PSNoSign)
|
||||
|
||||
if conf.gossipSub:
|
||||
flags.incl(PSGossipSub)
|
||||
else:
|
||||
flags.incl(PSFloodSub)
|
||||
|
||||
try:
|
||||
api = await newDaemonApi(flags, bootstrapNodes = bootnodes,
|
||||
peersRequired = 1)
|
||||
var identity = await api.identity()
|
||||
identity = await api.identity()
|
||||
info InspectorIdent & " started", peerID = getPeerId(identity.peer, conf),
|
||||
bound = identity.addresses
|
||||
bound = identity.addresses,
|
||||
options = flags
|
||||
except:
|
||||
error "Could not initialize p2pd daemon",
|
||||
exception = getCurrentExceptionMsg()
|
||||
@ -185,6 +245,11 @@ proc run(conf: InspectorConf) {.async.} =
|
||||
error "Could not subscribe to topics", exception = getCurrentExceptionMsg()
|
||||
quit(1)
|
||||
|
||||
# Starting DHT resolver task
|
||||
asyncCheck resolvePeers(api)
|
||||
# Starting peer dumper task
|
||||
asyncCheck dumpPeers(api)
|
||||
|
||||
var futures = newSeq[Future[void]]()
|
||||
var delindex = 0
|
||||
while true:
|
||||
|
Loading…
x
Reference in New Issue
Block a user