2019-03-05 22:54:08 +00:00
|
|
|
import
|
2019-06-24 15:13:48 +00:00
|
|
|
macros, algorithm, tables,
|
2019-03-05 22:54:08 +00:00
|
|
|
std_shims/[macros_shim, tables_shims], chronos, chronicles,
|
|
|
|
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
|
2019-06-24 15:13:48 +00:00
|
|
|
json_serialization/std/options, eth/p2p/p2p_protocol_dsl,
|
|
|
|
libp2p_json_serialization, ssz
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
export
|
2019-06-24 15:13:48 +00:00
|
|
|
daemonapi, p2pProtocol, libp2p_json_serialization
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
type
  Eth2Node* = ref object of RootObj
    ## The local node: owns the libp2p daemon handle and the set of peers.
    daemon*: DaemonAPI
    peers*: Table[PeerID, Peer]
    protocolStates*: seq[RootRef]
      ## One slot per registered protocol, addressed by `ProtocolInfo.index`.

  EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers

  Peer* = ref object
    ## A remote peer known to an `Eth2Node`.
    network*: Eth2Node
    id*: PeerID
    connectionState*: ConnectionState
    awaitedMessages: Table[CompressedMsgId, FutureBase]
      ## Futures waiting for the next message of a given type.
    protocolStates*: seq[RootRef]
      ## One slot per registered protocol.
    maxInactivityAllowed*: Duration

  ConnectionState* = enum
    None
    Connecting
    Connected
    Disconnecting
    Disconnected

  DisconnectionReason* = enum
    UselessPeer
    BreachOfProtocol
    FaultOrError

  UntypedResponder = object
    ## The peer/stream pair that a response is written to.
    peer*: Peer
    stream*: P2PStream

  Responder*[MsgType] = distinct UntypedResponder
    ## `UntypedResponder` tagged with the type of the message it answers.

  MessageInfo* = object
    name*: string

    # Private fields:
    thunk*: ThunkProc
    libp2pProtocol: string
    printer*: MessageContentPrinter
    nextMsgResolver*: NextMsgResolver

  ProtocolInfoObj* = object
    name*: string
    messages*: seq[MessageInfo]
    index*: int # the position of the protocol in the
                # ordered list of supported protocols

    # Private fields:
    peerStateInitializer*: PeerStateInitializer
    networkStateInitializer*: NetworkStateInitializer
    handshake*: HandshakeStep
    disconnectHandler*: DisconnectionHandler

  ProtocolInfo* = ptr ProtocolInfoObj

  CompressedMsgId = tuple
    protocolIdx, methodId: int

  ResponseCode* = enum
    ## Transmitted as a single byte in front of every response.
    Success
    EncodingError
    InvalidRequest
    ServerError

  PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
  NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
  HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.}
  DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
  ThunkProc* = proc(daemon: DaemonAPI, stream: P2PStream): Future[void] {.gcsafe.}
  MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
  NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.}

  Bytes = seq[byte]

  PeerDisconnected* = object of CatchableError
    reason*: DisconnectionReason

  TransmissionError* = object of CatchableError
    ## Raised when fewer bytes than requested could be written to a stream.
|
|
|
|
|
2019-03-05 22:54:08 +00:00
|
|
|
const
  # Milliseconds: wrapped in `milliseconds(...)` when the incoming-request
  # thunks are generated.
  defaultIncomingReqTimeout = 5000
  # Presumably milliseconds as well; not referenced in the visible part of
  # this module - TODO confirm against the included helpers.
  defaultOutgoingReqTimeout = 10000

  # Aliases that map more specific situations onto the generic
  # `DisconnectionReason` values.
  HandshakeTimeout = BreachOfProtocol
  IrrelevantNetwork* = UselessPeer
