# beacon_chain
# Copyright (c) 2018 Status Research & Development GmbH
# Licensed and distributed under either of
#  * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
#  * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import strutils, os, tables
import confutils, chronicles, chronos, libp2p/daemon/daemonapi,
       libp2p/multiaddress
import stew/byteutils as bu
import spec/network

const
  InspectorName* = "Beacon-Chain Network Inspector"
  InspectorMajor*: int = 0
  InspectorMinor*: int = 0
  InspectorPatch*: int = 2
  InspectorVersion* = $InspectorMajor & "." & $InspectorMinor & "." &
                      $InspectorPatch
  InspectorIdent* = "Inspector/$1 ($2/$3)" % [InspectorVersion,
                                              hostCPU, hostOS]
  InspectorCopyright* = "Copyright(C) 2019" &
                        " Status Research & Development GmbH"
  InspectorHeader* = InspectorName & ", Version " & InspectorVersion &
                     " [" & hostOS & ": " & hostCPU & "]\r\n" &
                     InspectorCopyright & "\r\n"

type
  TopicFilter* {.pure.} = enum
    Blocks, Attestations, Exits, ProposerSlashing, AttesterSlashings

  StartUpCommand* {.pure.} = enum
    noCommand

  InspectorConf* = object
    logLevel* {.desc: "Sets the inspector's verbosity log level",
                longform: "verbosity", shortform: "v",
                defaultValue: LogLevel.TRACE.}: LogLevel
    fullPeerId* {.desc: "Sets the inspector full PeerID output",
                  longform: "fullpeerid", shortform: "p",
                  defaultValue: false.}: bool
    floodSub* {.desc: "Sets inspector engine to FloodSub",
                longform: "floodsub", shortform: "f",
                defaultValue: true.}: bool
    gossipSub* {.desc: "Sets inspector engine to GossipSub",
                 longform: "gossipsub", shortform: "g",
                 defaultValue: false.}: bool
    signFlag* {.desc: "Sets the inspector to send/verify signatures in " &
                      "pubsub messages",
                longform: "sign", shortform: "s", defaultValue: false.}: bool
    topics* {.desc: "Sets monitored topics, where `*` - all, " &
                    "[a]ttestations, [b]locks, [e]xits, " &
                    "[ps] - proposer slashings, [as] - attester slashings",
              longform: "topics", shortform: "t".}: seq[string]
    customTopics* {.desc: "Sets custom monitored topics",
                    longform: "custom", shortform: "c".}: seq[string]
    bootstrapFile* {.
      desc: "Specifies a file which holds bootstrap node multiaddresses " &
            "delimited by CRLF",
      longform: "bootfile", shortform: "l", defaultValue: "".}: string
    bootstrapNodes* {.
      desc: "Specifies one or more bootstrap nodes" &
            " to use when connecting to the network",
      longform: "bootnodes", shortform: "b".}: seq[string]

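# Map a ``TopicFilter`` value onto the corresponding beacon chain pubsub
# topic name defined in ``spec/network``.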
proc getTopic(filter: TopicFilter): string {.inline.} =
  case filter
  of TopicFilter.Blocks:
    topicBeaconBlocks
  of TopicFilter.Attestations:
    topicAttestations
  of TopicFilter.Exits:
    topicVoluntaryExits
  of TopicFilter.ProposerSlashing:
    topicProposerSlashings
  of TopicFilter.AttesterSlashings:
    topicAttesterSlashings

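# Render a PeerID either in full (``--fullpeerid``) or in the default
# shortened form.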
proc getPeerId(peer: PeerID, conf: InspectorConf): string {.inline.} =
  if conf.fullPeerId:
    result = peer.pretty()
  else:
    result = $peer

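# Read bootstrap node addresses from ``name``, one per line; any read error
# results in an empty sequence.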
proc loadBootFile(name: string): seq[string] =
  try:
    result = readFile(name).splitLines()
  except:
    discard

proc run(conf: InspectorConf) {.async.} =
  var
    bootnodes: seq[string]
    api: DaemonApi
    identity: PeerInfo
    pubsubPeers: Table[PeerID, PeerInfo]
    peerQueue: AsyncQueue[PeerID]
    subs: seq[tuple[ticket: PubsubTicket, future: Future[void]]]
    topics: set[TopicFilter] = {}

  pubsubPeers = initTable[PeerID, PeerInfo]()
  peerQueue = newAsyncQueue[PeerID]()

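  # Periodically log the daemon's currently connected peers and all pubsub
  # peers seen so far.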
  proc dumpPeers(api: DaemonAPI) {.async.} =
    while true:
      var peers = await api.listPeers()
      info "Connected peers information", peers_connected = len(peers)
      for item in peers:
        info "Connected peer", peer = getPeerId(item.peer, conf),
                               addresses = item.addresses
      for key, value in pubsubPeers.pairs():
        info "Pubsub peer", peer = getPeerId(value.peer, conf),
                            addresses = value.addresses
      await sleepAsync(10.seconds)

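  # Consume peer ids from ``peerQueue`` and resolve their addresses with a
  # DHT lookup, caching the results in ``pubsubPeers``.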
  proc resolvePeers(api: DaemonAPI) {.async.} =
    var counter = 0
    while true:
      var peer = await peerQueue.popFirst()
      # Name the lookup result ``pinfo`` so it does not shadow chronicles'
      # ``info`` template used just below.
      var pinfo = await api.dhtFindPeer(peer)
      inc(counter)
      info "Peer resolved", peer = getPeerId(peer, conf),
                            addresses = pinfo.addresses, count = counter
      pubsubPeers[peer] = pinfo

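  # Log every message received on a subscribed topic and schedule unknown
  # sending peers for DHT resolution.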
  proc pubsubLogger(api: DaemonAPI,
                    ticket: PubsubTicket,
                    message: PubSubMessage): Future[bool] {.async.} =
    # Returning ``false`` would stop monitoring of the specific topic, so we
    # always return ``true`` to keep the subscription alive.
    var sig = if len(message.signature.data) > 0:
      $message.signature
    else:
      "<no signature>"
    var key = if len(message.signature.data) > 0:
      $message.key
    else:
      "<no public key>"

    var pinfo = pubsubPeers.getOrDefault(message.peer)
    if len(pinfo.peer) == 0:
      pubsubPeers[message.peer] = PeerInfo(peer: message.peer)
      peerQueue.addLastNoWait(message.peer)

    info "Received message", peerID = getPeerId(message.peer, conf),
                             size = len(message.data),
                             topic = ticket.topic,
                             seqno = bu.toHex(message.seqno),
                             signature = sig,
                             pubkey = key,
                             mtopics = $message.topics,
                             message = bu.toHex(message.data),
                             zpeers = len(pubsubPeers)
    result = true

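  # Translate the --topics shortcuts into a TopicFilter set; when no topics
  # are given, monitor everything.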
  if len(conf.topics) > 0:
    for item in conf.topics:
      let lcitem = item.toLowerAscii()

      if lcitem == "*":
        topics.incl({TopicFilter.Blocks, TopicFilter.Attestations,
                     TopicFilter.Exits, TopicFilter.ProposerSlashing,
                     TopicFilter.AttesterSlashings})
        break
      elif lcitem == "a":
        topics.incl(TopicFilter.Attestations)
      elif lcitem == "b":
        topics.incl(TopicFilter.Blocks)
      elif lcitem == "e":
        topics.incl(TopicFilter.Exits)
      elif lcitem == "ps":
        topics.incl(TopicFilter.ProposerSlashing)
      elif lcitem == "as":
        topics.incl(TopicFilter.AttesterSlashings)
      else:
        discard
  else:
    topics.incl({TopicFilter.Blocks, TopicFilter.Attestations,
                 TopicFilter.Exits, TopicFilter.ProposerSlashing,
                 TopicFilter.AttesterSlashings})

  if len(conf.bootstrapFile) > 0:
    info "Loading bootstrap nodes from file", filename = conf.bootstrapFile
    var nodes = loadBootFile(conf.bootstrapFile)
    for nodeString in nodes:
      try:
        var ma = MultiAddress.init(nodeString)
        if not(IPFS.match(ma)):
          warn "Incorrect bootnode address", address = nodeString
        else:
          bootnodes.add($ma)
      except:
        warn "Bootnode address is not valid MultiAddress", address = nodeString

  for nodeString in conf.bootstrapNodes:
    try:
      var ma = MultiAddress.init(nodeString)
      if not(IPFS.match(ma)):
        warn "Incorrect bootnode address", address = nodeString
      else:
        bootnodes.add($ma)
    except:
      warn "Bootnode address is not valid MultiAddress", address = nodeString

  if len(bootnodes) == 0:
    error "Not enough bootnodes to establish connection with network"
    quit(1)

  info InspectorIdent & " starting", bootnodes = bootnodes,
                                     topic_filters = topics

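  # Default daemon flags: run as a DHT client, leave pubsub messages
  # unsigned and wait for bootstrap to finish; --sign and --gossipsub
  # adjust them below.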
  var flags = {DHTClient, PSNoSign, WaitBootstrap}
  if conf.signFlag:
    flags.excl(PSNoSign)

  if conf.gossipSub:
    flags.incl(PSGossipSub)
  else:
    flags.incl(PSFloodSub)

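  # Start the p2pd daemon and require at least one connected peer before
  # continuing.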
  try:
    api = await newDaemonApi(flags, bootstrapNodes = bootnodes,
                             peersRequired = 1)
    identity = await api.identity()
    info InspectorIdent & " started", peerID = getPeerId(identity.peer, conf),
                                      bound = identity.addresses,
                                      options = flags
  except:
    error "Could not initialize p2pd daemon",
          exception = getCurrentExceptionMsg()
    quit(1)

  try:
    for filter in topics:
      let topic = getTopic(filter)
      let t = await api.pubsubSubscribe(topic, pubsubLogger)
      info "Subscribed to topic", topic = topic
      subs.add((ticket: t, future: t.transp.join()))
    for filter in conf.customTopics:
      let t = await api.pubsubSubscribe(filter, pubsubLogger)
      info "Subscribed to custom topic", topic = filter
      subs.add((ticket: t, future: t.transp.join()))
  except:
    error "Could not subscribe to topics", exception = getCurrentExceptionMsg()
    quit(1)

  # Start the DHT resolver task.
  asyncCheck resolvePeers(api)
  # Start the peer dumper task.
  asyncCheck dumpPeers(api)

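  # Wait until any subscription's stream terminates, log the lost topic and
  # drop it; exit once no subscriptions remain.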
  var futures = newSeq[Future[void]]()
  var delindex = 0
  while true:
    if len(subs) == 0:
      break
    futures.setLen(0)
    for item in subs:
      futures.add(item.future)
    var fut = await one(futures)
    for i in 0 ..< len(subs):
      if subs[i].future == fut:
        delindex = i
        break
    error "Subscription lost", topic = subs[delindex].ticket.topic
    subs.delete(delindex)

when isMainModule:
  echo InspectorHeader
  var conf = InspectorConf.load(version = InspectorVersion)
  waitFor run(conf)