Inspector native version. (#995)

This commit is contained in:
Eugene Kabanov 2020-05-09 17:18:58 +03:00 committed by GitHub
parent 093d298b2b
commit 55dfcc6783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 609 additions and 169 deletions

View File

@ -4,34 +4,68 @@
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import sequtils, strutils, os, tables import strutils, os, tables, options
import confutils, chronicles, chronos, libp2p/daemon/daemonapi, import confutils, chronicles, chronos
libp2p/multiaddress import libp2p/[switch, standard_setup, connection, multiaddress, multicodec,
peer, crypto/secp, peerinfo, peer]
import libp2p/crypto/crypto as lcrypto
import eth/p2p/discoveryv5/enr as enr
import eth/p2p/discoveryv5/[protocol, discovery_db, types]
import eth/keys as ethkeys, eth/trie/db
import stew/[results, objects]
import stew/byteutils as bu import stew/byteutils as bu
import nimcrypto/[hash, keccak]
import secp256k1 as s
import stint
import snappy
import spec/[crypto, datatypes, network, digest], ssz import spec/[crypto, datatypes, network, digest], ssz
const const
InspectorName* = "Beacon-Chain Network Inspector" InspectorName* = "Beacon-Chain Network Inspector"
InspectorMajor*: int = 0 InspectorMajor*: int = 0
InspectorMinor*: int = 0 InspectorMinor*: int = 0
InspectorPatch*: int = 2 InspectorPatch*: int = 3
InspectorVersion* = $InspectorMajor & "." & $InspectorMinor & "." & InspectorVersion* = $InspectorMajor & "." & $InspectorMinor & "." &
$InspectorPatch $InspectorPatch
InspectorIdent* = "Inspector/$1 ($2/$3)" % [InspectorVersion, InspectorIdent* = "Inspector/$1 ($2/$3)" % [InspectorVersion,
hostCPU, hostOS] hostCPU, hostOS]
InspectorCopyright* = "Copyright(C) 2019" & InspectorCopyright* = "Copyright(C) 2020" &
" Status Research & Development GmbH" " Status Research & Development GmbH"
InspectorHeader* = InspectorName & ", Version " & InspectorVersion & InspectorHeader* = InspectorName & ", Version " & InspectorVersion &
" [" & hostOS & ": " & hostCPU & "]\r\n" & " [" & hostOS & ": " & hostCPU & "]\r\n" &
InspectorCopyright & "\r\n" InspectorCopyright & "\r\n"
DISCV5BN* = mapAnd(UDP, mapEq("p2p"))
ETH2BN* = mapAnd(TCP, mapEq("p2p"))
type type
DiscoveryProtocol* = protocol.Protocol
ENRFieldPair* = object
eth2: seq[byte]
attnets: seq[byte]
ENRForkID* = object
fork_digest*: ForkDigest
next_fork_version*: Version
next_fork_epoch*: Epoch
TopicFilter* {.pure.} = enum TopicFilter* {.pure.} = enum
Blocks, Attestations, Exits, ProposerSlashing, AttesterSlashings Blocks, Attestations, Exits, ProposerSlashing, AttesterSlashings
BootstrapKind* {.pure.} = enum
Enr, MultiAddr
StartUpCommand* {.pure.} = enum StartUpCommand* {.pure.} = enum
noCommand noCommand
BootstrapAddress* = object
case kind*: BootstrapKind
of BootstrapKind.Enr:
addressRec: enr.Record
of BootstrapKind.MultiAddr:
addressMa: MultiAddress
InspectorConf* = object InspectorConf* = object
logLevel* {. logLevel* {.
defaultValue: LogLevel.TRACE defaultValue: LogLevel.TRACE
@ -58,6 +92,7 @@ type
name: "gossipsub" }: bool name: "gossipsub" }: bool
forkDigest* {. forkDigest* {.
defaultValue: "",
desc: "Sets the fork-digest value used to construct all topic names" desc: "Sets the fork-digest value used to construct all topic names"
name: "forkdigest"}: string name: "forkdigest"}: string
@ -97,24 +132,82 @@ type
abbr: "d" abbr: "d"
defaultValue: false }: bool defaultValue: false }: bool
func getTopics(forkDigest: ForkDigest, filter: TopicFilter): seq[string] {.inline.} = discoveryPort* {.
desc: "DiscoveryV5 UDP port number"
defaultValue: 9000 }: int
ethPort* {.
desc: "Ethereum2 TCP port number",
defaultValue: 9000 }: int
bindAddress* {.
desc: "Bind Discovery to MultiAddress",
defaultValue: "/ip4/0.0.0.0".}: string
maxPeers* {.
desc: "Maximum number of peers",
defaultValue: 100.}: int
noDiscovery* {.
desc: "Disable discovery",
defaultValue: false.}: bool
proc `==`*(a, b: ENRFieldPair): bool {.inline.} =
result = (a.eth2 == b.eth2)
proc shortLog*(a: PeerInfo): string =
for ma in a.addrs:
if TCP.match(ma):
return $ma & "/" & $a.peerId
for ma in a.addrs:
if UDP.match(ma):
return $ma & "/" & $a.peerId
result = $a
proc hasTCP(a: PeerInfo): bool =
for ma in a.addrs:
if TCP.match(ma):
return true
proc toNodeId(a: PeerID): Option[NodeId] =
var buffer: array[64, byte]
if a.hasPublicKey():
var pubkey: lcrypto.PublicKey
if extractPublicKey(a, pubkey):
if pubkey.scheme == PKScheme.Secp256k1:
let tmp = s.SkPublicKey(pubkey.skkey).toRaw()
copyMem(addr buffer[0], unsafeAddr tmp[1], 64)
result = some(readUintBE[256](keccak256.digest(buffer).data))
chronicles.formatIt PeerInfo: it.shortLog
chronicles.formatIt seq[PeerInfo]:
var res = newSeq[string]()
for item in it.items(): res.add(item.shortLog())
"[" & res.join(", ") & "]"
func getTopics(forkDigest: ForkDigest,
filter: TopicFilter): seq[string] {.inline.} =
case filter case filter
of TopicFilter.Blocks: of TopicFilter.Blocks:
@[getBeaconBlocksTopic(forkDigest)] let topic = getBeaconBlocksTopic(forkDigest)
of TopicFilter.Attestations: @[topic, topic & "_snappy"]
mapIt(0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64, getAttestationTopic(forkDigest, it))
of TopicFilter.Exits: of TopicFilter.Exits:
@[getVoluntaryExitsTopic(forkDigest)] let topic = getVoluntaryExitsTopic(forkDigest)
@[topic, topic & "_snappy"]
of TopicFilter.ProposerSlashing: of TopicFilter.ProposerSlashing:
@[getProposerSlashingsTopic(forkDigest)] let topic = getProposerSlashingsTopic(forkDigest)
@[topic, topic & "_snappy"]
of TopicFilter.AttesterSlashings: of TopicFilter.AttesterSlashings:
@[getAttesterSlashingsTopic(forkDigest)] let topic = getAttesterSlashingsTopic(forkDigest)
@[topic, topic & "_snappy"]
func getPeerId(peer: PeerID, conf: InspectorConf): string {.inline.} = of TopicFilter.Attestations:
if conf.fullPeerId: var topics = newSeq[string](ATTESTATION_SUBNET_COUNT * 2)
result = peer.pretty() var offset = 0
else: for i in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
result = $peer topics[offset] = getAttestationTopic(forkDigest, i)
topics[offset + 1] = getAttestationTopic(forkDigest, i) & "_snappy"
offset += 2
topics
proc loadBootFile(name: string): seq[string] = proc loadBootFile(name: string): seq[string] =
try: try:
@ -122,88 +215,479 @@ proc loadBootFile(name: string): seq[string] =
except: except:
discard discard
proc unpackYmlLine(line: string): string =
result = line
let stripped = line.strip()
var parts = stripped.split({'"'})
if len(parts) == 3:
if parts[0].startsWith("-") and len(parts[2]) == 0:
result = parts[1]
proc getBootstrapAddress(bootnode: string): Option[BootstrapAddress] =
var rec: enr.Record
try:
var stripped = bootnode.strip()
if stripped.startsWith("-"):
stripped = unpackYmlLine(stripped)
if len(stripped) > 0:
if stripped.startsWith("enr:"):
if fromURI(rec, EnrUri(stripped)):
let res = BootstrapAddress(kind: BootstrapKind.Enr, addressRec: rec)
return some(res)
else:
warn "Incorrect or empty ENR bootstrap address", address = stripped
else:
let ma = MultiAddress.init(stripped)
if ETH2BN.match(ma) or DISCV5BN.match(ma):
let res = BootstrapAddress(kind: BootstrapKind.MultiAddr,
addressMa: ma)
return some(res)
else:
warn "Incorrect MultiAddress bootstrap address", address = stripped
except CatchableError as exc:
warn "Incorrect bootstrap address", address = bootnode, errMsg = exc.msg
proc tryGetForkDigest(bootnode: enr.Record): Option[ForkDigest] =
var forkId: ENRForkID
var sszForkData = bootnode.tryGet("eth2", seq[byte])
if sszForkData.isSome():
try:
forkId = SSZ.decode(sszForkData.get(), ENRForkID)
result = some(forkId.fork_digest)
except CatchableError:
discard
proc tryGetFieldPairs(bootnode: enr.Record): Option[ENRFieldPair] =
var sszEth2 = bootnode.tryGet("eth2", seq[byte])
var sszAttnets = bootnode.tryGet("attnets", seq[byte])
if sszEth2.isSome() and sszAttnets.isSome():
result = some(ENRFieldPair(eth2: sszEth2.get(),
attnets: sszAttnets.get()))
proc tryGetForkDigest(hexdigest: string): Option[ForkDigest] =
var res: ForkDigest
if len(hexdigest) > 0:
try:
hexToByteArray(hexdigest, res)
result = some(res)
except CatchableError:
discard
proc tryGetMultiAddress(address: string): Option[MultiAddress] =
try:
let ma = MultiAddress.init(address)
if IP4.match(ma) or IP6.match(ma):
result = some(ma)
except MultiAddressError:
discard
proc loadBootstrapNodes(conf: InspectorConf): seq[BootstrapAddress] =
result = newSeq[BootstrapAddress]()
if len(conf.bootstrapFile) > 0:
info "Loading bootstrap nodes from file", filename = conf.bootstrapFile
var nodes = loadBootFile(conf.bootstrapFile)
for nodeString in nodes:
let res = getBootstrapAddress(nodeString)
if res.isSome():
result.add(res.get())
for nodeString in conf.bootstrapNodes:
let res = getBootstrapAddress(nodeString)
if res.isSome():
result.add(res.get())
proc init*(p: typedesc[PeerInfo],
maddr: MultiAddress): Option[PeerInfo] {.inline.} =
## Initialize PeerInfo using address which includes PeerID.
if IPFS.match(maddr):
let peerid = maddr[2].protoAddress()
result = some(PeerInfo.init(PeerID.init(peerid), [maddr[0] & maddr[1]]))
proc init*(p: typedesc[PeerInfo],
enraddr: enr.Record): Option[PeerInfo] =
var trec: enr.TypedRecord
try:
let trecOpt = enraddr.toTypedRecord()
if trecOpt.isSome():
trec = trecOpt.get()
if trec.secp256k1.isSome():
var skpubkey: SkPublicKey
if skpubkey.init(trec.secp256k1.get()):
let peerid = PeerID.init(PublicKey(scheme: Secp256k1,
skkey: skpubkey))
var mas = newSeq[MultiAddress]()
if trec.ip.isSome() and trec.tcp.isSome():
let ma = MultiAddress.init(multiCodec("ip4"), trec.ip.get()) &
MultiAddress.init(multiCodec("tcp"), trec.tcp.get())
mas.add(ma)
if trec.ip6.isSome() and trec.tcp6.isSome():
let ma = MultiAddress.init(multiCodec("ip6"), trec.ip6.get()) &
MultiAddress.init(multiCodec("tcp"), trec.tcp6.get())
mas.add(ma)
if trec.ip.isSome() and trec.udp.isSome():
let ma = MultiAddress.init(multiCodec("ip4"), trec.ip.get()) &
MultiAddress.init(multiCodec("udp"), trec.udp.get())
mas.add(ma)
if trec.ip6.isSome() and trec.udp6.isSome():
let ma = MultiAddress.init(multiCodec("ip6"), trec.ip6.get()) &
MultiAddress.init(multiCodec("udp"), trec.udp6.get())
mas.add(ma)
result = some(PeerInfo.init(peerid, mas))
except CatchableError as exc:
warn "Error", errMsg = exc.msg, record = enraddr.toUri()
proc connectToNetwork(switch: Switch, nodes: seq[PeerInfo],
timeout: Duration): Future[seq[PeerInfo]] {.async.} =
var pending = newSeq[Future[void]]()
var res = newSeq[PeerInfo]()
var timed, succeed, failed: int
for pinfo in nodes:
pending.add(switch.connect(pinfo))
debug "Connecting to peers", count = $len(pending), peers = nodes
if len(pending) > 0:
var timer = sleepAsync(timeout)
discard await one(timer, allFutures(pending))
for i in 0 ..< len(pending):
let fut = pending[i]
if fut.finished():
if fut.failed():
inc(failed)
warn "Unable to connect to node", address = nodes[i],
errMsg = fut.readError().msg
else:
inc(succeed)
info "Connected to node", address = nodes[i]
res.add(nodes[i])
else:
inc(timed)
fut.cancel()
warn "Connection to node timed out", address = nodes[i]
debug "Connection statistics", succeed = succeed, failed = failed,
timeout = timed, count = $len(pending)
result = res
proc connectLoop*(switch: Switch,
peerQueue: AsyncQueue[PeerInfo],
peerTable: TableRef[PeerID, PeerInfo],
timeout: Duration): Future[void] {.async.} =
var addresses = newSeq[PeerInfo]()
trace "Starting connection loop", queue_size = len(peerQueue),
table_size = len(peerTable),
timeout = timeout
while true:
if len(addresses) > 0:
addresses.setLen(0)
let ma = await peerQueue.popFirst()
addresses.add(ma)
while not(peerQueue.empty()):
addresses.add(peerQueue.popFirstNoWait())
trace "Got new peers", count = len(addresses)
var infos = await switch.connectToNetwork(addresses, timeout)
for item in infos:
peerTable[item.peerId] = item
proc toIpAddress*(ma: MultiAddress): Option[IpAddress] =
if IP4.match(ma):
let address = ma.protoAddress()
result = some(IpAddress(family: IpAddressFamily.IPv4,
address_v4: toArray(4, address)))
elif IP6.match(ma):
let address = ma.protoAddress()
result = some(IpAddress(family: IpAddressFamily.IPv6,
address_v6: toArray(16, address)))
proc bootstrapDiscovery(conf: InspectorConf,
host: MultiAddress,
privkey: lcrypto.PrivateKey,
bootnodes: seq[enr.Record],
enrFields: Option[ENRFieldPair]): DiscoveryProtocol =
var pk = ethkeys.PrivateKey.fromRaw(privkey.getBytes()).tryGet()
var db = DiscoveryDB.init(newMemoryDB())
let udpPort = Port(conf.discoveryPort)
let tcpPort = Port(conf.ethPort)
let host = host.toIpAddress()
var pairs: seq[FieldPair]
if enrFields.isSome():
let fields = enrFields.get()
pairs = @[toFieldPair("eth2", fields.eth2),
toFieldPair("attnets", fields.attnets)]
else:
pairs = @[]
result = newProtocol(pk, db, host, tcpPort, udpPort, pairs, bootnodes)
result.open()
result.start()
proc logEnrAddress(address: string) =
var
rec: enr.Record
trec: enr.TypedRecord
eth2fork_digest, eth2next_fork_version, eth2next_fork_epoch: string
attnets: string
if fromURI(rec, EnrUri(address)):
var eth2Data = rec.tryGet("eth2", seq[byte])
var attnData = rec.tryGet("attnets", seq[byte])
var optrec = rec.toTypedRecord()
if optrec.isSome():
trec = optrec.get()
if eth2Data.isSome():
try:
var forkid = SSZ.decode(eth2Data.get(), ENRForkID)
eth2fork_digest = bu.toHex(forkid.fork_digest)
eth2next_fork_version = bu.toHex(forkid.next_fork_version)
eth2next_fork_epoch = strutils.toHex(cast[uint64](forkid.next_fork_epoch))
except CatchableError:
eth2fork_digest = "Error"
eth2next_fork_version = "Error"
eth2next_fork_epoch = "Error"
else:
eth2fork_digest = "None"
eth2next_fork_version = "None"
eth2next_fork_epoch = "None"
if attnData.isSome():
var attn = SSZ.decode(attnData.get(), seq[byte])
attnets = bu.toHex(attn)
else:
attnets = "None"
info "ENR bootstrap address fileds",
enr_uri = address,
enr_id = trec.id,
secp256k1 = if trec.secp256k1.isSome():
bu.toHex(trec.secp256k1.get())
else:
"None",
ip4 = if trec.ip.isSome():
$MultiAddress.init(multiCodec("ip4"), trec.ip.get())
else:
"None",
ip6 = if trec.ip6.isSome():
$MultiAddress.init(multiCodec("ip6"), trec.ip6.get())
else:
"None",
tcp = if trec.tcp.isSome(): $trec.tcp.get() else: "None",
udp = if trec.udp.isSome(): $trec.udp.get() else: "None",
tcp6 = if trec.tcp6.isSome(): $trec.tcp6.get() else: "None",
udp6 = if trec.udp6.isSome(): $trec.udp6.get() else: "None",
eth2_fork_digest = eth2fork_digest,
eth2_next_fork_version = eth2next_fork_version,
eth2_next_fork_epoch = eth2next_fork_epoch,
eth2_attnets = attnets
else:
info "ENR bootstrap address is wrong or incomplete", enr_uri = address
else:
info "ENR bootstrap address is wrong or incomplete", enr_uri = address
proc init*(p: typedesc[PeerInfo],
enruri: EnrUri): Option[PeerInfo] {.inline.} =
var rec: enr.Record
if fromURI(rec, enruri):
logEnrAddress(rec.toUri())
result = PeerInfo.init(rec)
proc pubsubLogger(conf: InspectorConf, switch: Switch,
resolveQueue: AsyncQueue[PeerID], topic: string,
data: seq[byte]): Future[void] {.async, gcsafe.} =
info "Received pubsub message", size = len(data),
topic = topic,
message = bu.toHex(data)
var buffer: seq[byte]
if conf.decode:
if topic.endsWith("_snappy"):
try:
buffer = snappy.decode(data)
except CatchableError as exc:
warn "Unable to decompress message", errMsg = exc.msg
else:
buffer = data
try:
if topic.endsWith(topicBeaconBlocksSuffix) or
topic.endsWith(topicBeaconBlocksSuffix & "_snappy"):
info "SignedBeaconBlock", msg = SSZ.decode(buffer, SignedBeaconBlock)
elif topic.endsWith(topicAttestationsSuffix) or
topic.endsWith(topicAttestationsSuffix & "_snappy"):
info "Attestation", msg = SSZ.decode(buffer, Attestation)
elif topic.endsWith(topicVoluntaryExitsSuffix) or
topic.endsWith(topicVoluntaryExitsSuffix & "_snappy"):
info "SignedVoluntaryExit", msg = SSZ.decode(buffer,
SignedVoluntaryExit)
elif topic.endsWith(topicProposerSlashingsSuffix) or
topic.endsWith(topicProposerSlashingsSuffix & "_snappy"):
info "ProposerSlashing", msg = SSZ.decode(buffer, ProposerSlashing)
elif topic.endsWith(topicAttesterSlashingsSuffix) or
topic.endsWith(topicAttesterSlashingsSuffix & "_snappy"):
info "AttesterSlashing", msg = SSZ.decode(buffer, AttesterSlashing)
elif topic.endsWith(topicAggregateAndProofsSuffix) or
topic.endsWith(topicAggregateAndProofsSuffix & "_snappy"):
info "AggregateAndProof", msg = SSZ.decode(buffer, AggregateAndProof)
except CatchableError as exc:
info "Unable to decode message", errMsg = exc.msg
proc resolveLoop(conf: InspectorConf,
discovery: DiscoveryProtocol,
switch: Switch,
peerQueue: AsyncQueue[PeerID],
peers: TableRef[PeerID, PeerInfo]) {.async.} =
debug "Starting resolution loop"
while true:
let peerId = await peerQueue.popFirst()
let idOpt = peerId.toNodeId()
if idOpt.isSome():
try:
let nodeOpt = await discovery.resolve(idOpt.get())
if nodeOpt.isSome():
let peerOpt = PeerInfo.init(nodeOpt.get().record)
if peerOpt.isSome():
let peer = peerOpt.get()
trace "Peer resolved", peer_id = peerId,
node_id = idOpt.get(),
peer_info = peer
peers[peerId] = peer
else:
warn "Peer's record is invalid", peer_id = peerId,
node_id = idOpt.get(),
peer_record = nodeOpt.get().record
else:
trace "Node resolution returns empty answer", peer_id = peerId,
node_id = idOpt.get()
except CatchableError as exc:
warn "Node address resolution failed", errMsg = exc.msg,
peer_id = peerId,
node_id = idOpt.get()
proc discoveryLoop(conf: InspectorConf,
discovery: DiscoveryProtocol,
switch: Switch,
connQueue: AsyncQueue[PeerInfo],
peers: TableRef[PeerID, PeerInfo]) {.async.} =
debug "Starting discovery loop"
let wantedPeers = conf.maxPeers
while true:
try:
let discoveredPeers = discovery.randomNodes(wantedPeers - len(peers))
for peer in discoveredPeers:
let pinfoOpt = PeerInfo.init(peer.record)
if pinfoOpt.isSome():
let pinfo = pinfoOpt.get()
if pinfo.hasTCP():
if pinfo.id() notin switch.connections:
debug "Discovered new peer", peer = pinfo,
peers_count = len(peers)
await connQueue.addLast(pinfo)
else:
debug "Found discovery only peer", peer = pinfo
except CatchableError as exc:
debug "Error in discovery", errMsg = exc.msg
await sleepAsync(1.seconds)
proc run(conf: InspectorConf) {.async.} = proc run(conf: InspectorConf) {.async.} =
var var
bootnodes: seq[string]
api: DaemonApi
identity: PeerInfo
pubsubPeers: Table[PeerID, PeerInfo]
peerQueue: AsyncQueue[PeerID]
subs: seq[tuple[ticket: PubsubTicket, future: Future[void]]]
topics: set[TopicFilter] = {} topics: set[TopicFilter] = {}
forkDigest: Option[ForkDigest]
enrFields: Option[ENRFieldPair]
pubsubPeers = initTable[PeerID, PeerInfo]() var pubsubPeers = newTable[PeerID, PeerInfo]()
peerQueue = newAsyncQueue[PeerID]() var resolveQueue = newAsyncQueue[PeerID](10)
var connectQueue = newAsyncQueue[PeerInfo](10)
proc dumpPeers(api: DaemonAPI) {.async.} = let bootnodes = loadBootstrapNodes(conf)
while true: if len(bootnodes) == 0:
var peers = await api.listPeers() error "Not enough bootnodes to establish connection with network"
info "Connected peers information", peers_connected = len(peers) quit(1)
for item in peers:
info "Connected peer", peer = getPeerId(item.peer, conf),
addresses = item.addresses
for key, value in pubsubPeers.pairs():
info "Pubsub peer", peer = getPeerId(value.peer, conf),
addresses = value.addresses
await sleepAsync(10.seconds)
proc resolvePeers(api: DaemonAPI) {.async.} = var eth2bootnodes = newSeq[PeerInfo]()
var counter = 0 var disc5bootnodes = newSeq[enr.Record]()
while true:
var peer = await peerQueue.popFirst()
var info = await api.dhtFindPeer(peer)
inc(counter)
info "Peer resolved", peer = getPeerId(peer, conf),
addresses = info.addresses, count = counter
pubsubPeers[peer] = info
proc pubsubLogger(api: DaemonAPI, for item in bootnodes:
ticket: PubsubTicket, if item.kind == BootstrapKind.Enr:
message: PubSubMessage): Future[bool] {.async.} = logEnrAddress(item.addressRec.toUri())
# We must return ``false`` only if we are not going to continue monitoring
# of specific topic.
var sig = if len(message.signature.data) > 0:
$message.signature
else:
"<no signature>"
var key = if len(message.signature.data) > 0:
$message.key
else:
"<no public key>"
var pinfo = pubsubPeers.getOrDefault(message.peer) let pinfoOpt = PeerInfo.init(item.addressRec)
if len(pinfo.peer) == 0: if pinfoOpt.isSome():
pubsubPeers[message.peer] = PeerInfo(peer: message.peer) let pinfo = pinfoOpt.get()
peerQueue.addLastNoWait(message.peer) for ma in pinfo.addrs:
if TCP.match(ma):
eth2bootnodes.add(pinfo)
break
for ma in pinfo.addrs:
if UDP.match(ma):
disc5bootnodes.add(item.addressRec)
break
info "Received message", peerID = getPeerId(message.peer, conf), let forkOpt = tryGetForkDigest(item.addressRec)
size = len(message.data), if forkOpt.isSome():
topic = ticket.topic, if forkDigest.isSome():
seqno = bu.toHex(message.seqno), if forkDigest.get() != forkOpt.get():
signature = sig, warn "Bootstrap node address has different forkDigest",
pubkey = key, address = item.addressRec.toUri(),
mtopics = $message.topics, address_fork_digest = bu.toHex(forkOpt.get()),
message = bu.toHex(message.data), stored_fork_digest = bu.toHex(forkDigest.get())
zpeers = len(pubsubPeers) else:
forkDigest = forkOpt
if conf.decode: let enrFieldsOpt = tryGetFieldPairs(item.addressRec)
try: if enrFieldsOpt.isSome():
if ticket.topic.endsWith(topicBeaconBlocksSuffix): if enrFields.isSome():
info "SignedBeaconBlock", msg = SSZ.decode(message.data, SignedBeaconBlock) if enrFields.get() != enrFieldsOpt.get():
elif ticket.topic.endsWith(topicAttestationsSuffix): warn "Bootstrap node address has different eth2 values",
info "Attestation", msg = SSZ.decode(message.data, Attestation) address = item.addressRec.toUri(),
elif ticket.topic.endsWith(topicVoluntaryExitsSuffix): eth2_field_stored = bu.toHex(enrFields.get().eth2),
info "SignedVoluntaryExit", msg = SSZ.decode(message.data, SignedVoluntaryExit) eth2_field_address = bu.toHex(enrFieldsOpt.get().eth2)
elif ticket.topic.endsWith(topicProposerSlashingsSuffix): else:
info "ProposerSlashing", msg = SSZ.decode(message.data, ProposerSlashing) enrFields = enrFieldsOpt
elif ticket.topic.endsWith(topicAttesterSlashingsSuffix):
info "AttesterSlashing", msg = SSZ.decode(message.data, AttesterSlashing)
elif ticket.topic.endsWith(topicAggregateAndProofsSuffix):
info "AggregateAndProof", msg = SSZ.decode(message.data, AggregateAndProof)
except CatchableError as exc:
info "Unable to decode message", msg = exc.msg
result = true elif item.kind == BootstrapKind.MultiAddr:
if ETH2BN.match(item.addressMa):
eth2bootnodes.add(PeerInfo.init(item.addressMa).get())
if len(eth2bootnodes) == 0:
error "Not enough Ethereum2 bootnodes to establish connection with network"
quit(1)
if len(disc5bootnodes) == 0:
warn "Not enough DiscoveryV5 bootnodes, discovery will be disabled"
var argForkDigest = tryGetForkDigest(conf.forkDigest)
if forkDigest.isNone():
if argForkDigest.isNone():
error "forkDigest argument and bootstrap forkDigest are missing"
quit(1)
else:
forkDigest = argForkDigest
else:
if argForkDigest.isSome():
if forkDigest.isSome() != argForkDigest.isSome():
warn "forkDigest argument value is different, using argument value",
argument_fork_digest = toHex(argForkDigest.get()),
bootstrap_fork_digest = toHex(forkDigest.get())
forkDigest = argForkDigest
let seckey = lcrypto.PrivateKey.random(PKScheme.Secp256k1)
# let pubkey = seckey.getKey()
let hostAddress = tryGetMultiAddress(conf.bindAddress)
if hostAddress.isNone():
error "Bind address is incorrect MultiAddress", address = conf.bindAddress
quit(1)
var switch = newStandardSwitch(some(seckey), hostAddress.get(),
triggerSelf = true, gossip = true,
sign = false, verifySignature = false)
if len(conf.topics) > 0: if len(conf.topics) > 0:
for item in conf.topics: for item in conf.topics:
@ -231,94 +715,50 @@ proc run(conf: InspectorConf) {.async.} =
TopicFilter.Exits, TopicFilter.ProposerSlashing, TopicFilter.Exits, TopicFilter.ProposerSlashing,
TopicFilter.AttesterSlashings}) TopicFilter.AttesterSlashings})
if len(conf.bootstrapFile) > 0: proc pubsubTrampoline(topic: string,
info "Loading bootstrap nodes from file", filename = conf.bootstrapFile data: seq[byte]): Future[void] {.gcsafe.} =
var nodes = loadBootFile(conf.bootstrapFile) result = pubsubLogger(conf, switch, resolveQueue, topic, data)
for nodeString in nodes:
try:
var ma = MultiAddress.init(nodeString)
if not(IPFS.match(ma)):
warn "Incorrect bootnode address", address = nodeString
else:
bootnodes.add($ma)
except:
warn "Bootnode address is not valid MultiAddress", address = nodeString
for nodeString in conf.bootstrapNodes: discard switch.start()
try:
var ma = MultiAddress.init(nodeString)
if not(IPFS.match(ma)):
warn "Incorrect bootnode address", address = nodeString
else:
bootnodes.add($ma)
except:
warn "Bootnode address is not valid MultiAddress", address = nodeString
if len(bootnodes) == 0:
error "Not enough bootnodes to establish connection with network"
quit(1)
info InspectorIdent & " starting", bootnodes = bootnodes,
topic_filters = topics
var flags = {DHTClient, PSNoSign, WaitBootstrap}
if conf.signFlag:
flags.excl(PSNoSign)
if conf.gossipSub:
flags.incl(PSGossipSub)
else:
flags.incl(PSFloodSub)
try:
api = await newDaemonApi(flags, bootstrapNodes = bootnodes,
peersRequired = 1)
identity = await api.identity()
info InspectorIdent & " started", peerID = getPeerId(identity.peer, conf),
bound = identity.addresses,
options = flags
except CatchableError as e:
error "Could not initialize p2pd daemon",
exception = e.msg
quit(1)
var forkDigest: ForkDigest
hexToByteArray(conf.forkDigest, forkDigest)
var topicFilters = newSeq[string]()
try: try:
for filter in topics: for filter in topics:
for topic in getTopics(forkDigest, filter): for topic in getTopics(forkDigest.get(), filter):
let t = await api.pubsubSubscribe(topic, pubsubLogger) await switch.subscribe(topic, pubsubTrampoline)
info "Subscribed to topic", topic = topic topicFilters.add(topic)
subs.add((ticket: t, future: t.transp.join())) trace "Subscribed to topic", topic = topic
for filter in conf.customTopics: for filter in conf.customTopics:
let t = await api.pubsubSubscribe(filter, pubsubLogger) await switch.subscribe(filter, pubsubTrampoline)
info "Subscribed to custom topic", topic = filter topicFilters.add(filter)
subs.add((ticket: t, future: t.transp.join())) trace "Subscribed to custom topic", topic = filter
except CatchableError as e: except CatchableError as exc:
error "Could not subscribe to topics", exception = e.msg error "Could not subscribe to topics", errMsg = exc.msg
quit(1) quit(1)
# Starting DHT resolver task info InspectorIdent & " starting", topic_filters = topicFilters,
asyncCheck resolvePeers(api) eth2_bootnodes = eth2bootnodes,
# Starting peer dumper task disc5_bootnodes = disc5bootnodes
asyncCheck dumpPeers(api)
var futures = newSeq[Future[void]]() asyncCheck connectLoop(switch, connectQueue,
var delindex = 0 pubsubPeers, 10.seconds)
while true:
if len(subs) == 0: for node in eth2bootnodes:
break await connectQueue.addLast(node)
futures.setLen(0)
for item in subs: if len(disc5bootnodes) > 0:
futures.add(item.future) var proto = bootstrapDiscovery(conf, hostAddress.get(), seckey,
var fut = await one(futures) disc5bootnodes, enrFields)
for i in 0 ..< len(subs): if not(conf.noDiscovery):
if subs[i].future == fut: asyncCheck discoveryLoop(conf, proto, switch, connectQueue,
delindex = i pubsubPeers)
break
error "Subscription lost", topic = subs[delindex].ticket.topic asyncCheck resolveLoop(conf, proto, switch, resolveQueue,
subs.delete(delindex) pubsubPeers)
# We are not going to exit from this procedure
var emptyFut = newFuture[void]()
await emptyFut
when isMainModule: when isMainModule:
echo InspectorHeader echo InspectorHeader