|
2019-05-22 07:13:15 +00:00
|
|
|
|
2019-06-05 02:00:07 +00:00
|
|
|
include eth/p2p/p2p_backends_helpers
|
|
|
|
include eth/p2p/p2p_tracing
|
2019-06-24 02:34:01 +00:00
|
|
|
include libp2p_backends_common
|
2019-03-05 22:54:08 +00:00
|
|
|
|
2019-06-12 12:23:05 +00:00
|
|
|
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
  ## Creates a node bound to `daemon`, initialises the per-protocol network
  ## state and registers a daemon handler for every message that has a
  ## non-empty libp2p protocol id.
  new result
  result.daemon = daemon
  # Back-reference from the daemon to the node that owns it.
  result.daemon.userData = result
  result.peers = initTable[PeerID, Peer]()

  newSeq result.protocolStates, allProtocols.len
  for protocol in allProtocols:
    let stateInit = protocol.networkStateInitializer
    if stateInit != nil:
      result.protocolStates[protocol.index] = stateInit(result)

    for msgInfo in protocol.messages:
      if msgInfo.libp2pProtocol.len > 0:
        await daemon.addHandler(@[msgInfo.libp2pProtocol], msgInfo.thunk)
|
2019-03-05 22:54:08 +00:00
|
|
|
|
2019-06-24 02:34:01 +00:00
|
|
|
# Forward declaration: `readMsgBytes` below needs `readMsg` to decode the
# error text attached to a failed response.
proc readMsg(stream: P2PStream,
             MsgType: type,
             withResponseCode: bool,
             deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
|
|
|
|
|
|
|
|
proc readMsgBytes(stream: P2PStream,
                  withResponseCode: bool,
                  deadline: Future[void]): Future[Bytes] {.async.} =
  ## Reads one length-prefixed message from `stream`.
  ##
  ## When `withResponseCode` is set, a single `ResponseCode` byte is consumed
  ## first; anything other than `Success` is logged and ends the read.
  ##
  ## On success the returned buffer holds the 4-byte size prefix followed by
  ## the message body. An empty result means that the deadline fired, the
  ## response code was unknown or non-successful, or the size prefix was zero.
  ##
  ## NOTE(review): the size prefix comes straight from the remote side and is
  ## used as the allocation size without an upper bound - consider capping it.
  if withResponseCode:
    var codeByte: byte
    let codeRead = stream.transp.readExactly(addr codeByte, 1)
    await codeRead or deadline
    if not codeRead.finished or codeByte > ResponseCode.high.byte:
      return

    let code = ResponseCode(codeByte)
    logScope: responseCode = code
    case code
    of Success:
      # The response is OK, the execution continues below
      discard
    of InvalidRequest:
      debug "P2P request was classified as invalid"
      return
    of EncodingError, ServerError:
      let responseErrMsg = await readMsg(stream, string, false, deadline)
      debug "P2P request resulted in error", responseErrMsg
      return

  const prefixLen = sizeof(uint32)

  var lenPrefix: uint32
  let prefixRead = stream.transp.readExactly(addr lenPrefix, prefixLen)
  await prefixRead or deadline
  if not prefixRead.finished:
    return

  if lenPrefix == 0:
    debug "Received SSZ with zero size", peer = stream.peer
    return

  # The prefix is kept at the front of the buffer, the body follows it.
  let bodyLen = lenPrefix.int
  var buffer = newSeq[byte](bodyLen + prefixLen)
  copyMem(addr buffer[0], addr lenPrefix, prefixLen)

  let bodyRead = stream.transp.readExactly(addr buffer[prefixLen], bodyLen)
  await bodyRead or deadline
  if not bodyRead.finished:
    return

  return buffer
|
|
|
|
|
|
|
|
proc readMsgBytesOrClose(stream: P2PStream,
                         withResponseCode: bool,
                         deadline: Future[void]): Future[Bytes] {.async.} =
  ## Same as `readMsgBytes`, but closes `stream` when nothing was read.
  let bytes = await stream.readMsgBytes(withResponseCode, deadline)
  if bytes.len == 0:
    await stream.close()
  return bytes
|
|
|
|
|
|
|
|
proc readMsg(stream: P2PStream,
             MsgType: type,
             withResponseCode: bool,
             deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads one message from `stream` and SSZ-decodes it as `MsgType`.
  ##
  ## Yields `none` when no bytes could be read (the stream has been closed by
  ## `readMsgBytesOrClose` in that case) or when decoding fails.
  let payload = await stream.readMsgBytesOrClose(withResponseCode, deadline)
  if payload.len == 0:
    return

  try:
    return some SSZ.decode(payload, MsgType)
  except SerializationError as err:
    debug "Failed to decode a network message",
          msgBytes = payload, errMsg = err.formatMsg("<msg>")
    return
|
|
|
|
|
2019-06-24 02:34:01 +00:00
|
|
|
proc sendErrorResponse(peer: Peer,
                       stream: P2PStream,
                       err: ref SerializationError,
                       msgName: string,
                       msgBytes: Bytes) {.async.} =
  ## Logs a request that failed to decode, answers with a bare
  ## `EncodingError` code byte and closes `stream`.
  debug "Received an invalid request",
        peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")

  # The result of the write is deliberately ignored.
  var codeByte = byte(EncodingError)
  discard await stream.transp.write(addr codeByte, 1)
  await stream.close()
|
|
|
|
|
|
|
|
proc sendErrorResponse(peer: Peer,
                       stream: P2PStream,
                       responseCode: ResponseCode,
                       errMsg: string) {.async.} =
  ## Logs the failure, answers with `responseCode` followed by the
  ## SSZ-encoded `errMsg` and closes `stream`.
  debug "Error processing request",
        peer, responseCode, errMsg

  var reply = init OutputStream
  reply.append byte(responseCode)
  reply.appendValue SSZ, errMsg

  # The result of the write is deliberately ignored.
  discard await stream.transp.write(reply.getOutput)
  await stream.close()
|
|
|
|
|
2019-03-05 22:54:08 +00:00
|
|
|
proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async.} =
  ## Opens a fresh stream to `peer` for `protocolId`, writes `requestBytes`
  ## to it and closes the stream again.
  ##
  ## Raises `TransmissionError` when not all bytes could be written.
  let stream = await peer.network.daemon.openStream(peer.id, @[protocolId])
  # TODO how does openStream fail? Set a timeout here and handle it
  try:
    let sent = await stream.transp.write(requestBytes)
    if sent != requestBytes.len:
      raise newException(TransmissionError, "Failed to deliver all bytes")
  finally:
    # No response is read from this stream, so it must not outlive the
    # call - regardless of whether the write succeeded.
    await stream.close()
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} =
  ## Writes `bytes` to an already open `stream`.
  ##
  ## Raises `TransmissionError` when the write was incomplete.
  let written = await stream.transp.write(bytes)
  if written != bytes.len:
    raise newException(TransmissionError, "Failed to deliver all bytes")
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
                     ResponseMsg: type,
                     timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
  ## Sends `requestBytes` over a new stream and waits for a `ResponseMsg`.
  ##
  ## A single deadline of `timeout` covers both opening the stream and
  ## reading the response. Yields `none` when the stream could not be opened
  ## in time or when no valid response was read. An incomplete write
  ## disconnects the peer through `disconnectAndRaise`.
  ##
  ## NOTE(review): the stream does not appear to be closed here after a
  ## successful response - confirm who owns it.
  let deadline = sleepAsync timeout

  # Open a new LibP2P stream
  let opening = peer.network.daemon.openStream(peer.id, @[protocolId])
  await opening or deadline
  if not opening.finished:
    return none(ResponseMsg)

  # Send the request
  let
    stream = opening.read
    written = await stream.transp.write(requestBytes)
  if written != requestBytes.len:
    await disconnectAndRaise(peer, FaultOrError, "Incomplete send")

  # Read the response
  return await stream.readMsg(ResponseMsg, true, deadline)
