Merge libp2p_backend into eth2_network
This commit is contained in:
parent
0c018cb68a
commit
d5e4e640b4
|
@ -1,20 +1,141 @@
|
|||
import
|
||||
options, tables, strutils, sequtils,
|
||||
json_serialization, json_serialization/std/net,
|
||||
metrics, chronos, chronicles, metrics, libp2p/crypto/crypto,
|
||||
eth/keys, eth/p2p/enode, eth/net/nat, eth/p2p/discoveryv5/enr,
|
||||
eth2_discovery, version, conf
|
||||
# Std lib
|
||||
typetraits, strutils, os, random, algorithm,
|
||||
options as stdOptions, net as stdNet,
|
||||
|
||||
# Status libs
|
||||
stew/[io, varints, base58], stew/shims/[macros, tables], stint,
|
||||
faststreams/output_stream,
|
||||
json_serialization, json_serialization/std/[net, options],
|
||||
chronos, chronicles, metrics,
|
||||
# TODO: create simpler to use libp2p modules that use re-exports
|
||||
libp2p/[switch, standard_setup, peerinfo, peer, connection,
|
||||
multiaddress, multicodec, crypto/crypto,
|
||||
protocols/identify, protocols/protocol],
|
||||
libp2p/protocols/secure/[secure, secio],
|
||||
libp2p/protocols/pubsub/[pubsub, floodsub],
|
||||
libp2p/transports/[transport, tcptransport],
|
||||
eth/[keys, async_utils], eth/p2p/[enode, p2p_protocol_dsl],
|
||||
eth/net/nat, eth/p2p/discoveryv5/[enr, node],
|
||||
|
||||
# Beacon node modules
|
||||
version, conf, eth2_discovery, libp2p_json_serialization, conf, ssz,
|
||||
peer_pool
|
||||
|
||||
import
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol
|
||||
|
||||
export
|
||||
version, multiaddress, peer_pool, peerinfo, p2pProtocol,
|
||||
libp2p_json_serialization, ssz
|
||||
|
||||
logScope:
|
||||
topics = "networking"
|
||||
|
||||
type
|
||||
KeyPair* = crypto.KeyPair
|
||||
PublicKey* = crypto.PublicKey
|
||||
PrivateKey* = crypto.PrivateKey
|
||||
|
||||
Bytes = seq[byte]
|
||||
P2PStream = Connection
|
||||
|
||||
# TODO Is this really needed?
|
||||
Eth2Node* = ref object of RootObj
|
||||
switch*: Switch
|
||||
discovery*: Eth2DiscoveryProtocol
|
||||
wantedPeers*: int
|
||||
peerPool*: PeerPool[Peer, PeerID]
|
||||
protocolStates*: seq[RootRef]
|
||||
libp2pTransportLoops*: seq[Future[void]]
|
||||
|
||||
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
|
||||
|
||||
Peer* = ref object
|
||||
network*: Eth2Node
|
||||
info*: PeerInfo
|
||||
wasDialed*: bool
|
||||
discoveryId*: Eth2DiscoveryId
|
||||
connectionState*: ConnectionState
|
||||
protocolStates*: seq[RootRef]
|
||||
maxInactivityAllowed*: Duration
|
||||
score*: int
|
||||
|
||||
ConnectionState* = enum
|
||||
None,
|
||||
Connecting,
|
||||
Connected,
|
||||
Disconnecting,
|
||||
Disconnected
|
||||
|
||||
UntypedResponder = object
|
||||
peer*: Peer
|
||||
stream*: P2PStream
|
||||
|
||||
Responder*[MsgType] = distinct UntypedResponder
|
||||
|
||||
MessageInfo* = object
|
||||
name*: string
|
||||
|
||||
# Private fields:
|
||||
libp2pCodecName: string
|
||||
protocolMounter*: MounterProc
|
||||
printer*: MessageContentPrinter
|
||||
nextMsgResolver*: NextMsgResolver
|
||||
|
||||
ProtocolInfoObj* = object
|
||||
name*: string
|
||||
messages*: seq[MessageInfo]
|
||||
index*: int # the position of the protocol in the
|
||||
# ordered list of supported protocols
|
||||
|
||||
# Private fields:
|
||||
peerStateInitializer*: PeerStateInitializer
|
||||
networkStateInitializer*: NetworkStateInitializer
|
||||
handshake*: HandshakeStep
|
||||
disconnectHandler*: DisconnectionHandler
|
||||
|
||||
ProtocolInfo* = ptr ProtocolInfoObj
|
||||
|
||||
ResponseCode* = enum
|
||||
Success
|
||||
InvalidRequest
|
||||
ServerError
|
||||
|
||||
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
||||
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
|
||||
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
|
||||
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
|
||||
ThunkProc* = LPProtoHandler
|
||||
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
|
||||
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
||||
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
|
||||
|
||||
DisconnectionReason* = enum
|
||||
ClientShutDown
|
||||
IrrelevantNetwork
|
||||
FaultOrError
|
||||
|
||||
PeerDisconnected* = object of CatchableError
|
||||
reason*: DisconnectionReason
|
||||
|
||||
TransmissionError* = object of CatchableError
|
||||
|
||||
const
|
||||
clientId* = "Nimbus beacon node v" & fullVersionStr
|
||||
networkKeyFilename = "privkey.protobuf"
|
||||
|
||||
export
|
||||
version
|
||||
TCP = net.Protocol.IPPROTO_TCP
|
||||
HandshakeTimeout = FaultOrError
|
||||
|
||||
# Spec constants
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
|
||||
REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
|
||||
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
|
||||
TTFB_TIMEOUT* = 5.seconds
|
||||
RESP_TIMEOUT* = 10.seconds
|
||||
|
||||
readTimeoutErrorMsg = "Exceeded read timeout for a request"
|
||||
|
||||
let
|
||||
globalListeningAddr = parseIpAddress("0.0.0.0")
|
||||
|
@ -26,13 +147,702 @@ declareCounter gossip_messages_sent,
|
|||
declareCounter gossip_messages_received,
|
||||
"Number of gossip messages received by this peer"
|
||||
|
||||
declarePublicGauge libp2p_successful_dials,
|
||||
"Number of successfully dialed peers"
|
||||
|
||||
declarePublicGauge libp2p_peers,
|
||||
"Number of active libp2p peers"
|
||||
|
||||
template libp2pProtocol*(name: string, version: int) {.pragma.}
|
||||
|
||||
template `$`*(peer: Peer): string = id(peer.info)
|
||||
chronicles.formatIt(Peer): $it
|
||||
|
||||
template remote*(peer: Peer): untyped =
|
||||
peer.info.peerId
|
||||
|
||||
# TODO: This exists only as a compatibility layer between the daemon
|
||||
# APIs and the native LibP2P ones. It won't be necessary once the
|
||||
# daemon is removed.
|
||||
#
|
||||
template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped =
|
||||
write(stream, bytes)
|
||||
|
||||
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
|
||||
dial(node.switch, peer.info, protocolId)
|
||||
|
||||
proc peer(stream: P2PStream): PeerID =
|
||||
# TODO: Can this be `nil`?
|
||||
stream.peerInfo.peerId
|
||||
#
|
||||
# End of compatibility layer
|
||||
|
||||
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
|
||||
|
||||
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
|
||||
let peerId = peerInfo.peerId
|
||||
result = node.peerPool.getOrDefault(peerId)
|
||||
if result == nil:
|
||||
result = Peer.init(node, peerInfo)
|
||||
|
||||
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
|
||||
# TODO: Can this be `nil`?
|
||||
return network.getPeer(stream.peerInfo)
|
||||
|
||||
proc getKey*(peer: Peer): PeerID {.inline.} =
|
||||
result = peer.info.peerId
|
||||
|
||||
proc getFuture*(peer: Peer): Future[void] {.inline.} =
|
||||
result = peer.info.lifeFuture()
|
||||
|
||||
proc `<`*(a, b: Peer): bool =
|
||||
result = `<`(a.score, b.score)
|
||||
|
||||
proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
||||
notifyOtherPeer = false) {.async.} =
|
||||
# TODO: How should we notify the other peer?
|
||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
peer.connectionState = Disconnecting
|
||||
await peer.network.switch.disconnect(peer.info)
|
||||
peer.connectionState = Disconnected
|
||||
peer.network.peerPool.release(peer)
|
||||
peer.info.close()
|
||||
|
||||
proc safeClose(stream: P2PStream) {.async.} =
|
||||
if not stream.closed:
|
||||
await close(stream)
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer)
|
||||
|
||||
include eth/p2p/p2p_backends_helpers
|
||||
include eth/p2p/p2p_tracing
|
||||
|
||||
proc getRequestProtoName(fn: NimNode): NimNode =
|
||||
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
|
||||
# (TODO: file as an issue)
|
||||
|
||||
let pragmas = fn.pragma
|
||||
if pragmas.kind == nnkPragma and pragmas.len > 0:
|
||||
for pragma in pragmas:
|
||||
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
|
||||
let protoName = $(pragma[1])
|
||||
let protoVer = $(pragma[2].intVal)
|
||||
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/ssz")
|
||||
|
||||
return newLit("")
|
||||
|
||||
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
|
||||
var e = newException(PeerDisconnected, msg)
|
||||
e.reason = r
|
||||
raise e
|
||||
|
||||
proc disconnectAndRaise(peer: Peer,
|
||||
reason: DisconnectionReason,
|
||||
msg: string) {.async.} =
|
||||
let r = reason
|
||||
await peer.disconnect(r)
|
||||
raisePeerDisconnected(msg, r)
|
||||
|
||||
proc readChunk(stream: P2PStream,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
|
||||
|
||||
proc readSizePrefix(stream: P2PStream,
|
||||
deadline: Future[void]): Future[int] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint64, ProtoBuf]
|
||||
while true:
|
||||
var nextByte: byte
|
||||
var readNextByte = stream.readExactly(addr nextByte, 1)
|
||||
await readNextByte or deadline
|
||||
if not readNextByte.finished:
|
||||
trace "size prefix byte not received in time"
|
||||
return -1
|
||||
case parser.feedByte(nextByte)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > uint64(REQ_RESP_MAX_SIZE):
|
||||
trace "size prefix outside of range", res
|
||||
return -1
|
||||
else:
|
||||
trace "got size prefix", res
|
||||
return int(res)
|
||||
of Overflow:
|
||||
trace "size prefix overflow"
|
||||
return -1
|
||||
of Incomplete:
|
||||
continue
|
||||
|
||||
proc readMsgBytes(stream: P2PStream,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||
trace "about to read message bytes", withResponseCode
|
||||
|
||||
try:
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
trace "about to read response code"
|
||||
var readResponseCode = stream.readExactly(addr responseCode, 1)
|
||||
await readResponseCode or deadline
|
||||
|
||||
if not readResponseCode.finished:
|
||||
trace "response code not received in time"
|
||||
return
|
||||
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
trace "invalid response code", responseCode
|
||||
return
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
trace "got response code"
|
||||
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
return
|
||||
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await readSizePrefix(stream, deadline)
|
||||
trace "got msg size prefix", sizePrefix
|
||||
|
||||
if sizePrefix == -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
||||
return
|
||||
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = stream.peer
|
||||
return
|
||||
|
||||
trace "about to read msg bytes", len = sizePrefix
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished:
|
||||
trace "msg bytes not received in time"
|
||||
return
|
||||
|
||||
trace "got message bytes", len = sizePrefix
|
||||
return msgBytes
|
||||
|
||||
except TransportIncompleteError:
|
||||
return @[]
|
||||
|
||||
proc readChunk(stream: P2PStream,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
|
||||
try:
|
||||
if msgBytes.len > 0:
|
||||
return some SSZ.decode(msgBytes, MsgType)
|
||||
except SerializationError as err:
|
||||
debug "Failed to decode a network message",
|
||||
msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
return
|
||||
|
||||
proc readResponse(
|
||||
stream: P2PStream,
|
||||
MsgType: type,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
|
||||
when MsgType is seq:
|
||||
type E = ElemType(MsgType)
|
||||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await readChunk(stream, E, true, deadline)
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
return some(results)
|
||||
else:
|
||||
return await readChunk(stream, MsgType, true, deadline)
|
||||
|
||||
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
|
||||
var s = init OutputStream
|
||||
s.append byte(responseCode)
|
||||
s.appendVarint errMsg.len
|
||||
s.appendValue SSZ, errMsg
|
||||
s.getOutput
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
stream: P2PStream,
|
||||
err: ref SerializationError,
|
||||
msgName: string,
|
||||
msgBytes: Bytes) {.async.} =
|
||||
debug "Received an invalid request",
|
||||
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
|
||||
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
|
||||
await stream.writeAllBytes(responseBytes)
|
||||
await stream.close()
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
stream: P2PStream,
|
||||
responseCode: ResponseCode,
|
||||
errMsg: string) {.async.} =
|
||||
debug "Error processing request", peer, responseCode, errMsg
|
||||
|
||||
let responseBytes = encodeErrorMsg(ServerError, errMsg)
|
||||
await stream.writeAllBytes(responseBytes)
|
||||
await stream.close()
|
||||
|
||||
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
|
||||
var deadline = sleepAsync RESP_TIMEOUT
|
||||
var streamFut = peer.network.openStream(peer, protocolId)
|
||||
await streamFut or deadline
|
||||
if not streamFut.finished:
|
||||
# TODO: we are returning here because the deadline passed, but
|
||||
# the stream can still be opened eventually a bit later. Who is
|
||||
# going to close it then?
|
||||
raise newException(TransmissionError, "Failed to open LibP2P stream")
|
||||
|
||||
let stream = streamFut.read
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
var s = init OutputStream
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.append requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.writeAllBytes(bytes)
|
||||
|
||||
# TODO There is too much duplication in the responder functions, but
|
||||
# I hope to reduce this when I increse the reliance on output streams.
|
||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||
var s = init OutputStream
|
||||
s.append byte(Success)
|
||||
s.appendVarint payload.len.uint64
|
||||
s.append payload
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
|
||||
var s = init OutputStream
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(val)
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
|
||||
var s = init OutputStream
|
||||
for chunk in chunks:
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(chunk)
|
||||
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||
ResponseMsg: type,
|
||||
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
|
||||
var deadline = sleepAsync timeout
|
||||
|
||||
# Open a new LibP2P stream
|
||||
var streamFut = peer.network.openStream(peer, protocolId)
|
||||
await streamFut or deadline
|
||||
if not streamFut.finished:
|
||||
# TODO: we are returning here because the deadline passed, but
|
||||
# the stream can still be opened eventually a bit later. Who is
|
||||
# going to close it then?
|
||||
return none(ResponseMsg)
|
||||
|
||||
let stream = streamFut.read
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
# Send the request
|
||||
var s = init OutputStream
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.append requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.writeAllBytes(bytes)
|
||||
|
||||
# Read the response
|
||||
return await stream.readResponse(ResponseMsg, deadline)
|
||||
|
||||
proc init*[MsgType](T: type Responder[MsgType],
|
||||
peer: Peer, stream: P2PStream): T =
|
||||
T(UntypedResponder(peer: peer, stream: stream))
|
||||
|
||||
template write*[M](r: var Responder[M], val: auto): auto =
|
||||
mixin send
|
||||
type Msg = M
|
||||
type MsgRec = RecType(Msg)
|
||||
when MsgRec is seq|openarray:
|
||||
type E = ElemType(MsgRec)
|
||||
when val is E:
|
||||
sendResponseChunkObj(UntypedResponder(r), val)
|
||||
elif val is MsgRec:
|
||||
sendResponseChunks(UntypedResponder(r), val)
|
||||
else:
|
||||
{.fatal: "Unepected message type".}
|
||||
else:
|
||||
send(r, val)
|
||||
|
||||
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
||||
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
||||
for protocol in allProtocols:
|
||||
if protocol.handshake != nil:
|
||||
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
|
||||
|
||||
await all(subProtocolsHandshakes)
|
||||
|
||||
template initializeConnection*(peer: Peer): auto =
|
||||
performProtocolHandshakes(peer)
|
||||
|
||||
proc initProtocol(name: string,
|
||||
peerInit: PeerStateInitializer,
|
||||
networkInit: NetworkStateInitializer): ProtocolInfoObj =
|
||||
result.name = name
|
||||
result.messages = @[]
|
||||
result.peerStateInitializer = peerInit
|
||||
result.networkStateInitializer = networkInit
|
||||
|
||||
proc registerProtocol(protocol: ProtocolInfo) =
|
||||
# TODO: This can be done at compile-time in the future
|
||||
let pos = lowerBound(gProtocols, protocol)
|
||||
gProtocols.insert(protocol, pos)
|
||||
for i in 0 ..< gProtocols.len:
|
||||
gProtocols[i].index = i
|
||||
|
||||
proc setEventHandlers(p: ProtocolInfo,
|
||||
handshake: HandshakeStep,
|
||||
disconnectHandler: DisconnectionHandler) =
|
||||
p.handshake = handshake
|
||||
p.disconnectHandler = disconnectHandler
|
||||
|
||||
proc implementSendProcBody(sendProc: SendProc) =
|
||||
let
|
||||
msg = sendProc.msg
|
||||
UntypedResponder = bindSym "UntypedResponder"
|
||||
|
||||
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
|
||||
if msg.kind != msgResponse:
|
||||
let msgProto = getRequestProtoName(msg.procDef)
|
||||
case msg.kind
|
||||
of msgRequest:
|
||||
let
|
||||
timeout = msg.timeoutParam[0]
|
||||
ResponseRecord = msg.response.recName
|
||||
quote:
|
||||
makeEth2Request(`peer`, `msgProto`, `bytes`,
|
||||
`ResponseRecord`, `timeout`)
|
||||
else:
|
||||
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
|
||||
else:
|
||||
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
|
||||
|
||||
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||
|
||||
proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
|
||||
MsgType, Format: distinct type) {.async, gcsafe.} =
|
||||
mixin callUserHandler, RecType
|
||||
const msgName = typetraits.name(MsgType)
|
||||
|
||||
## Uncomment this to enable tracing on all incoming requests
|
||||
## You can include `msgNameLit` in the condition to select
|
||||
## more specific requests:
|
||||
# when chronicles.runtimeFilteringEnabled:
|
||||
# setLogLevel(LogLevel.TRACE)
|
||||
# defer: setLogLevel(LogLevel.DEBUG)
|
||||
# trace "incoming " & `msgNameLit` & " stream"
|
||||
|
||||
let peer = peerFromStream(network, stream)
|
||||
|
||||
handleIncomingPeer(peer)
|
||||
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
let
|
||||
deadline = sleepAsync RESP_TIMEOUT
|
||||
msgBytes = await readMsgBytes(stream, false, deadline)
|
||||
|
||||
if msgBytes.len == 0:
|
||||
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
|
||||
return
|
||||
|
||||
type MsgRec = RecType(MsgType)
|
||||
var msg: MsgRec
|
||||
try:
|
||||
msg = decode(Format, msgBytes, MsgRec)
|
||||
except SerializationError as err:
|
||||
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
|
||||
return
|
||||
except Exception as err:
|
||||
# TODO. This is temporary code that should be removed after interop.
|
||||
# It can be enabled only in certain diagnostic builds where it should
|
||||
# re-raise the exception.
|
||||
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
|
||||
await sendErrorResponse(peer, stream, ServerError, err.msg)
|
||||
raise err
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg))
|
||||
await callUserHandler(peer, stream, msg)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, stream, ServerError, err.msg)
|
||||
|
||||
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
debug "Peer (outgoing) lost", peer = $peer.info
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = await network.peerPool.addOutgoingPeer(peer)
|
||||
if res:
|
||||
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer) =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
debug "Peer (incoming) lost", peer = $peer.info
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = network.peerPool.addIncomingPeerNoWait(peer)
|
||||
if res:
|
||||
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
|
||||
if r.secp256k1.isSome:
|
||||
var pubKey: keys.PublicKey
|
||||
if recoverPublicKey(r.secp256k1.get, pubKey) != EthKeysStatus.Success:
|
||||
return # TODO
|
||||
|
||||
let peerId = PeerID.init crypto.PublicKey(scheme: Secp256k1, skkey: pubKey)
|
||||
var addresses = newSeq[MultiAddress]()
|
||||
|
||||
if r.ip.isSome and r.tcp.isSome:
|
||||
let ip = IpAddress(family: IpAddressFamily.IPv4,
|
||||
address_v4: r.ip.get)
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
|
||||
|
||||
if r.ip6.isSome:
|
||||
let ip = IpAddress(family: IpAddressFamily.IPv6,
|
||||
address_v6: r.ip6.get)
|
||||
if r.tcp6.isSome:
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp6.get)
|
||||
elif r.tcp.isSome:
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
if addresses.len > 0:
|
||||
return PeerInfo.init(peerId, addresses)
|
||||
|
||||
proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
|
||||
if r.isSome:
|
||||
return r.get.toPeerInfo
|
||||
|
||||
proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
|
||||
logScope: peer = $peerInfo
|
||||
|
||||
debug "Connecting to peer"
|
||||
await node.switch.connect(peerInfo)
|
||||
var peer = node.getPeer(peerInfo)
|
||||
peer.wasDialed = true
|
||||
|
||||
debug "Initializing connection"
|
||||
await initializeConnection(peer)
|
||||
|
||||
inc libp2p_successful_dials
|
||||
debug "Network handshakes completed"
|
||||
|
||||
await handleOutgoingPeer(peer)
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
|
||||
while true:
|
||||
let currentPeerCount = node.peerPool.len
|
||||
if currentPeerCount < node.wantedPeers:
|
||||
try:
|
||||
let discoveredPeers =
|
||||
node.discovery.randomNodes(node.wantedPeers - currentPeerCount)
|
||||
debug "Discovered peers", peer = $discoveredPeers
|
||||
for peer in discoveredPeers:
|
||||
try:
|
||||
let peerInfo = peer.record.toTypedRecord.toPeerInfo
|
||||
if peerInfo != nil and peerInfo.id notin node.switch.connections:
|
||||
# TODO do this in parallel
|
||||
await node.dialPeer(peerInfo)
|
||||
except CatchableError as err:
|
||||
debug "Failed to connect to peer", peer = $peer, err = err.msg
|
||||
except CatchableError as err:
|
||||
debug "Failure in discovery", err = err.msg
|
||||
|
||||
await sleepAsync seconds(1)
|
||||
|
||||
proc init*(T: type Eth2Node, conf: BeaconNodeConf,
|
||||
switch: Switch, ip: IpAddress, privKey: keys.PrivateKey): T =
|
||||
new result
|
||||
result.switch = switch
|
||||
result.discovery = Eth2DiscoveryProtocol.new(conf, ip, privKey.data)
|
||||
result.wantedPeers = conf.maxPeers
|
||||
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
|
||||
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for proto in allProtocols:
|
||||
if proto.networkStateInitializer != nil:
|
||||
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
|
||||
|
||||
for msg in proto.messages:
|
||||
if msg.protocolMounter != nil:
|
||||
msg.protocolMounter result
|
||||
|
||||
template publicKey*(node: Eth2Node): keys.PublicKey =
|
||||
node.discovery.privKey.getPublicKey
|
||||
|
||||
template addKnownPeer*(node: Eth2Node, peer: ENode|enr.Record) =
|
||||
node.discovery.addNode peer
|
||||
|
||||
proc start*(node: Eth2Node) {.async.} =
|
||||
node.discovery.open()
|
||||
node.libp2pTransportLoops = await node.switch.start()
|
||||
traceAsyncErrors node.runDiscoveryLoop()
|
||||
|
||||
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
|
||||
new result
|
||||
result.info = info
|
||||
result.network = network
|
||||
result.connectionState = Connected
|
||||
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for i in 0 ..< allProtocols.len:
|
||||
let proto = allProtocols[i]
|
||||
if proto.peerStateInitializer != nil:
|
||||
result.protocolStates[i] = proto.peerStateInitializer(result)
|
||||
|
||||
proc registerMsg(protocol: ProtocolInfo,
|
||||
name: string,
|
||||
mounter: MounterProc,
|
||||
libp2pCodecName: string,
|
||||
printer: MessageContentPrinter) =
|
||||
protocol.messages.add MessageInfo(name: name,
|
||||
protocolMounter: mounter,
|
||||
libp2pCodecName: libp2pCodecName,
|
||||
printer: printer)
|
||||
|
||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
var
|
||||
Format = ident "SSZ"
|
||||
Responder = bindSym "Responder"
|
||||
P2PStream = bindSym "P2PStream"
|
||||
Peer = bindSym "Peer"
|
||||
Eth2Node = bindSym "Eth2Node"
|
||||
messagePrinter = bindSym "messagePrinter"
|
||||
registerMsg = bindSym "registerMsg"
|
||||
initProtocol = bindSym "initProtocol"
|
||||
msgVar = ident "msg"
|
||||
networkVar = ident "network"
|
||||
callUserHandler = ident "callUserHandler"
|
||||
|
||||
p.useRequestIds = false
|
||||
p.useSingleRecordInlining = true
|
||||
|
||||
new result
|
||||
|
||||
result.PeerType = Peer
|
||||
result.NetworkType = Eth2Node
|
||||
result.registerProtocol = bindSym "registerProtocol"
|
||||
result.setEventHandlers = bindSym "setEventHandlers"
|
||||
result.SerializationFormat = Format
|
||||
result.ResponderType = Responder
|
||||
|
||||
result.afterProtocolInit = proc (p: P2PProtocol) =
|
||||
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
|
||||
|
||||
result.implementMsg = proc (msg: Message) =
|
||||
let
|
||||
protocol = msg.protocol
|
||||
msgName = $msg.ident
|
||||
msgNameLit = newLit msgName
|
||||
MsgRecName = msg.recName
|
||||
MsgStrongRecName = msg.strongRecName
|
||||
codecNameLit = getRequestProtoName(msg.procDef)
|
||||
|
||||
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
|
||||
# Request procs need an extra param - the stream where the response
|
||||
# should be written:
|
||||
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
|
||||
msg.initResponderCall.add streamVar
|
||||
|
||||
##
|
||||
## Implement the Thunk:
|
||||
##
|
||||
## The protocol handlers in nim-libp2p receive only a `P2PStream`
|
||||
## parameter and there is no way to access the wider context (such
|
||||
## as the current `Switch`). In our handlers, we may need to list all
|
||||
## peers in the current network, so we must keep a reference to the
|
||||
## network object in the closure environment of the installed handlers.
|
||||
##
|
||||
## For this reason, we define a `protocol mounter` proc that will
|
||||
## initialize the network object by creating handlers bound to the
|
||||
## specific network.
|
||||
##
|
||||
let
|
||||
protocolMounterName = ident(msgName & "_mounter")
|
||||
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
|
||||
|
||||
var mounter: NimNode
|
||||
if msg.userHandler != nil:
|
||||
protocol.outRecvProcs.add quote do:
|
||||
template `callUserHandler`(`peerVar`: `Peer`,
|
||||
`streamVar`: `P2PStream`,
|
||||
`msgVar`: `MsgRecName`): untyped =
|
||||
`userHandlerCall`
|
||||
|
||||
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
|
||||
proc thunk(`streamVar`: `P2PStream`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`,
|
||||
`MsgStrongRecName`, `Format`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
LPProtocol(codec: `codecNameLit`, handler: thunk)
|
||||
|
||||
mounter = protocolMounterName
|
||||
else:
|
||||
mounter = newNilLit()
|
||||
|
||||
##
|
||||
## Implement Senders and Handshake
|
||||
##
|
||||
if msg.kind == msgHandshake:
|
||||
macros.error "Handshake messages are not supported in LibP2P protocols"
|
||||
else:
|
||||
var sendProc = msg.createSendProc()
|
||||
implementSendProcBody sendProc
|
||||
|
||||
protocol.outProcRegistrations.add(
|
||||
newCall(registerMsg,
|
||||
protocol.protocolInfoVar,
|
||||
msgNameLit,
|
||||
mounter,
|
||||
codecNameLit,
|
||||
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
|
||||
|
||||
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
|
||||
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
|
||||
|
||||
proc setupNat(conf: BeaconNodeConf): tuple[ip: IpAddress,
|
||||
tcpPort: Port,
|
||||
udpPort: Port] =
|
||||
# defaults
|
||||
result.ip = globalListeningAddr
|
||||
result.tcpPort = Port(conf.tcpPort)
|
||||
result.udpPort = Port(conf.udpPort)
|
||||
result.tcpPort = conf.tcpPort
|
||||
result.udpPort = conf.udpPort
|
||||
|
||||
var nat: NatStrategy
|
||||
case conf.nat.toLowerAscii:
|
||||
|
@ -63,25 +873,6 @@ proc setupNat(conf: BeaconNodeConf): tuple[ip: IpAddress,
|
|||
if extPorts.isSome:
|
||||
(result.tcpPort, result.udpPort) = extPorts.get()
|
||||
|
||||
import
|
||||
os, random,
|
||||
stew/io, eth/async_utils,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
ssz
|
||||
|
||||
export
|
||||
multiaddress
|
||||
|
||||
import
|
||||
libp2p/standard_setup, libp2p_backend, libp2p/peerinfo, peer_pool
|
||||
|
||||
export
|
||||
libp2p_backend, peer_pool, peerinfo
|
||||
|
||||
const
|
||||
netBackendName* = "libp2p"
|
||||
networkKeyFilename = "privkey.protobuf"
|
||||
|
||||
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
|
||||
PublicKey(scheme: Secp256k1, skkey: key)
|
||||
|
||||
|
@ -99,13 +890,6 @@ proc initAddress*(T: type MultiAddress, str: string): T =
|
|||
template tcpEndPoint(address, port): auto =
|
||||
MultiAddress.init(address, Protocol.IPPROTO_TCP, port)
|
||||
|
||||
proc ensureNetworkIdFile(conf: BeaconNodeConf): string =
|
||||
result = conf.dataDir / networkKeyFilename
|
||||
if not fileExists(result):
|
||||
createDir conf.dataDir.string
|
||||
let pk = PrivateKey.random(Secp256k1)
|
||||
writeFile(result, pk.getBytes)
|
||||
|
||||
proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
|
||||
let privKeyPath = conf.dataDir / networkKeyFilename
|
||||
var privKey: PrivateKey
|
||||
|
|
|
@ -1,822 +0,0 @@
|
|||
import
|
||||
algorithm, typetraits, net as stdNet,
|
||||
stew/[varints,base58], stew/shims/[macros, tables], chronos, chronicles,
|
||||
stint, faststreams/output_stream, serialization, metrics,
|
||||
json_serialization/std/[net, options],
|
||||
eth/[keys, async_utils], eth/p2p/[enode, p2p_protocol_dsl],
|
||||
eth/p2p/discoveryv5/[enr, node],
|
||||
# TODO: create simpler to use libp2p modules that use re-exports
|
||||
libp2p/[switch, multistream, connection,
|
||||
multiaddress, peerinfo, peer,
|
||||
crypto/crypto, protocols/identify, protocols/protocol],
|
||||
libp2p/muxers/mplex/[mplex, types],
|
||||
libp2p/protocols/secure/[secure, secio],
|
||||
libp2p/protocols/pubsub/[pubsub, floodsub],
|
||||
libp2p/transports/[transport, tcptransport],
|
||||
libp2p_json_serialization, eth2_discovery, conf, ssz,
|
||||
peer_pool
|
||||
|
||||
import
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol
|
||||
|
||||
export
|
||||
p2pProtocol, libp2p_json_serialization, ssz
|
||||
|
||||
type
|
||||
Bytes = seq[byte]
|
||||
P2PStream = Connection
|
||||
|
||||
# TODO Is this really needed?
|
||||
Eth2Node* = ref object of RootObj
|
||||
switch*: Switch
|
||||
discovery*: Eth2DiscoveryProtocol
|
||||
wantedPeers*: int
|
||||
peerPool*: PeerPool[Peer, PeerID]
|
||||
protocolStates*: seq[RootRef]
|
||||
libp2pTransportLoops*: seq[Future[void]]
|
||||
|
||||
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
|
||||
|
||||
Peer* = ref object
|
||||
network*: Eth2Node
|
||||
info*: PeerInfo
|
||||
wasDialed*: bool
|
||||
discoveryId*: Eth2DiscoveryId
|
||||
connectionState*: ConnectionState
|
||||
protocolStates*: seq[RootRef]
|
||||
maxInactivityAllowed*: Duration
|
||||
score*: int
|
||||
|
||||
ConnectionState* = enum
|
||||
None,
|
||||
Connecting,
|
||||
Connected,
|
||||
Disconnecting,
|
||||
Disconnected
|
||||
|
||||
UntypedResponder = object
|
||||
peer*: Peer
|
||||
stream*: P2PStream
|
||||
|
||||
Responder*[MsgType] = distinct UntypedResponder
|
||||
|
||||
MessageInfo* = object
|
||||
name*: string
|
||||
|
||||
# Private fields:
|
||||
libp2pCodecName: string
|
||||
protocolMounter*: MounterProc
|
||||
printer*: MessageContentPrinter
|
||||
nextMsgResolver*: NextMsgResolver
|
||||
|
||||
ProtocolInfoObj* = object
|
||||
name*: string
|
||||
messages*: seq[MessageInfo]
|
||||
index*: int # the position of the protocol in the
|
||||
# ordered list of supported protocols
|
||||
|
||||
# Private fields:
|
||||
peerStateInitializer*: PeerStateInitializer
|
||||
networkStateInitializer*: NetworkStateInitializer
|
||||
handshake*: HandshakeStep
|
||||
disconnectHandler*: DisconnectionHandler
|
||||
|
||||
ProtocolInfo* = ptr ProtocolInfoObj
|
||||
|
||||
ResponseCode* = enum
|
||||
Success
|
||||
InvalidRequest
|
||||
ServerError
|
||||
|
||||
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
||||
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
|
||||
HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
|
||||
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
|
||||
ThunkProc* = LPProtoHandler
|
||||
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
|
||||
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
||||
NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}
|
||||
|
||||
DisconnectionReason* = enum
|
||||
ClientShutDown
|
||||
IrrelevantNetwork
|
||||
FaultOrError
|
||||
|
||||
PeerDisconnected* = object of CatchableError
|
||||
reason*: DisconnectionReason
|
||||
|
||||
TransmissionError* = object of CatchableError
|
||||
|
||||
const
|
||||
TCP = net.Protocol.IPPROTO_TCP
|
||||
defaultIncomingReqTimeout = 5000
|
||||
HandshakeTimeout = FaultOrError
|
||||
|
||||
# Spec constants
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
|
||||
REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
|
||||
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
|
||||
TTFB_TIMEOUT* = 5.seconds
|
||||
RESP_TIMEOUT* = 10.seconds
|
||||
|
||||
readTimeoutErrorMsg = "Exceeded read timeout for a request"
|
||||
|
||||
logScope:
|
||||
topics = "libp2p"
|
||||
|
||||
declarePublicGauge libp2p_successful_dials,
|
||||
"Number of successfully dialed peers"
|
||||
|
||||
declarePublicGauge libp2p_peers,
|
||||
"Number of active libp2p peers"
|
||||
|
||||
template libp2pProtocol*(name: string, version: int) {.pragma.}
|
||||
|
||||
template `$`*(peer: Peer): string = id(peer.info)
|
||||
chronicles.formatIt(Peer): $it
|
||||
|
||||
template remote*(peer: Peer): untyped =
|
||||
peer.info.peerId
|
||||
|
||||
# TODO: This exists only as a compatibility layer between the daemon
|
||||
# APIs and the native LibP2P ones. It won't be necessary once the
|
||||
# daemon is removed.
|
||||
#
|
||||
template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped =
|
||||
write(stream, bytes)
|
||||
|
||||
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
|
||||
dial(node.switch, peer.info, protocolId)
|
||||
|
||||
proc peer(stream: P2PStream): PeerID =
|
||||
# TODO: Can this be `nil`?
|
||||
stream.peerInfo.peerId
|
||||
#
|
||||
# End of compatibility layer
|
||||
|
||||
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
|
||||
|
||||
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
|
||||
let peerId = peerInfo.peerId
|
||||
result = node.peerPool.getOrDefault(peerId)
|
||||
if result == nil:
|
||||
result = Peer.init(node, peerInfo)
|
||||
|
||||
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
|
||||
# TODO: Can this be `nil`?
|
||||
return network.getPeer(stream.peerInfo)
|
||||
|
||||
proc getKey*(peer: Peer): PeerID {.inline.} =
|
||||
result = peer.info.peerId
|
||||
|
||||
proc getFuture*(peer: Peer): Future[void] {.inline.} =
|
||||
result = peer.info.lifeFuture()
|
||||
|
||||
proc `<`*(a, b: Peer): bool =
|
||||
result = `<`(a.score, b.score)
|
||||
|
||||
proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
||||
notifyOtherPeer = false) {.async.} =
|
||||
# TODO: How should we notify the other peer?
|
||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||
peer.connectionState = Disconnecting
|
||||
await peer.network.switch.disconnect(peer.info)
|
||||
peer.connectionState = Disconnected
|
||||
peer.network.peerPool.release(peer)
|
||||
peer.info.close()
|
||||
|
||||
proc safeClose(stream: P2PStream) {.async.} =
|
||||
if not stream.closed:
|
||||
await close(stream)
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer)
|
||||
|
||||
include eth/p2p/p2p_backends_helpers
|
||||
include eth/p2p/p2p_tracing
|
||||
|
||||
proc getRequestProtoName(fn: NimNode): NimNode =
|
||||
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
|
||||
# (TODO: file as an issue)
|
||||
|
||||
let pragmas = fn.pragma
|
||||
if pragmas.kind == nnkPragma and pragmas.len > 0:
|
||||
for pragma in pragmas:
|
||||
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
|
||||
let protoName = $(pragma[1])
|
||||
let protoVer = $(pragma[2].intVal)
|
||||
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/ssz")
|
||||
|
||||
return newLit("")
|
||||
|
||||
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
|
||||
var e = newException(PeerDisconnected, msg)
|
||||
e.reason = r
|
||||
raise e
|
||||
|
||||
proc disconnectAndRaise(peer: Peer,
|
||||
reason: DisconnectionReason,
|
||||
msg: string) {.async.} =
|
||||
let r = reason
|
||||
await peer.disconnect(r)
|
||||
raisePeerDisconnected(msg, r)
|
||||
|
||||
proc readChunk(stream: P2PStream,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
|
||||
|
||||
proc readSizePrefix(stream: P2PStream,
|
||||
deadline: Future[void]): Future[int] {.async.} =
|
||||
trace "about to read msg size prefix"
|
||||
var parser: VarintParser[uint64, ProtoBuf]
|
||||
while true:
|
||||
var nextByte: byte
|
||||
var readNextByte = stream.readExactly(addr nextByte, 1)
|
||||
await readNextByte or deadline
|
||||
if not readNextByte.finished:
|
||||
trace "size prefix byte not received in time"
|
||||
return -1
|
||||
case parser.feedByte(nextByte)
|
||||
of Done:
|
||||
let res = parser.getResult
|
||||
if res > uint64(REQ_RESP_MAX_SIZE):
|
||||
trace "size prefix outside of range", res
|
||||
return -1
|
||||
else:
|
||||
trace "got size prefix", res
|
||||
return int(res)
|
||||
of Overflow:
|
||||
trace "size prefix overflow"
|
||||
return -1
|
||||
of Incomplete:
|
||||
continue
|
||||
|
||||
proc readMsgBytes(stream: P2PStream,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Bytes] {.async.} =
|
||||
trace "about to read message bytes", withResponseCode
|
||||
|
||||
try:
|
||||
if withResponseCode:
|
||||
var responseCode: byte
|
||||
trace "about to read response code"
|
||||
var readResponseCode = stream.readExactly(addr responseCode, 1)
|
||||
await readResponseCode or deadline
|
||||
|
||||
if not readResponseCode.finished:
|
||||
trace "response code not received in time"
|
||||
return
|
||||
|
||||
if responseCode > ResponseCode.high.byte:
|
||||
trace "invalid response code", responseCode
|
||||
return
|
||||
|
||||
logScope: responseCode = ResponseCode(responseCode)
|
||||
trace "got response code"
|
||||
|
||||
case ResponseCode(responseCode)
|
||||
of InvalidRequest, ServerError:
|
||||
let responseErrMsg = await readChunk(stream, string, false, deadline)
|
||||
debug "P2P request resulted in error", responseErrMsg
|
||||
return
|
||||
|
||||
of Success:
|
||||
# The response is OK, the execution continues below
|
||||
discard
|
||||
|
||||
var sizePrefix = await readSizePrefix(stream, deadline)
|
||||
trace "got msg size prefix", sizePrefix
|
||||
|
||||
if sizePrefix == -1:
|
||||
debug "Failed to read an incoming message size prefix", peer = stream.peer
|
||||
return
|
||||
|
||||
if sizePrefix == 0:
|
||||
debug "Received SSZ with zero size", peer = stream.peer
|
||||
return
|
||||
|
||||
trace "about to read msg bytes", len = sizePrefix
|
||||
var msgBytes = newSeq[byte](sizePrefix)
|
||||
var readBody = stream.readExactly(addr msgBytes[0], sizePrefix)
|
||||
await readBody or deadline
|
||||
if not readBody.finished:
|
||||
trace "msg bytes not received in time"
|
||||
return
|
||||
|
||||
trace "got message bytes", len = sizePrefix
|
||||
return msgBytes
|
||||
|
||||
except TransportIncompleteError:
|
||||
return @[]
|
||||
|
||||
proc readChunk(stream: P2PStream,
|
||||
MsgType: type,
|
||||
withResponseCode: bool,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
|
||||
try:
|
||||
if msgBytes.len > 0:
|
||||
return some SSZ.decode(msgBytes, MsgType)
|
||||
except SerializationError as err:
|
||||
debug "Failed to decode a network message",
|
||||
msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
return
|
||||
|
||||
proc readResponse(
|
||||
stream: P2PStream,
|
||||
MsgType: type,
|
||||
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
|
||||
|
||||
when MsgType is seq:
|
||||
type E = ElemType(MsgType)
|
||||
var results: MsgType
|
||||
while true:
|
||||
let nextRes = await readChunk(stream, E, true, deadline)
|
||||
if nextRes.isNone: break
|
||||
results.add nextRes.get
|
||||
if results.len > 0:
|
||||
return some(results)
|
||||
else:
|
||||
return await readChunk(stream, MsgType, true, deadline)
|
||||
|
||||
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
|
||||
var s = init OutputStream
|
||||
s.append byte(responseCode)
|
||||
s.appendVarint errMsg.len
|
||||
s.appendValue SSZ, errMsg
|
||||
s.getOutput
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
stream: P2PStream,
|
||||
err: ref SerializationError,
|
||||
msgName: string,
|
||||
msgBytes: Bytes) {.async.} =
|
||||
debug "Received an invalid request",
|
||||
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
|
||||
|
||||
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
|
||||
await stream.writeAllBytes(responseBytes)
|
||||
await stream.close()
|
||||
|
||||
proc sendErrorResponse(peer: Peer,
|
||||
stream: P2PStream,
|
||||
responseCode: ResponseCode,
|
||||
errMsg: string) {.async.} =
|
||||
debug "Error processing request", peer, responseCode, errMsg
|
||||
|
||||
let responseBytes = encodeErrorMsg(ServerError, errMsg)
|
||||
await stream.writeAllBytes(responseBytes)
|
||||
await stream.close()
|
||||
|
||||
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
|
||||
var deadline = sleepAsync RESP_TIMEOUT
|
||||
var streamFut = peer.network.openStream(peer, protocolId)
|
||||
await streamFut or deadline
|
||||
if not streamFut.finished:
|
||||
# TODO: we are returning here because the deadline passed, but
|
||||
# the stream can still be opened eventually a bit later. Who is
|
||||
# going to close it then?
|
||||
raise newException(TransmissionError, "Failed to open LibP2P stream")
|
||||
|
||||
let stream = streamFut.read
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
var s = init OutputStream
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.append requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.writeAllBytes(bytes)
|
||||
|
||||
# TODO There is too much duplication in the responder functions, but
|
||||
# I hope to reduce this when I increse the reliance on output streams.
|
||||
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
|
||||
var s = init OutputStream
|
||||
s.append byte(Success)
|
||||
s.appendVarint payload.len.uint64
|
||||
s.append payload
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
|
||||
var s = init OutputStream
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(val)
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
|
||||
var s = init OutputStream
|
||||
for chunk in chunks:
|
||||
s.append byte(Success)
|
||||
s.appendValue SSZ, sizePrefixed(chunk)
|
||||
|
||||
let bytes = s.getOutput
|
||||
await responder.stream.writeAllBytes(bytes)
|
||||
|
||||
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
||||
ResponseMsg: type,
|
||||
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
|
||||
var deadline = sleepAsync timeout
|
||||
|
||||
# Open a new LibP2P stream
|
||||
var streamFut = peer.network.openStream(peer, protocolId)
|
||||
await streamFut or deadline
|
||||
if not streamFut.finished:
|
||||
# TODO: we are returning here because the deadline passed, but
|
||||
# the stream can still be opened eventually a bit later. Who is
|
||||
# going to close it then?
|
||||
return none(ResponseMsg)
|
||||
|
||||
let stream = streamFut.read
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
# Send the request
|
||||
var s = init OutputStream
|
||||
s.appendVarint requestBytes.len.uint64
|
||||
s.append requestBytes
|
||||
let bytes = s.getOutput
|
||||
await stream.writeAllBytes(bytes)
|
||||
|
||||
# Read the response
|
||||
return await stream.readResponse(ResponseMsg, deadline)
|
||||
|
||||
proc init*[MsgType](T: type Responder[MsgType],
|
||||
peer: Peer, stream: P2PStream): T =
|
||||
T(UntypedResponder(peer: peer, stream: stream))
|
||||
|
||||
template write*[M](r: var Responder[M], val: auto): auto =
|
||||
mixin send
|
||||
type Msg = M
|
||||
type MsgRec = RecType(Msg)
|
||||
when MsgRec is seq|openarray:
|
||||
type E = ElemType(MsgRec)
|
||||
when val is E:
|
||||
sendResponseChunkObj(UntypedResponder(r), val)
|
||||
elif val is MsgRec:
|
||||
sendResponseChunks(UntypedResponder(r), val)
|
||||
else:
|
||||
{.fatal: "Unepected message type".}
|
||||
else:
|
||||
send(r, val)
|
||||
|
||||
proc performProtocolHandshakes*(peer: Peer) {.async.} =
|
||||
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
|
||||
for protocol in allProtocols:
|
||||
if protocol.handshake != nil:
|
||||
subProtocolsHandshakes.add((protocol.handshake)(peer, nil))
|
||||
|
||||
await all(subProtocolsHandshakes)
|
||||
|
||||
template initializeConnection*(peer: Peer): auto =
|
||||
performProtocolHandshakes(peer)
|
||||
|
||||
proc initProtocol(name: string,
|
||||
peerInit: PeerStateInitializer,
|
||||
networkInit: NetworkStateInitializer): ProtocolInfoObj =
|
||||
result.name = name
|
||||
result.messages = @[]
|
||||
result.peerStateInitializer = peerInit
|
||||
result.networkStateInitializer = networkInit
|
||||
|
||||
proc registerProtocol(protocol: ProtocolInfo) =
|
||||
# TODO: This can be done at compile-time in the future
|
||||
let pos = lowerBound(gProtocols, protocol)
|
||||
gProtocols.insert(protocol, pos)
|
||||
for i in 0 ..< gProtocols.len:
|
||||
gProtocols[i].index = i
|
||||
|
||||
proc setEventHandlers(p: ProtocolInfo,
|
||||
handshake: HandshakeStep,
|
||||
disconnectHandler: DisconnectionHandler) =
|
||||
p.handshake = handshake
|
||||
p.disconnectHandler = disconnectHandler
|
||||
|
||||
proc implementSendProcBody(sendProc: SendProc) =
|
||||
let
|
||||
msg = sendProc.msg
|
||||
UntypedResponder = bindSym "UntypedResponder"
|
||||
await = ident "await"
|
||||
|
||||
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
|
||||
if msg.kind != msgResponse:
|
||||
let msgProto = getRequestProtoName(msg.procDef)
|
||||
case msg.kind
|
||||
of msgRequest:
|
||||
let
|
||||
timeout = msg.timeoutParam[0]
|
||||
ResponseRecord = msg.response.recName
|
||||
quote:
|
||||
makeEth2Request(`peer`, `msgProto`, `bytes`,
|
||||
`ResponseRecord`, `timeout`)
|
||||
else:
|
||||
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
|
||||
else:
|
||||
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)
|
||||
|
||||
sendProc.useStandardBody(nil, nil, sendCallGenerator)
|
||||
|
||||
proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
|
||||
MsgType, Format: distinct type) {.async, gcsafe.} =
|
||||
mixin callUserHandler, RecType
|
||||
const msgName = typetraits.name(MsgType)
|
||||
|
||||
## Uncomment this to enable tracing on all incoming requests
|
||||
## You can include `msgNameLit` in the condition to select
|
||||
## more specific requests:
|
||||
# when chronicles.runtimeFilteringEnabled:
|
||||
# setLogLevel(LogLevel.TRACE)
|
||||
# defer: setLogLevel(LogLevel.DEBUG)
|
||||
# trace "incoming " & `msgNameLit` & " stream"
|
||||
|
||||
let peer = peerFromStream(network, stream)
|
||||
|
||||
handleIncomingPeer(peer)
|
||||
|
||||
defer:
|
||||
await safeClose(stream)
|
||||
|
||||
let
|
||||
deadline = sleepAsync RESP_TIMEOUT
|
||||
msgBytes = await readMsgBytes(stream, false, deadline)
|
||||
|
||||
if msgBytes.len == 0:
|
||||
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)
|
||||
return
|
||||
|
||||
type MsgRec = RecType(MsgType)
|
||||
var msg: MsgRec
|
||||
try:
|
||||
msg = decode(Format, msgBytes, MsgRec)
|
||||
except SerializationError as err:
|
||||
await sendErrorResponse(peer, stream, err, msgName, msgBytes)
|
||||
return
|
||||
except Exception as err:
|
||||
# TODO. This is temporary code that should be removed after interop.
|
||||
# It can be enabled only in certain diagnostic builds where it should
|
||||
# re-raise the exception.
|
||||
debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
|
||||
await sendErrorResponse(peer, stream, ServerError, err.msg)
|
||||
raise err
|
||||
|
||||
try:
|
||||
logReceivedMsg(peer, MsgType(msg))
|
||||
await callUserHandler(peer, stream, msg)
|
||||
except CatchableError as err:
|
||||
await sendErrorResponse(peer, stream, ServerError, err.msg)
|
||||
|
||||
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
debug "Peer (outgoing) lost", peer = $peer.info
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = await network.peerPool.addOutgoingPeer(peer)
|
||||
if res:
|
||||
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer) =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
debug "Peer (incoming) lost", peer = $peer.info
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = network.peerPool.addIncomingPeerNoWait(peer)
|
||||
if res:
|
||||
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
|
||||
if r.secp256k1.isSome:
|
||||
var pubKey: keys.PublicKey
|
||||
if recoverPublicKey(r.secp256k1.get, pubKey) != EthKeysStatus.Success:
|
||||
return # TODO
|
||||
|
||||
let peerId = PeerID.init crypto.PublicKey(scheme: Secp256k1, skkey: pubKey)
|
||||
var addresses = newSeq[MultiAddress]()
|
||||
|
||||
if r.ip.isSome and r.tcp.isSome:
|
||||
let ip = IpAddress(family: IpAddressFamily.IPv4,
|
||||
address_v4: r.ip.get)
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
|
||||
|
||||
if r.ip6.isSome:
|
||||
let ip = IpAddress(family: IpAddressFamily.IPv6,
|
||||
address_v6: r.ip6.get)
|
||||
if r.tcp6.isSome:
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp6.get)
|
||||
elif r.tcp.isSome:
|
||||
addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
if addresses.len > 0:
|
||||
return PeerInfo.init(peerId, addresses)
|
||||
|
||||
proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
|
||||
if r.isSome:
|
||||
return r.get.toPeerInfo
|
||||
|
||||
proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
|
||||
logScope: peer = $peerInfo
|
||||
|
||||
debug "Connecting to peer"
|
||||
await node.switch.connect(peerInfo)
|
||||
var peer = node.getPeer(peerInfo)
|
||||
peer.wasDialed = true
|
||||
|
||||
debug "Initializing connection"
|
||||
await initializeConnection(peer)
|
||||
|
||||
inc libp2p_successful_dials
|
||||
debug "Network handshakes completed"
|
||||
|
||||
await handleOutgoingPeer(peer)
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
|
||||
while true:
|
||||
let currentPeerCount = node.peerPool.len
|
||||
if currentPeerCount < node.wantedPeers:
|
||||
try:
|
||||
let discoveredPeers =
|
||||
node.discovery.randomNodes(node.wantedPeers - currentPeerCount)
|
||||
debug "Discovered peers", peer = $discoveredPeers
|
||||
for peer in discoveredPeers:
|
||||
try:
|
||||
let peerInfo = peer.record.toTypedRecord.toPeerInfo
|
||||
if peerInfo != nil and peerInfo.id notin node.switch.connections:
|
||||
# TODO do this in parallel
|
||||
await node.dialPeer(peerInfo)
|
||||
except CatchableError as err:
|
||||
debug "Failed to connect to peer", peer = $peer, err = err.msg
|
||||
except CatchableError as err:
|
||||
debug "Failure in discovery", err = err.msg
|
||||
|
||||
await sleepAsync seconds(1)
|
||||
|
||||
proc init*(T: type Eth2Node, conf: BeaconNodeConf,
|
||||
switch: Switch, ip: IpAddress, privKey: keys.PrivateKey): T =
|
||||
new result
|
||||
result.switch = switch
|
||||
result.discovery = Eth2DiscoveryProtocol.new(conf, ip, privKey.data)
|
||||
result.wantedPeers = conf.maxPeers
|
||||
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
|
||||
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for proto in allProtocols:
|
||||
if proto.networkStateInitializer != nil:
|
||||
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
|
||||
|
||||
for msg in proto.messages:
|
||||
if msg.protocolMounter != nil:
|
||||
msg.protocolMounter result
|
||||
|
||||
template publicKey*(node: Eth2Node): keys.PublicKey =
|
||||
node.discovery.privKey.getPublicKey
|
||||
|
||||
template addKnownPeer*(node: Eth2Node, peer: ENode|enr.Record) =
|
||||
node.discovery.addNode peer
|
||||
|
||||
proc start*(node: Eth2Node) {.async.} =
|
||||
node.discovery.open()
|
||||
node.libp2pTransportLoops = await node.switch.start()
|
||||
traceAsyncErrors node.runDiscoveryLoop()
|
||||
|
||||
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
|
||||
new result
|
||||
result.info = info
|
||||
result.network = network
|
||||
result.connectionState = Connected
|
||||
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for i in 0 ..< allProtocols.len:
|
||||
let proto = allProtocols[i]
|
||||
if proto.peerStateInitializer != nil:
|
||||
result.protocolStates[i] = proto.peerStateInitializer(result)
|
||||
|
||||
proc registerMsg(protocol: ProtocolInfo,
|
||||
name: string,
|
||||
mounter: MounterProc,
|
||||
libp2pCodecName: string,
|
||||
printer: MessageContentPrinter) =
|
||||
protocol.messages.add MessageInfo(name: name,
|
||||
protocolMounter: mounter,
|
||||
libp2pCodecName: libp2pCodecName,
|
||||
printer: printer)
|
||||
|
||||
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
||||
var
|
||||
Format = ident "SSZ"
|
||||
Responder = bindSym "Responder"
|
||||
P2PStream = bindSym "P2PStream"
|
||||
OutputStream = bindSym "OutputStream"
|
||||
Peer = bindSym "Peer"
|
||||
Eth2Node = bindSym "Eth2Node"
|
||||
messagePrinter = bindSym "messagePrinter"
|
||||
milliseconds = bindSym "milliseconds"
|
||||
registerMsg = bindSym "registerMsg"
|
||||
initProtocol = bindSym "initProtocol"
|
||||
bindSymOp = bindSym "bindSym"
|
||||
errVar = ident "err"
|
||||
msgVar = ident "msg"
|
||||
msgBytesVar = ident "msgBytes"
|
||||
networkVar = ident "network"
|
||||
await = ident "await"
|
||||
callUserHandler = ident "callUserHandler"
|
||||
|
||||
p.useRequestIds = false
|
||||
p.useSingleRecordInlining = true
|
||||
|
||||
new result
|
||||
|
||||
result.PeerType = Peer
|
||||
result.NetworkType = Eth2Node
|
||||
result.registerProtocol = bindSym "registerProtocol"
|
||||
result.setEventHandlers = bindSym "setEventHandlers"
|
||||
result.SerializationFormat = Format
|
||||
result.ResponderType = Responder
|
||||
|
||||
result.afterProtocolInit = proc (p: P2PProtocol) =
|
||||
p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)
|
||||
|
||||
result.implementMsg = proc (msg: Message) =
|
||||
let
|
||||
protocol = msg.protocol
|
||||
msgName = $msg.ident
|
||||
msgNameLit = newLit msgName
|
||||
MsgRecName = msg.recName
|
||||
MsgStrongRecName = msg.strongRecName
|
||||
codecNameLit = getRequestProtoName(msg.procDef)
|
||||
|
||||
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
|
||||
# Request procs need an extra param - the stream where the response
|
||||
# should be written:
|
||||
msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
|
||||
msg.initResponderCall.add streamVar
|
||||
|
||||
##
|
||||
## Implement the Thunk:
|
||||
##
|
||||
## The protocol handlers in nim-libp2p receive only a `P2PStream`
|
||||
## parameter and there is no way to access the wider context (such
|
||||
## as the current `Switch`). In our handlers, we may need to list all
|
||||
## peers in the current network, so we must keep a reference to the
|
||||
## network object in the closure environment of the installed handlers.
|
||||
##
|
||||
## For this reason, we define a `protocol mounter` proc that will
|
||||
## initialize the network object by creating handlers bound to the
|
||||
## specific network.
|
||||
##
|
||||
let
|
||||
protocolMounterName = ident(msgName & "_mounter")
|
||||
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])
|
||||
|
||||
var mounter: NimNode
|
||||
if msg.userHandler != nil:
|
||||
protocol.outRecvProcs.add quote do:
|
||||
template `callUserHandler`(`peerVar`: `Peer`,
|
||||
`streamVar`: `P2PStream`,
|
||||
`msgVar`: `MsgRecName`): untyped =
|
||||
`userHandlerCall`
|
||||
|
||||
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
|
||||
proc thunk(`streamVar`: `P2PStream`,
|
||||
proto: string): Future[void] {.gcsafe.} =
|
||||
return handleIncomingStream(`networkVar`, `streamVar`,
|
||||
`MsgStrongRecName`, `Format`)
|
||||
|
||||
mount `networkVar`.switch,
|
||||
LPProtocol(codec: `codecNameLit`, handler: thunk)
|
||||
|
||||
mounter = protocolMounterName
|
||||
else:
|
||||
mounter = newNilLit()
|
||||
|
||||
##
|
||||
## Implement Senders and Handshake
|
||||
##
|
||||
if msg.kind == msgHandshake:
|
||||
macros.error "Handshake messages are not supported in LibP2P protocols"
|
||||
else:
|
||||
var sendProc = msg.createSendProc()
|
||||
implementSendProcBody sendProc
|
||||
|
||||
protocol.outProcRegistrations.add(
|
||||
newCall(registerMsg,
|
||||
protocol.protocolInfoVar,
|
||||
msgNameLit,
|
||||
mounter,
|
||||
codecNameLit,
|
||||
newTree(nnkBracketExpr, messagePrinter, MsgRecName)))
|
||||
|
||||
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
|
||||
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
|
||||
|
Loading…
Reference in New Issue