nimbus-eth2/beacon_chain/libp2p_backend.nim

417 lines
14 KiB
Nim

import
algorithm, typetraits, net as stdNet,
stew/[varints,base58], stew/shims/[macros, tables], chronos, chronicles,
stint, faststreams/output_stream, serialization,
json_serialization/std/[net, options],
eth/[keys, async_utils], eth/p2p/[enode, p2p_protocol_dsl],
eth/p2p/discoveryv5/[enr, node],
# TODO: create simpler to use libp2p modules that use re-exports
libp2p/[switch, multistream, connection,
multiaddress, peerinfo, peer,
crypto/crypto, protocols/identify, protocols/protocol],
libp2p/muxers/mplex/[mplex, types],
libp2p/protocols/secure/[secure, secio],
libp2p/protocols/pubsub/[pubsub, floodsub],
libp2p/transports/[transport, tcptransport],
libp2p_json_serialization, eth2_discovery, conf, ssz,
peer_pool
import
eth/p2p/discoveryv5/protocol as discv5_protocol
export
p2pProtocol, libp2p_json_serialization, ssz
type
P2PStream = Connection
# TODO Is this really needed?
Eth2Node* = ref object of RootObj
switch*: Switch
discovery*: Eth2DiscoveryProtocol
wantedPeers*: int
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
libp2pTransportLoops*: seq[Future[void]]
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
Peer* = ref object
network*: Eth2Node
info*: PeerInfo
wasDialed*: bool
discoveryId*: Eth2DiscoveryId
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
score*: int
ConnectionState* = enum
None,
Connecting,
Connected,
Disconnecting,
Disconnected
UntypedResponder = object
peer*: Peer
stream*: P2PStream
Responder*[MsgType] = distinct UntypedResponder
MessageInfo* = object
name*: string
# Private fields:
libp2pCodecName: string
protocolMounter*: MounterProc
printer*: MessageContentPrinter
nextMsgResolver*: NextMsgResolver
ProtocolInfoObj* = object
name*: string
messages*: seq[MessageInfo]
index*: int # the position of the protocol in the
# ordered list of supported protocols
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler
ProtocolInfo* = ptr ProtocolInfoObj
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
DisconnectionReason* = enum
ClientShutDown
IrrelevantNetwork
FaultOrError
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
const
TCP = net.Protocol.IPPROTO_TCP
template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it
# TODO: This exists only as a compatibility layer between the daemon
# APIs and the native LibP2P ones. It won't be necessary once the
# daemon is removed.
#
template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped =
write(stream, bytes)
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
dial(node.switch, peer.info, protocolId)
proc peer(stream: P2PStream): PeerID =
# TODO: Can this be `nil`?
stream.peerInfo.peerId
#
# End of compatibility layer
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
let peerId = peerInfo.peerId
result = node.peerPool.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerInfo)
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
# TODO: Can this be `nil`?
return network.getPeer(stream.peerInfo)
proc getKey*(peer: Peer): PeerID {.inline.} =
result = peer.info.peerId
proc getFuture*(peer: Peer): Future[void] {.inline.} =
result = peer.info.lifeFuture()
proc `<`*(a, b: Peer): bool =
result = `<`(a.score, b.score)
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
await peer.network.switch.disconnect(peer.info)
peer.connectionState = Disconnected
peer.network.peerPool.release(peer)
peer.info.close()
proc safeClose(stream: P2PStream) {.async.} =
if not stream.closed:
await close(stream)
proc handleIncomingPeer*(peer: Peer)
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
debug "Peer (outgoing) lost", peer = $peer.info
libp2p_peers.set int64(len(network.peerPool))
let res = await network.peerPool.addOutgoingPeer(peer)
if res:
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))
proc handleIncomingPeer*(peer: Peer) =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
debug "Peer (incoming) lost", peer = $peer.info
libp2p_peers.set int64(len(network.peerPool))
let res = network.peerPool.addIncomingPeerNoWait(peer)
if res:
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
if r.secp256k1.isSome:
var pubKey: keys.PublicKey
if recoverPublicKey(r.secp256k1.get, pubKey) != EthKeysStatus.Success:
return # TODO
let peerId = PeerID.init crypto.PublicKey(scheme: Secp256k1, skkey: pubKey)
var addresses = newSeq[MultiAddress]()
if r.ip.isSome and r.tcp.isSome:
let ip = IpAddress(family: IpAddressFamily.IPv4,
address_v4: r.ip.get)
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
if r.ip6.isSome:
let ip = IpAddress(family: IpAddressFamily.IPv6,
address_v6: r.ip6.get)
if r.tcp6.isSome:
addresses.add MultiAddress.init(ip, TCP, Port r.tcp6.get)
elif r.tcp.isSome:
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
else:
discard
if addresses.len > 0:
return PeerInfo.init(peerId, addresses)
proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
if r.isSome:
return r.get.toPeerInfo
proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
logScope: peer = $peerInfo
debug "Connecting to peer"
await node.switch.connect(peerInfo)
var peer = node.getPeer(peerInfo)
peer.wasDialed = true
debug "Initializing connection"
await initializeConnection(peer)
inc libp2p_successful_dials
debug "Network handshakes completed"
await handleOutgoingPeer(peer)
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
debug "Starting discovery loop"
while true:
let currentPeerCount = node.peerPool.len
if currentPeerCount < node.wantedPeers:
try:
let discoveredPeers =
node.discovery.randomNodes(node.wantedPeers - currentPeerCount)
debug "Discovered peers", peer = $discoveredPeers
for peer in discoveredPeers:
try:
let peerInfo = peer.record.toTypedRecord.toPeerInfo
if peerInfo != nil and peerInfo.id notin node.switch.connections:
# TODO do this in parallel
await node.dialPeer(peerInfo)
except CatchableError as err:
debug "Failed to connect to peer", peer = $peer, err = err.msg
except CatchableError as err:
debug "Failure in discovery", err = err.msg
await sleepAsync seconds(1)
proc init*(T: type Eth2Node, conf: BeaconNodeConf,
switch: Switch, ip: IpAddress, privKey: keys.PrivateKey): T =
new result
result.switch = switch
result.discovery = Eth2DiscoveryProtocol.new(conf, ip, privKey.data)
result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:
if proto.networkStateInitializer != nil:
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
for msg in proto.messages:
if msg.protocolMounter != nil:
msg.protocolMounter result
template addKnownPeer*(node: Eth2Node, peer: ENode|enr.Record) =
node.discovery.addNode peer
proc start*(node: Eth2Node) {.async.} =
node.discovery.open()
node.libp2pTransportLoops = await node.switch.start()
traceAsyncErrors node.runDiscoveryLoop()
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
new result
result.info = info
result.network = network
result.connectionState = Connected
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
newSeq result.protocolStates, allProtocols.len
for i in 0 ..< allProtocols.len:
let proto = allProtocols[i]
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc registerMsg(protocol: ProtocolInfo,
name: string,
mounter: MounterProc,
libp2pCodecName: string,
printer: MessageContentPrinter) =
protocol.messages.add MessageInfo(name: name,
protocolMounter: mounter,
libp2pCodecName: libp2pCodecName,
printer: printer)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
Responder = bindSym "Responder"
P2PStream = bindSym "P2PStream"
OutputStream = bindSym "OutputStream"
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
messagePrinter = bindSym "messagePrinter"
milliseconds = bindSym "milliseconds"
registerMsg = bindSym "registerMsg"
initProtocol = bindSym "initProtocol"
bindSymOp = bindSym "bindSym"
errVar = ident "err"
msgVar = ident "msg"
msgBytesVar = ident "msgBytes"
networkVar = ident "network"
await = ident "await"
callUserHandler = ident "callUserHandler"
p.useRequestIds = false
p.useSingleRecordInlining = true
new result
result.PeerType = Peer
result.NetworkType = Eth2Node
result.registerProtocol = bindSym "registerProtocol"
result.setEventHandlers = bindSym "setEventHandlers"
result.SerializationFormat = Format
result.ResponderType = Responder
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
result.implementMsg = proc (msg: Message) =
let
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
MsgRecName = msg.recName
codecNameLit = getRequestProtoName(msg.procDef)
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
# should be written:
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
msg.initResponderCall.add streamVar
##
## Implement the Thunk:
##
## The protocol handlers in nim-libp2p receive only a `P2PStream`
## parameter and there is no way to access the wider context (such
## as the current `Switch`). In our handlers, we may need to list all
## peers in the current network, so we must keep a reference to the
## network object in the closure environment of the installed handlers.
##
## For this reason, we define a `protocol mounter` proc that will
## initialize the network object by creating handlers bound to the
## specific network.
##
let
protocolMounterName = ident(msgName & "_mounter")
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
var mounter: NimNode
if msg.userHandler != nil:
protocol.outRecvProcs.add quote do:
template `callUserHandler`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc thunk(`streamVar`: `P2PStream`,
proto: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
`MsgRecName`, `Format`)
mount `networkVar`.switch,
LPProtocol(codec: `codecNameLit`, handler: thunk)
mounter = protocolMounterName
else:
mounter = newNilLit()
##
## Implement Senders and Handshake
##
if msg.kind == msgHandshake:
macros.error "Handshake messages are not supported in LibP2P protocols"
else:
var sendProc = msg.createSendProc()
implementSendProcBody sendProc
protocol.outProcRegistrations.add(
newCall(registerMsg,
protocol.protocolInfoVar,
msgNameLit,
mounter,
codecNameLit,
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)