|
|
|
|
|
|
|
|
proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes,
                       ResponseMsg: type,
                       timeout: Duration): Future[ResponseMsg] {.gcsafe, async.} =
  ## Performs the initiating side of a handshake: sends our handshake as a
  ## regular request and returns the one received in reply. A missing reply
  ## disconnects the peer with `BreachOfProtocol`.
  let reply = await makeEth2Request(peer, protocolId, requestBytes,
                                    ResponseMsg, timeout)
  if reply.isNone:
    await peer.disconnectAndRaise(BreachOfProtocol, "Failed to complete a handshake")

  return reply.get
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
proc p2pStreamName(MsgType: type): string =
  ## The libp2p protocol id registered for messages of type `MsgType`.
  mixin msgProtocol, protocolInfo, msgId
  protocolInfo(msgProtocol(MsgType)).messages[msgId(MsgType)].libp2pProtocol
|
|
|
|
|
2019-06-24 02:34:01 +00:00
|
|
|
# Shared implementation of the generated handshake exchangers.
#
# With a nil `stream` we are the initiating side: the handshake is
# serialized through `handshakeSerializationCall` and sent as a regular
# request with `exchangeHandshake`.
#
# Otherwise we are the receiving side: `lowLevelThunk` is run on the
# incoming stream (racing against `timeout`) to obtain the remote handshake,
# after which our own handshake is written back, prefixed with `Success`.
# If the remote handshake has not arrived by then, the peer is disconnected.
template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped,
                       lowLevelThunk: untyped,
                       HandshakeType: untyped,
                         # TODO: we cannot use a type parameter above
                         # because of the following Nim issue:
                         #
                       peer: Peer,
                       stream: P2PStream,
                       timeout: Duration): auto =
  if stream == nil:
    var outputStreamVar = init OutputStream
    handshakeSerializationCall
    exchangeHandshake(peer, p2pStreamName(HandshakeType),
                      getOutput(outputStreamVar), HandshakeType, timeout)
  else:
    proc asyncStep: Future[HandshakeType] {.async.} =
      let deadline = sleepAsync timeout
      # The future must be registered before the thunk runs, because the
      # thunk is what resolves it.
      var responseFut = nextMsg(peer, HandshakeType)
      await lowLevelThunk(peer.network.daemon, stream) or deadline
      if not responseFut.finished:
        await disconnectAndRaise(peer, BreachOfProtocol, "Failed to complete a handshake")

      var outputStreamVar = init OutputStream
      append(outputStreamVar, byte(Success))
      handshakeSerializationCall
      await sendBytes(stream, getOutput(outputStreamVar))

      return responseFut.read

    asyncStep()
