Table of Contents
Execution flow of networking-related code in a beacon_node instance using the Nim libp2p implementation on Linux.
beacon_chain/beacon_node.nim:
-
BeaconNode.start()
- waitsFor node.connectToNetwork()
-
BeaconNode.connectToNetwork()
- awaits node.network.connectToNetwork(node.bootstrapNodes, node.bootstrapEnrs)
beacon_chain/eth2_network.nim:
- Eth2Node.connectToNetwork(seq[ENode], seq[enr.Record])
- for each bootstrap ENR, calls Eth2Node.addKnownPeer()
- awaits Eth2Node.start()
- waits 30 seconds and checks if there was at least one successful connection to a bootstrap node
beacon_chain/libp2p_backend.nim:
-
Eth2Node.addKnownPeer()
- calls eth.p2p.discoveryv5.Protocol.addNode(enr.Record)
-
Eth2Node.start()
- calls Eth2DiscoveryProtocol.open()
- calls Eth2DiscoveryProtocol.start() - currently nop
- awaits Switch.start() and stores the resulting seq[Future[void]] in Eth2Node.libp2pTransportLoops; nothing further is done with those futures
- tracesAsyncErrors Eth2Node.runDiscoveryLoop()
-
Eth2Node.runDiscoveryLoop()
- in an endless loop, while more peers are wanted, awaits eth.p2p.discoveryv5.Protocol.lookupRandom()
- for each discovered peer, awaits Eth2Node.dialPeer(PeerInfo)
-
Eth2Node.dialPeer(PeerInfo)
- awaits Switch.connect(PeerInfo)
- awaits Peer.initializeConnection()
beacon_chain/libp2p_backends_common.nim:
-
Peer.initializeConnection()
- calls Peer.performProtocolHandshakes()
-
Peer.performProtocolHandshakes()
- for each ProtocolInfo instance in a global var, calls its
handshake
member which is a HandshakeStep proc
- for each ProtocolInfo instance in a global var, calls its
discovery
vendor/nim-eth/eth/p2p/discoveryv5/protocol.nim:
-
Protocol.addNode(enr.Record)
- calls RoutingTable.addNode(Node)
-
Protocol.lookupRandom()
- calls Protocol.lookup(NodeId) on a random UInt256
-
Protocol.lookup(NodeId)
- calls RoutingTable.neighbours()
- calls Protocol.lookupWorker() in a busy loop
switch
vendor/nim-libp2p/libp2p/switch.nim:
-
Switch.start()
- for each Switch.transports and each Switch.peerInfo awaits Transport.listen(MultiAddress, Switch.start.handle)
- (returns a seq of these futures)
- awaits PubSub.start() on Switch.pubSub
-
Switch.start.handle(Connection)
- awaits Switch.upgradeIncoming(Connection)
- awaits Connection.close()
- awaits Switch.cleanupConn(Connection)
-
Switch.upgradeIncoming(Connection)
- creates a local MultistreamSelect instance
- awaits MultistreamSelect.select(Connection)
- for each protocol string key in Switch.secureManagers, calls MultistreamSelect.addHandler(proto: string, Switch.upgradeIncoming.securedHandler)
- awaits MultistreamSelect.handle(Connection)
-
Switch.upgradeIncoming.securedHandler(Connection, proto: string)
- awaits Secure.secure(Connection) and saves the result in
sconn
- this seems to be the same asconn
- for each MuxerProvider value in Switch.muxers, calls MultistreamSelect.addHandler(MuxerProvider.codec, MuxerProvider)
- awaits MultistreamSelect.handle(sconn: Connection)
- awaits Secure.secure(Connection) and saves the result in
-
Switch.connect(PeerInfo)
- awaits Switch.internalConnect(PeerInfo)
-
Switch.internalConnect(PeerInfo)
- if there is no Connection with the peer in Switch.connections or the existing one is closed, for each Transport in Switch.transports, for each MultiAddress in PeerInfo.addrs, awaits Transport.dial(MultiAddress), then awaits Switch.upgradeOutgoing(Connection)
- awaits Switch.subscribeToPeer(PeerInfo)
-
Switch.upgradeOutgoing(Connection)
- awaits Switch.mux(Connection)
-
Switch.mux(Connection)
- awaits MultistreamSelect.select(Connection, muxers: seq[string])
- calls MuxerProvider.newMuxer (a MuxerConstructor proc) and puts the resulting Muxer object in Switch.muxed
- sets MuxerProvider.streamHandler to Switch.streamHandler
- calls Muxer.newStream() to get a new Connection stream used only for identification
- awaits Switch.indentify(Connection) on the new stream
- awaits Connection.close() on that same stream
-
Switch.indentify(Connection)
- awaits Identify.indentify(Connection, PeerInfo)
-
Switch.subscribeToPeer(PeerInfo)
- awaits PubSub.subscribeToPeer(Connection)
identify
vendor/nim-libp2p/libp2p/protocols/identify.nim:
-
Identify.indentify(Connection, PeerInfo)
- awaits Connection.readLp()
- calls decodeMsg() on the read message and returns the IdentifyInfo
-
decodeMsg(buf: seq[byte]): IdentifyInfo
- calls initProtoBuffer(buf)
- calls ProtoBuffer.getValue(), ProtoBuffer.getBytes() and ProtoBuffer.getString() to decode the message
multistream
vendor/nim-libp2p/libp2p/multistream.nim:
-
MultistreamSelect.select(Connection, proto: string)
- calls MultistreamSelect.select(Connection, @[proto])
-
MultistreamSelect.select(Connection, proto: seq[string])
- awaits Connection.write(codec: string)
- if there's a
proto
argument, it awaits Connection.writeLp() - only on the first seq item - awaits Connection.readLp() (without the trailing newline; this header is ignored)
- if there's a
proto
, awaits Connection.readLp() - if it's the same string as the requested protocol, it returns it - if the response was "\n", it tries any other proto in that seq
-
MultistreamSelect.handle(Connection)
- in a loop: awaits Connection.readLp() for a proto string and if it matches with a MultistreamSelect.handlers instance, it calls HandleHolder.protocol.handler(Connection, proto) - this handler is an LPProtoHandler proc type and it's Switch.upgradeIncoming.securedHandler() or a MuxerProvider.handler() from Switch.muxers which is usually MuxerProvider.init.handler() that does nothing
pubsub
vendor/nim-libp2p/libp2p/protocols/pubsub/pubsub.nim:
-
PubSub.subscribeToPeer(Connection)
- calls PubSub.getPeer(Connection.peerInfo, PubSub.codec) to get a PubSubPeer and sets its "conn" attribute to the Connection param
- registers a Connection.closeEvent callback
-
PubSub.getPeer(PeerInfo, codec: string)
- calls newPubSubPeer(PeerInfo, codec) and caches the result in the PubSub object