19 networking flow
Ștefan Talpalaru edited this page 2020-03-07 23:10:34 +01:00

Execution flow of networking-related code in a beacon_node instance using the Nim libp2p implementation on Linux.


beacon_chain/beacon_node.nim:

  • BeaconNode.start()

    • waitsFor node.connectToNetwork()
  • BeaconNode.connectToNetwork()

    • awaits node.network.connectToNetwork(node.bootstrapNodes, node.bootstrapEnrs)

beacon_chain/eth2_network.nim:

  • Eth2Node.connectToNetwork(seq[ENode], seq[enr.Record])
    • for each bootstrap ENR, calls Eth2Node.addKnownPeer()
    • awaits Eth2Node.start()
    • waits 30 seconds and checks if there was at least one successful connection to a bootstrap node

beacon_chain/libp2p_backend.nim:

  • Eth2Node.addKnownPeer()

    • calls eth.p2p.discoveryv5.Protocol.addNode(enr.Record)
  • Eth2Node.start()

    • calls Eth2DiscoveryProtocol.open()
    • calls Eth2DiscoveryProtocol.start() - currently nop
    • awaits Switch.start() and stores the resulting seq[Future[void]] in Eth2Node.libp2pTransportLoops; nothing further is done with those futures
    • tracesAsyncErrors Eth2Node.runDiscoveryLoop()
  • Eth2Node.runDiscoveryLoop()

    • in an endless loop, while more peers are wanted, awaits eth.p2p.discoveryv5.Protocol.lookupRandom()
    • for each discovered peer, awaits Eth2Node.dialPeer(PeerInfo)
  • Eth2Node.dialPeer(PeerInfo)

    • awaits Switch.connect(PeerInfo)
    • awaits Peer.initializeConnection()

beacon_chain/libp2p_backends_common.nim:

  • Peer.initializeConnection()

    • calls Peer.performProtocolHandshakes()
  • Peer.performProtocolHandshakes()

    • for each ProtocolInfo instance in a global var, calls its handshake member which is a HandshakeStep proc

discovery

vendor/nim-eth/eth/p2p/discoveryv5/protocol.nim:

  • Protocol.addNode(enr.Record)

    • calls RoutingTable.addNode(Node)
  • Protocol.lookupRandom()

    • calls Protocol.lookup(NodeId) on a random UInt256
  • Protocol.lookup(NodeId)

    • calls RoutingTable.neighbours()
    • calls Protocol.lookupWorker() in a busy loop

switch

vendor/nim-libp2p/libp2p/switch.nim:

  • Switch.start()

    • for each Switch.transports and each Switch.peerInfo awaits Transport.listen(MultiAddress, Switch.start.handle)
    • (returns a seq of these futures)
    • awaits PubSub.start() on Switch.pubSub
  • Switch.start.handle(Connection)

    • awaits Switch.upgradeIncoming(Connection)
    • awaits Connection.close()
    • awaits Switch.cleanupConn(Connection)
  • Switch.upgradeIncoming(Connection)

    • creates a local MultistreamSelect instance
    • awaits MultistreamSelect.select(Connection)
    • for each protocol string key in Switch.secureManagers, calls MultistreamSelect.addHandler(proto: string, Switch.upgradeIncoming.securedHandler)
    • awaits MultistreamSelect.handle(Connection)
  • Switch.upgradeIncoming.securedHandler(Connection, proto: string)

    • awaits Secure.secure(Connection) and saves the result in sconn - this seems to be the same as conn
    • for each MuxerProvider value in Switch.muxers, calls MultistreamSelect.addHandler(MuxerProvider.codec, MuxerProvider)
    • awaits MultistreamSelect.handle(sconn: Connection)
  • Switch.connect(PeerInfo)

    • awaits Switch.internalConnect(PeerInfo)
  • Switch.internalConnect(PeerInfo)

    • if there is no Connection with the peer in Switch.connections or the existing one is closed, for each Transport in Switch.transports, for each MultiAddress in PeerInfo.addrs, awaits Transport.dial(MultiAddress), then awaits Switch.upgradeOutgoing(Connection)
    • awaits Switch.subscribeToPeer(PeerInfo)
  • Switch.upgradeOutgoing(Connection)

    • awaits Switch.mux(Connection)
  • Switch.mux(Connection)

    • awaits MultistreamSelect.select(Connection, muxers: seq[string])
    • calls MuxerProvider.newMuxer (a MuxerConstructor proc) and puts the resulting Muxer object in Switch.muxed
    • sets MuxerProvider.streamHandler to Switch.streamHandler
    • calls Muxer.newStream() to get a new Connection stream used only for identification
    • awaits Switch.indentify(Connection) on the new stream
    • awaits Connection.close() on that same stream
  • Switch.indentify(Connection)

    • awaits Identify.indentify(Connection, PeerInfo)
  • Switch.subscribeToPeer(PeerInfo)

    • awaits PubSub.subscribeToPeer(Connection)

identify

vendor/nim-libp2p/libp2p/protocols/identify.nim:

  • Identify.indentify(Connection, PeerInfo)

    • awaits Connection.readLp()
    • calls decodeMsg() on the read message and returns the IdentifyInfo
  • decodeMsg(buf: seq[byte]): IdentifyInfo

    • calls initProtoBuffer(buf)
    • calls ProtoBuffer.getValue(), ProtoBuffer.getBytes() and ProtoBuffer.getString() to decode the message

multistream

vendor/nim-libp2p/libp2p/multistream.nim:

  • MultistreamSelect.select(Connection, proto: string)

    • calls MultistreamSelect.select(Connection, @[proto])
  • MultistreamSelect.select(Connection, proto: seq[string])

    • awaits Connection.write(codec: string)
    • if there's a proto argument, it awaits Connection.writeLp() - only on the first seq item
    • awaits Connection.readLp() (without the trailing newline; this header is ignored)
    • if there's a proto, awaits Connection.readLp() - if it's the same string as the requested protocol, it returns it
    • if the response was "\n", it tries any other proto in that seq
  • MultistreamSelect.handle(Connection)

    • in a loop: awaits Connection.readLp() for a proto string and if it matches with a MultistreamSelect.handlers instance, it calls HandleHolder.protocol.handler(Connection, proto) - this handler is an LPProtoHandler proc type and it's Switch.upgradeIncoming.securedHandler() or a MuxerProvider.handler() from Switch.muxers which is usually MuxerProvider.init.handler() that does nothing

pubsub

vendor/nim-libp2p/libp2p/protocols/pubsub/pubsub.nim:

  • PubSub.subscribeToPeer(Connection)

    • calls PubSub.getPeer(Connection.peerInfo, PubSub.codec) to get a PubSubPeer and sets its "conn" attribute to the Connection param
    • registers a Connection.closeEvent callback
  • PubSub.getPeer(PeerInfo, codec: string)

    • calls newPubSubPeer(PeerInfo, codec) and caches the result in the PubSub object