|
2019-03-05 22:54:08 +00:00
|
|
|
|
|
|
|
proc resolveNextMsgFutures(peer: Peer, msg: auto) =
  ## Completes the future (if any) that is waiting for the next message of
  ## `msg`'s type on `peer`.
  type MsgType = type(msg)
  let msgId = getCompressedMsgId(MsgType)
  let future = peer.awaitedMessages.getOrDefault(msgId)
  if future != nil:
    # Drop the entry before completing it: leaving a finished future in the
    # table would make the next message of the same type try to complete it
    # a second time.
    peer.awaitedMessages.del msgId
    if not future.finished:
      Future[MsgType](future).complete msg
|
|
|
|
|
|
|
|
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
  ## Creates a `Peer` in the `Connected` state and runs the peer-state
  ## initializer of every registered protocol that has one.
  result = Peer(
    id: id,
    network: network,
    awaitedMessages: initTable[CompressedMsgId, FutureBase](),
    connectionState: Connected,
    maxInactivityAllowed: 15.minutes) # TODO: Read this from the config

  newSeq result.protocolStates, allProtocols.len
  for i, proto in allProtocols:
    let stateInit = proto.peerStateInitializer
    if stateInit != nil:
      result.protocolStates[i] = stateInit(result)
|
|
|
|
|
|
|
|
proc performProtocolHandshakes*(peer: Peer) {.async.} =
  ## Starts the handshake step of every protocol that defines one (passing a
  ## nil stream) and waits for all of them to finish.
  var pending = newSeqOfCap[Future[void]](allProtocols.len)
  for protocol in allProtocols:
    let step = protocol.handshake
    if step != nil:
      pending.add step(peer, nil)

  await all(pending)
