Add gossipsub messages monitoring tool. (#458)
* Add gossipsub messages monitoring tool. * Add support of bootstrap nodes file. * Move topic constants to spec/network.nim. * Add ability to monitor custom topics.
This commit is contained in:
parent
48d22d53dc
commit
13bde5aee3
|
@ -5,19 +5,13 @@ import
|
|||
json_serialization/std/[options, sets], serialization/errors,
|
||||
eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils,
|
||||
spec/[datatypes, digest, crypto, beaconstate, helpers, validator,
|
||||
state_transition_block],
|
||||
state_transition_block, network],
|
||||
conf, time, state_transition, fork_choice, ssz, beacon_chain_db,
|
||||
validator_pool, extras, attestation_pool, block_pool, eth2_network,
|
||||
beacon_node_types, mainchain_monitor, trusted_state_snapshots, version,
|
||||
sync_protocol, request_manager, validator_keygen, interop
|
||||
|
||||
const
|
||||
topicBeaconBlocks = "/eth2/beacon_block/ssz"
|
||||
topicAttestations = "/eth2/beacon_attestation/ssz"
|
||||
topicVoluntaryExits = "/eth2/voluntary_exit/ssz"
|
||||
topicProposerSlashings = "/eth2/proposer_slashing/ssz"
|
||||
topicAttesterSlashings = "/eth2/attester_slashing/ssz"
|
||||
|
||||
dataDirValidators = "validators"
|
||||
networkMetadataFile = "network.json"
|
||||
genesisFile = "genesis.json"
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
# beacon_chain
|
||||
# Copyright (c) 2018 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
import strutils, times, os
|
||||
import confutils, chronicles, chronos, libp2p/daemon/daemonapi,
|
||||
libp2p/multiaddress
|
||||
import stew/byteutils as bu
|
||||
import spec/[datatypes, network]
|
||||
|
||||
const
|
||||
InspectorName* = "Beacon-Chain Network Inspector"
|
||||
InspectorMajor*: int = 0
|
||||
InspectorMinor*: int = 0
|
||||
InspectorPatch*: int = 1
|
||||
InspectorVersion* = $InspectorMajor & "." & $InspectorMinor & "." &
|
||||
$InspectorPatch
|
||||
InspectorIdent* = "Inspector/$1 ($2/$3)" % [InspectorVersion,
|
||||
hostCPU, hostOS]
|
||||
InspectorCopyright* = "Copyright(C) 2019" &
|
||||
" Status Research & Development GmbH"
|
||||
InspectorHeader* = InspectorName & ", Version " & InspectorVersion &
|
||||
" [" & hostOS & ": " & hostCPU & "]\r\n" &
|
||||
InspectorCopyright & "\r\n"
|
||||
|
||||
type
|
||||
TopicFilter* {.pure.} = enum
|
||||
Blocks, Attestations, Exits, ProposerSlashing, AttesterSlashings
|
||||
|
||||
StartUpCommand* {.pure.} = enum
|
||||
noCommand
|
||||
|
||||
InspectorConf* = object
|
||||
logLevel* {.desc: "Sets the inspector's verbosity log level",
|
||||
longform: "verbosity", shortform: "v",
|
||||
defaultValue: LogLevel.TRACE.}: LogLevel
|
||||
fullPeerId* {.desc: "Sets the inspector full PeerID output",
|
||||
longform: "fullpeerid", shortform: "pid",
|
||||
defaultValue: false.}: bool
|
||||
topics* {.desc: "Sets monitored topics, where `*` - all, " &
|
||||
"[a]ttestations, [b]locks, [e]xits, " &
|
||||
"[ps]roposer slashings, [as]ttester slashings",
|
||||
longform: "topics", shortform: "t".}: seq[string]
|
||||
customTopics* {.desc: "Sets custom monitored topics",
|
||||
longform: "custom", shortform: "c".}: seq[string]
|
||||
bootstrapFile* {.
|
||||
desc: "Specifies file which holds bootstrap nodes multiaddresses " &
|
||||
"delimeted by CRLF",
|
||||
longform: "bootfile", shortform: "bf", defaultValue: "".}: string
|
||||
bootstrapNodes* {.
|
||||
desc: "Specifies one or more bootstrap nodes" &
|
||||
" to use when connecting to the network",
|
||||
longform: "bootnodes", shortform: "b".}: seq[string]
|
||||
|
||||
|
||||
proc getTopic(filter: TopicFilter): string {.inline.} =
|
||||
case filter
|
||||
of TopicFilter.Blocks:
|
||||
topicBeaconBlocks
|
||||
of TopicFilter.Attestations:
|
||||
topicAttestations
|
||||
of TopicFilter.Exits:
|
||||
topicVoluntaryExits
|
||||
of TopicFilter.ProposerSlashing:
|
||||
topicProposerSlashings
|
||||
of TopicFilter.AttesterSlashings:
|
||||
topicAttesterSlashings
|
||||
|
||||
proc getPeerId(peer: PeerID, conf: InspectorConf): string {.inline.} =
|
||||
if conf.fullPeerId:
|
||||
result = peer.pretty()
|
||||
else:
|
||||
result = $peer
|
||||
|
||||
proc loadBootFile(name: string): seq[string] =
|
||||
try:
|
||||
result = readFile(name).splitLines()
|
||||
except:
|
||||
discard
|
||||
|
||||
proc run(conf: InspectorConf) {.async.} =
|
||||
var
|
||||
bootnodes: seq[string]
|
||||
api: DaemonApi
|
||||
identity: PeerInfo
|
||||
subs: seq[tuple[ticket: PubsubTicket, future: Future[void]]]
|
||||
topics: set[TopicFilter] = {}
|
||||
|
||||
proc pubsubLogger(api: DaemonAPI,
|
||||
ticket: PubsubTicket,
|
||||
message: PubSubMessage): Future[bool] {.async.} =
|
||||
# We must return ``false`` only if we are not going to continue monitoring
|
||||
# of specific topic.
|
||||
info "Received message", peerID = getPeerId(message.peer, conf),
|
||||
size = len(message.data),
|
||||
topic = ticket.topic,
|
||||
seqno = bu.toHex(message.seqno),
|
||||
signature = $message.signature,
|
||||
pubkey = $message.key,
|
||||
mtopics = $message.topics,
|
||||
message = bu.toHex(message.data)
|
||||
result = true
|
||||
|
||||
if len(conf.topics) > 0:
|
||||
for item in conf.topics:
|
||||
let lcitem = item.toLowerAscii()
|
||||
|
||||
if lcitem == "*":
|
||||
topics.incl({TopicFilter.Blocks, TopicFilter.Attestations,
|
||||
TopicFilter.Exits, TopicFilter.ProposerSlashing,
|
||||
TopicFilter.AttesterSlashings})
|
||||
break
|
||||
elif lcitem == "a":
|
||||
topics.incl(TopicFilter.Attestations)
|
||||
elif lcitem == "b":
|
||||
topics.incl(TopicFilter.Blocks)
|
||||
elif lcitem == "e":
|
||||
topics.incl(TopicFilter.Exits)
|
||||
elif lcitem == "ps":
|
||||
topics.incl(TopicFilter.ProposerSlashing)
|
||||
elif lcitem == "as":
|
||||
topics.incl(TopicFilter.AttesterSlashings)
|
||||
else:
|
||||
discard
|
||||
else:
|
||||
topics.incl({TopicFilter.Blocks, TopicFilter.Attestations,
|
||||
TopicFilter.Exits, TopicFilter.ProposerSlashing,
|
||||
TopicFilter.AttesterSlashings})
|
||||
|
||||
if len(conf.bootstrapFile) > 0:
|
||||
info "Loading bootstrap nodes from file", filename = conf.bootstrapFile
|
||||
var nodes = loadBootFile(conf.bootstrapFile)
|
||||
for nodeString in nodes:
|
||||
try:
|
||||
var ma = MultiAddress.init(nodeString)
|
||||
if not(IPFS.match(ma)):
|
||||
warn "Incorrect bootnode address", address = nodeString
|
||||
else:
|
||||
bootnodes.add($ma)
|
||||
except:
|
||||
warn "Bootnode address is not valid MultiAddress", address = nodeString
|
||||
|
||||
for nodeString in conf.bootstrapNodes:
|
||||
try:
|
||||
var ma = MultiAddress.init(nodeString)
|
||||
if not(IPFS.match(ma)):
|
||||
warn "Incorrect bootnode address", address = nodeString
|
||||
else:
|
||||
bootnodes.add($ma)
|
||||
except:
|
||||
warn "Bootnode address is not valid MultiAddress", address = nodeString
|
||||
|
||||
if len(bootnodes) == 0:
|
||||
error "Not enough bootnodes to establish connection with network"
|
||||
quit(1)
|
||||
|
||||
info InspectorIdent & " starting", bootnodes = bootnodes,
|
||||
topic_filters = topics
|
||||
|
||||
var flags = {DHTClient, PSGossipSub, WaitBootstrap}
|
||||
try:
|
||||
api = await newDaemonApi(flags, bootstrapNodes = bootnodes,
|
||||
peersRequired = 1)
|
||||
var identity = await api.identity()
|
||||
info InspectorIdent & " started", peerID = getPeerId(identity.peer, conf),
|
||||
bound = identity.addresses
|
||||
except:
|
||||
error "Could not initialize p2pd daemon",
|
||||
exception = getCurrentExceptionMsg()
|
||||
quit(1)
|
||||
|
||||
try:
|
||||
for filter in topics:
|
||||
let topic = getTopic(filter)
|
||||
let t = await api.pubsubSubscribe(topic, pubsubLogger)
|
||||
info "Subscribed to topic", topic = topic
|
||||
subs.add((ticket: t, future: t.transp.join()))
|
||||
for filter in conf.customTopics:
|
||||
let t = await api.pubsubSubscribe(filter, pubsubLogger)
|
||||
info "Subscribed to custom topic", topic = filter
|
||||
subs.add((ticket: t, future: t.transp.join()))
|
||||
except:
|
||||
error "Could not subscribe to topics", exception = getCurrentExceptionMsg()
|
||||
quit(1)
|
||||
|
||||
var futures = newSeq[Future[void]]()
|
||||
var delindex = 0
|
||||
while true:
|
||||
if len(subs) == 0:
|
||||
break
|
||||
futures.setLen(0)
|
||||
for item in subs:
|
||||
futures.add(item.future)
|
||||
var fut = await one(futures)
|
||||
for i in 0 ..< len(subs):
|
||||
if subs[i].future == fut:
|
||||
delindex = i
|
||||
break
|
||||
error "Subscription lost", topic = subs[delindex].ticket.topic
|
||||
subs.delete(delindex)
|
||||
|
||||
when isMainModule:
|
||||
echo InspectorHeader
|
||||
var conf = InspectorConf.load(version = InspectorVersion)
|
||||
waitFor run(conf)
|
|
@ -0,0 +1,13 @@
|
|||
# beacon_chain
|
||||
# Copyright (c) 2018-2019 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
const
|
||||
topicBeaconBlocks* = "/eth2/beacon_block/ssz"
|
||||
topicAttestations* = "/eth2/beacon_attestation/ssz"
|
||||
topicVoluntaryExits* = "/eth2/voluntary_exit/ssz"
|
||||
topicProposerSlashings* = "/eth2/proposer_slashing/ssz"
|
||||
topicAttesterSlashings* = "/eth2/attester_slashing/ssz"
|
Loading…
Reference in New Issue