|
|
|
|
|
2019-06-21 16:32:52 +00:00
|
|
|
template initializeConnection*(peer: Peer): auto =
  ## Connection setup for this backend consists of the protocol handshakes.
  performProtocolHandshakes(peer)
|
|
|
|
|
2019-03-05 22:54:08 +00:00
|
|
|
proc initProtocol(name: string,
                  peerInit: PeerStateInitializer,
                  networkInit: NetworkStateInitializer): ProtocolInfoObj =
  ## Builds the descriptor of a protocol that has no messages registered yet.
  ProtocolInfoObj(name: name,
                  messages: @[],
                  peerStateInitializer: peerInit,
                  networkStateInitializer: networkInit)
|
|
|
|
|
|
|
|
proc registerMsg(protocol: ProtocolInfo,
                 name: string,
                 thunk: ThunkProc,
                 libp2pProtocol: string,
                 printer: MessageContentPrinter) =
  ## Appends the description of one message to `protocol.messages`.
  let info = MessageInfo(name: name,
                         thunk: thunk,
                         libp2pProtocol: libp2pProtocol,
                         printer: printer)
  protocol.messages.add info
|
|
|
|
|
|
|
|
proc getRequestProtoName(fn: NimNode): NimNode =
  ## String literal node holding the libp2p protocol id derived from the
  ## name of the message proc `fn`.
  let procName = $fn.name
  result = newLit("/ETH/BeaconChain/" & procName & "/1/SSZ")
|
2019-06-03 17:07:50 +00:00
|
|
|
|
|
|
|
proc init*[MsgType](T: type Responder[MsgType],
                    peer: Peer, stream: P2PStream): T =
  ## Wraps the peer/stream pair into a typed responder.
  let raw = UntypedResponder(peer: peer, stream: stream)
  T(raw)
|
2019-05-22 07:13:15 +00:00
|
|
|
|
2019-06-05 02:00:07 +00:00
|
|
|
proc implementSendProcBody(sendProc: SendProc) =
  ## Fills in the body of a generated send proc via `useStandardBody`.
  ##
  ## The call that ships the serialized bytes depends on the message kind:
  ## responses are written to the responder's stream with `sendBytes`,
  ## requests go through `makeEth2Request`, handshakes through
  ## `exchangeHandshake` and everything else through `sendMsg`.
  ## Responses additionally get a leading `Success` code byte.
  let
    msg = sendProc.msg
    UntypedResponder = bindSym "UntypedResponder"

  proc sendCallGenerator(peer, bytes: NimNode): NimNode =
    case msg.kind
    of msgResponse:
      quote: sendBytes(`UntypedResponder`(`peer`).stream, `bytes`)
    of msgRequest:
      let
        msgProto = getRequestProtoName(msg.procDef)
        timeout = msg.timeoutParam[0]
        ResponseRecord = msg.response.recIdent
      quote:
        makeEth2Request(`peer`, `msgProto`, `bytes`,
                        `ResponseRecord`, `timeout`)
    of msgHandshake:
      let
        msgProto = getRequestProtoName(msg.procDef)
        timeout = msg.timeoutParam[0]
        HandshakeRecord = msg.recIdent
      quote:
        exchangeHandshake(`peer`, `msgProto`, `bytes`,
                          `HandshakeRecord`, `timeout`)
    else:
      let msgProto = getRequestProtoName(msg.procDef)
      quote: sendMsg(`peer`, `msgProto`, `bytes`)

  proc prependResponseCode(stream: NimNode): NimNode =
    quote: append(`stream`, byte(Success))

  # Only responses carry a response code in front of the payload.
  let preSerializationStep = if msg.kind == msgResponse:
    prependResponseCode
  else:
    nil

  sendProc.useStandardBody(preSerializationStep, nil, sendCallGenerator)
|
2019-06-05 02:00:07 +00:00
|
|
|
|
2019-05-22 07:13:15 +00:00
|
|
|
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
  ## Builds the `Backend` that the `p2pProtocol` DSL uses to generate
  ## LibP2P-specific code: the thunk that handles every incoming message,
  ## the send procs, the handshake exchanger and the message registrations.
  let
    Format = ident "SSZ"
    Responder = bindSym "Responder"
    DaemonAPI = bindSym "DaemonAPI"
    P2PStream = ident "P2PStream"
    OutputStream = bindSym "OutputStream"
    Peer = bindSym "Peer"
    Eth2Node = bindSym "Eth2Node"
    messagePrinter = bindSym "messagePrinter"
    milliseconds = bindSym "milliseconds"
    registerMsg = bindSym "registerMsg"
    initProtocol = bindSym "initProtocol"
    bindSymOp = bindSym "bindSym"
    errVar = ident "err"
    msgVar = ident "msg"
    msgBytesVar = ident "msgBytes"
    daemonVar = ident "daemon"
    await = ident "await"

  p.useRequestIds = false

  new result

  result.PeerType = Peer
  result.NetworkType = Eth2Node
  result.registerProtocol = bindSym "registerProtocol"
  result.setEventHandlers = bindSym "setEventHandlers"
  result.SerializationFormat = Format
  result.ResponderType = Responder

  result.afterProtocolInit = proc (p: P2PProtocol) =
    # The `onPeerConnected` handler receives the stream as an extra param.
    p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream)

  result.implementMsg = proc (msg: Message) =
    let
      protocol = msg.protocol
      msgName = $msg.ident
      msgNameLit = newLit msgName
      msgRecName = msg.recIdent

    if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
      # Request procs need an extra param - the stream where the response
      # should be written:
      msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream))
      msg.initResponderCall.add streamVar

    ##
    ## Implement Thunk
    ##
    var thunkName = ident(msgName & "_thunk")
    let
      requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
      awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar])

    let tracing = when tracingEnabled:
      quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get)
    else:
      newStmtList()

    # The thunk reads the raw bytes under a deadline, decodes them, runs the
    # user handler and resolves pending `nextMsg` futures. Every failure is
    # reported back to the sender through `sendErrorResponse`.
    msg.defineThunk quote do:
      proc `thunkName`(`daemonVar`: `DaemonAPI`,
                       `streamVar`: `P2PStream`) {.async, gcsafe.} =
        let
          `deadlineVar` = sleepAsync `requestDataTimeout`
          `msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`)
          `peerVar` = peerFromStream(`daemonVar`, `streamVar`)

        if `msgBytesVar`.len == 0:
          `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError,
                                    "Exceeded read timeout for a request")
          return

        var `msgVar`: `msgRecName`
        try:
          `msgVar` = decode(`Format`, `msgBytesVar`, `msgRecName`)
        except SerializationError as `errVar`:
          `await` sendErrorResponse(`peerVar`, `streamVar`, `errVar`,
                                    `msgNameLit`, `msgBytesVar`)
          return

        try:
          `tracing`
          `awaitUserHandler`
          resolveNextMsgFutures(`peerVar`, `msgVar`)
        except CatchableError as `errVar`:
          `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)

    ##
    ## Implement Senders and Handshake
    ##
    if msg.kind == msgHandshake:
      # In LibP2P protocols, the handshake thunk is special. Instead of directly
      # deserializing the incoming message and calling the user-supplied handler,
      # we execute the `onPeerConnected` handler instead.
      #
      # The `onPeerConnected` handler is executed symmetrically for both peers
      # and it's expected that one of its very first steps would be to send the
      # handshake and then await the same from the other side. We call this step
      # "handshakeExchanger".
      #
      # For the initiating peer, the handshakeExchanger opens a stream and sends
      # a regular request through it, but on the receiving side, it just sets up
      # a future and calls the lower-level thunk that will complete it.
      #
      let
        handshake = msg.protocol.onPeerConnected
        lowLevelThunkName = $thunkName

      if handshake.isNil:
        macros.error "A LibP2P protocol with a handshake must also include an " &
                     "`onPeerConnected` handler.", msg.procDef

      # We must generate a forward declaration for the `onPeerConnected` handler,
      # so we can call it from the thunk proc:
      let handshakeProcName = handshake.name
      msg.protocol.outRecvProcs.add quote do:
        proc `handshakeProcName`(`peerVar`: `Peer`,
                                 `streamVar`: `P2PStream`) {.async, gcsafe.}

      # Here we replace the 'thunkProc' that will be registered as a handler
      # for incoming messages:
      thunkName = ident(msgName & "_handleConnection")

      msg.protocol.outRecvProcs.add quote do:
        proc `thunkName`(`daemonVar`: `DaemonAPI`,
                         `streamVar`: `P2PStream`) {.async, gcsafe.} =
          let `peerVar` = peerFromStream(`daemonVar`, `streamVar`)
          try:
            `await` `handshakeProcName`(`peerVar`, `streamVar`)
          except SerializationError as err:
            debug "Failed to decode message",
                  err = err.formatMsg("<msg>"),
                  msg = `msgNameLit`,
                  peer = $(`streamVar`.peer)
            `await` disconnect(`peerVar`, FaultOrError)
          except CatchableError as err:
            debug "Failed to complete handshake", err = err.msg
            `await` disconnect(`peerVar`, FaultOrError)

      var
        handshakeSerializer = msg.createSerializer()
        handshakeSerializerName = newLit($handshakeSerializer.name)
        handshakeExchanger = msg.createSendProc(nnkMacroDef)
        paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def)
        handshakeTypeName = newLit($msg.recIdent)
        getAst = ident "getAst"
        res = ident "result"

      # The exchanger is itself a macro: its body (below) runs at compile
      # time in the user's module and instantiates `handshakeImpl`.
      handshakeExchanger.setBody quote do:
        let
          stream = ident "stream"
          outputStreamVar = ident "outputStream"
          lowLevelThunk = ident `lowLevelThunkName`
          HandshakeType = ident `handshakeTypeName`
          params = `paramsArray`
          peer = params[0]
          timeout = params[^1]
          handshakeSerializationCall = newCall(`bindSymOp` `handshakeSerializerName`, params)

        # Replace the first argument with the output stream and drop the
        # trailing one.
        handshakeSerializationCall[1] = outputStreamVar
        handshakeSerializationCall.del(handshakeSerializationCall.len - 1)

        `res` = `getAst`(handshakeImpl(outputStreamVar, handshakeSerializationCall,
                                       lowLevelThunk, HandshakeType,
                                       peer, stream, timeout))

        when defined(debugMacros) or defined(debugHandshake):
          echo "---- Handshake implementation ----"
          echo repr(`res`)
    else:
      var sendProc = msg.createSendProc()
      implementSendProcBody sendProc

    protocol.outProcRegistrations.add(
      newCall(registerMsg,
              protocol.protocolInfoVar,
              msgNameLit,
              thunkName,
              getRequestProtoName(msg.procDef),
              newTree(nnkBracketExpr, messagePrinter, msgRecName)))

  result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
    return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
|
2019-03-05 22:54:08 +00:00
|
|
|
|