# Source: nim-eth-p2p/eth_p2p/rlpx.nim (1360 lines, 49 KiB, Nim)

import
macros, tables, algorithm, deques, hashes, options, typetraits,
chronicles, nimcrypto, asyncdispatch2, rlp, eth_common, eth_keys,
private/types, kademlia, auth, rlpxcrypt, enode
# (blame timestamp from the source viewer: 2018-11-02 20:45:57 +07:00)
# Optional Snappy support: this section is only compiled when the
# `useSnappy` symbol is defined (e.g. with `-d:useSnappy`).
when defined(useSnappy):
  import snappy

  # devp2p protocol version number associated with Snappy compression
  # (presumably the first version that uses it -- confirm against the spec).
  const devp2pSnappyVersion* = 5
# All log statements in this module are tagged with the "rlpx" topic.
logScope:
  topics = "rlpx"

# Base devp2p protocol version.
const devp2pVersion* = 4

# Default value of the `timeout` option of `rlpxProtocolImpl`; it is used
# for requests that do not declare their own `timeout` parameter.
# NOTE(review): the unit looks like milliseconds -- confirm against
# `fastEpochTime`/`addTimer`.
const defaultReqTimeout = 10000

# Largest frame size (as announced in the frame header) accepted by `recvMsg`.
const maxMsgSize = 1024 * 1024
# Registry of all sub-protocols passed to `registerProtocol`.
var gProtocols: seq[ProtocolInfo]

# Cache of the dispatchers created by `getDispatcher`.
var gDispatchers = initSet[Dispatcher]()

# Run-time info of the base devp2p protocol (set by `registerProtocol`).
var devp2p: ProtocolInfo

# The variables above are immutable RTTI information. We need to tell
# Nim to not consider them GcSafe violations:
template rlpxProtocols*: auto =
  {.gcsafe.}:
    gProtocols

template devp2pProtocolInfo: auto =
  {.gcsafe.}:
    devp2p
proc newFuture[T](location: var Future[T]) =
  ## Creates a fresh `Future[T]` and stores it in `location`.
  ## Lets the element type be inferred from the destination variable.
  let fresh = newFuture[T]()
  location = fresh
proc `$`*(p: Peer): string {.inline.} =
  ## Renders a peer as the string form of its `remote` field.
  result = $(p.remote)
# Forward declaration: `disconnect` is called by the helpers below; its body
# is not part of this section of the file.
proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.}
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
  ## Raises a `PeerDisconnected` exception carrying the message `msg`
  ## and with its `reason` field set to `r`.
  var failure = newException(PeerDisconnected, msg)
  failure.reason = r
  raise failure
proc disconnectAndRaise(peer: Peer,
                        reason: DisconnectionReason,
                        msg: string) {.async.} =
  ## Disconnects `peer` with the given `reason` and then raises
  ## `PeerDisconnected` (with the same reason) from the returned future.
  # The reason is copied into a local before the first `await`, as in the
  # original code (presumably an async-transformation workaround).
  let why = reason
  await peer.disconnect(why)
  raisePeerDisconnected(msg, why)
# Dispatcher
#
proc hash(d: Dispatcher): int =
  ## Hashes a dispatcher by its table of per-protocol message offsets.
  result = hash(d.protocolOffsets)
proc `==`(lhs, rhs: Dispatcher): bool =
  ## Two dispatchers are equal when they assign the same message-id offsets
  ## to the same protocols.
  ##
  ## The comparison uses `protocolOffsets`, the same field that `hash`
  ## is computed from, so that hashing and equality agree. `getDispatcher`
  ## probes the dispatcher cache with a candidate whose `activeProtocols`
  ## has not been filled in yet; comparing that field made cache lookups
  ## miss for every dispatcher with at least one active protocol.
  # Nil operands are compared by identity instead of being dereferenced.
  if lhs.isNil or rhs.isNil:
    return lhs.isNil and rhs.isNil
  lhs.protocolOffsets == rhs.protocolOffsets
proc describeProtocols(d: Dispatcher): string =
  ## Returns the names of the dispatcher's active protocols,
  ## separated by commas (no spaces).
  result = ""
  for protocol in d.activeProtocols:
    if result.len > 0:
      result.add ','
    for i in 0 ..< protocol.name.len:
      result.add protocol.name[i]
proc numProtocols(d: Dispatcher): int =
  ## Number of protocols active on this dispatcher.
  result = len(d.activeProtocols)
proc getDispatcher(node: EthereumNode,
                   otherPeerCapabilities: openarray[Capability]): Dispatcher =
  ## Builds (or fetches from the `gDispatchers` cache) the dispatcher for a
  ## connection to a peer that announced `otherPeerCapabilities`.
  ##
  ## Every local protocol that matches a remote capability by name and
  ## version gets a block of message ids starting at 0x10; ids below that
  ## are left to the base devp2p messages. Protocols that are not shared
  ## keep the offset -1.
  # TODO: sub-optimal solution until progress is made here:
  # https://github.com/nim-lang/Nim/issues/7457
  # We should be able to find an existing dispatcher without allocating a new one

  template copyTo(src, dest; index: int) =
    # Stores the address of every element of `src` in `dest`,
    # starting at position `index`.
    for i in 0 ..< src.len:
      dest[index + i] = addr src[i]

  new(result)
  newSeq(result.protocolOffsets, rlpxProtocols.len)
  result.protocolOffsets.fill -1

  var nextUserMsgId = 0x10

  for localProtocol in node.rlpxProtocols:
    for remoteCapability in otherPeerCapabilities:
      if localProtocol.name == remoteCapability.name and
         localProtocol.version == remoteCapability.version:
        result.protocolOffsets[localProtocol.index] = nextUserMsgId
        nextUserMsgId += localProtocol.messages.len
        # One match is enough, move on to the next local protocol.
        break

  # Note: the cache is probed before `messages` and `activeProtocols`
  # are populated on the candidate.
  if result in gDispatchers:
    return gDispatchers[result]

  result.messages = newSeq[ptr MessageInfo](nextUserMsgId)
  devp2pProtocolInfo.messages.copyTo(result.messages, 0)

  for localProtocol in node.rlpxProtocols:
    let offset = result.protocolOffsets[localProtocol.index]
    if offset != -1:
      result.activeProtocols.add localProtocol
      localProtocol.messages.copyTo(result.messages, offset)

  gDispatchers.incl result
proc getMsgName*(peer: Peer, msgId: int): string =
  ## Returns a printable name for the message with per-peer id `msgId`.
  ##
  ## The name comes from the peer's dispatcher when it knows the id.
  ## Otherwise the four base devp2p messages (ids 0..3) are named directly
  ## and any other id is rendered as its decimal number.
  # The dispatcher table can contain nil entries (ids that no protocol
  # claimed, e.g. the reserved range below 0x10) and the id may come
  # straight from the wire, so both the range and the slot are checked
  # before dereferencing.
  if not peer.dispatcher.isNil and
     msgId >= 0 and
     msgId < peer.dispatcher.messages.len and
     not peer.dispatcher.messages[msgId].isNil:
    return peer.dispatcher.messages[msgId].name
  else:
    return case msgId
           of 0: "hello"
           of 1: "disconnect"
           of 2: "ping"
           of 3: "pong"
           else: $msgId
proc getMsgMetadata*(peer: Peer, msgId: int): (ProtocolInfo, ptr MessageInfo) =
  ## Maps a per-peer message id to the protocol that owns it and to the
  ## message's metadata.
  ##
  ## Ids up to the last devp2p message id resolve to the base protocol.
  ## Other ids are looked up among the protocols active on the peer's
  ## dispatcher. When no protocol owns the id, the default (nil, nil)
  ## tuple is returned.
  doAssert msgId >= 0

  if msgId <= devp2p.messages[^1].id:
    return (devp2p, addr devp2p.messages[msgId])

  if msgId < peer.dispatcher.messages.len:
    for i in 0 ..< rlpxProtocols.len:
      let offset = peer.dispatcher.protocolOffsets[i]
      # The id must fall inside [offset, offset + lastId]. Checking the
      # lower bound as well keeps ids from the unassigned gap (and ids of
      # protocols with a smaller offset) from being attributed to the
      # wrong protocol.
      if offset != -1 and
         msgId >= offset and
         offset + rlpxProtocols[i].messages[^1].id >= msgId:
        return (rlpxProtocols[i], peer.dispatcher.messages[msgId])
# Protocol info objects
#
proc newProtocol(name: string, version: int,
                 peerInit: PeerStateInitializer,
                 networkInit: NetworkStateInitializer): ProtocolInfo =
  ## Allocates a `ProtocolInfo` with an empty message table.
  ## Only the first three characters of `name` are stored, so `name`
  ## must be at least three characters long.
  new result
  for i in 0 .. 2:
    result.name[i] = name[i]
  result.version = version
  result.messages = @[]
  result.peerStateInitializer = peerInit
  result.networkStateInitializer = networkInit
proc setEventHandlers(p: ProtocolInfo,
                      handshake: HandshakeStep,
                      disconnectHandler: DisconnectionHandler) =
  ## Stores the protocol's handshake step and disconnection handler
  ## in its run-time info object.
  p.disconnectHandler = disconnectHandler
  p.handshake = handshake
func asCapability*(p: ProtocolInfo): Capability =
  ## Builds the `Capability` (name + version) describing this protocol.
  result.version = p.version
  result.name = p.name
func nameStr*(p: ProtocolInfo): string =
  ## Returns the protocol name as a `string`, built character by character
  ## from the `name` field.
  result = newStringOfCap(3)
  for i in 0 ..< p.name.len:
    result.add p.name[i]
# XXX: this used to be inline, but inline procs
# cannot be passed to closure params
proc cmp*(lhs, rhs: ProtocolInfo): int =
  ## Orders protocols by the first three characters of their names.
  ## Returns the difference of the first pair of differing characters,
  ## or 0 when all three are equal.
  for i in 0 .. 2:
    let
      a = lhs.name[i]
      b = rhs.name[i]
    if a != b:
      return int16(a) - int16(b)
  return 0
proc messagePrinter[MsgType](msg: pointer): string =
  ## Placeholder printer for received messages: currently always
  ## returns the empty string and ignores `msg`.
  # TODO: uncommenting the line below increases the compile-time
  # tremendously (for reasons not yet known)
  # result = $(cast[ptr MsgType](msg)[])
  return ""
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) =
  ## Decodes `msgData` as a `MsgType` record and completes `future`
  ## (which must be a `Future[MsgType]`) with the decoded value.
  var reader = msgData
  let typedFuture = Future[MsgType](future)
  typedFuture.complete(
    reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1))
proc requestResolver[MsgType](msg: pointer, future: FutureBase) =
  ## Resolves the future of an outstanding request.
  ##
  ## `future` must be a `Future[Option[MsgType]]`. A non-nil `msg` points
  ## to the received response and completes the future with `some(value)`;
  ## a nil `msg` signals a timeout and completes it with `none`.
  ## Resolving a future that is already finished trips an assertion.
  let pending = Future[Option[MsgType]](future)
  let gotResponse = msg != nil

  if not pending.finished:
    if gotResponse:
      pending.complete some(cast[ptr MsgType](msg)[])
    else:
      pending.complete none(MsgType)
    return

  # This future was already resolved, but let's do some sanity checks
  # here. The only reasonable explanation is that the request should
  # have timed out.
  let previous = pending.read
  if gotResponse:
    if previous.isSome:
      doAssert false, "trying to resolve a request twice"
    else:
      doAssert false, "trying to resolve a timed out request with a value"
  elif not previous.isSome:
    doAssert false, "a request timed out twice"
proc registerMsg(protocol: var ProtocolInfo,
                 id: int, name: string,
                 thunk: MessageHandler,
                 printer: MessageContentPrinter,
                 requestResolver: RequestResolver,
                 nextMsgResolver: NextMsgResolver) =
  ## Stores the metadata of message `id` in `protocol.messages`,
  ## growing the table first when `id` is past its current end.
  let slotsNeeded = id + 1
  if slotsNeeded > protocol.messages.len:
    protocol.messages.setLen(slotsNeeded)

  protocol.messages[id] = MessageInfo.init(
    id = id,
    name = name,
    thunk = thunk,
    printer = printer,
    requestResolver = requestResolver,
    nextMsgResolver = nextMsgResolver)
proc registerProtocol(protocol: ProtocolInfo) =
  ## Registers a protocol's run-time info.
  ##
  ## A protocol with version 0 (or lower) is taken to be the base devp2p
  ## protocol and is stored in `devp2p`. Any other protocol is inserted
  ## into the sorted `gProtocols` list, after which every entry's `index`
  ## is refreshed to match its position.
  # TODO: This can be done at compile-time in the future
  if not (protocol.version > 0):
    devp2p = protocol
  else:
    let insertAt = lowerBound(gProtocols, protocol)
    gProtocols.insert(protocol, insertAt)
    for i in 0 .. gProtocols.high:
      gProtocols[i].index = i
# Message composition and encryption
#
proc protocolOffset(peer: Peer, Protocol: type): int =
  ## Message-id offset assigned to `Protocol` on this peer's dispatcher
  ## (-1 when the protocol is not active on the connection).
  let protoIndex = Protocol.protocolInfo.index
  result = peer.dispatcher.protocolOffsets[protoIndex]
proc perPeerMsgId(peer: Peer, proto: type, msgId: int): int {.inline.} =
  ## Translates the protocol-relative `msgId` into the id used on this
  ## connection. While the peer has no dispatcher yet, the id is
  ## returned unchanged.
  if peer.dispatcher.isNil:
    msgId
  else:
    msgId + peer.protocolOffset(proto)
proc perPeerMsgId*(peer: Peer, MsgType: type): int {.inline.} =
  ## Per-connection id of the message described by the record type
  ## `MsgType`, derived from its `msgProtocol` and `msgId`.
  result = perPeerMsgId(peer, MsgType.msgProtocol, MsgType.msgId)
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
                rlpOut: var RlpWriter) =
  ## Appends the per-connection id of message `msgId` of protocol `p`
  ## to `rlpOut`. Asserts that the protocol is active on the peer.
  let protoOffset = peer.dispatcher.protocolOffsets[p.index]
  doAssert protoOffset != -1
  rlpOut.append(protoOffset + msgId)
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
  ## Runs the handler thunk registered for `msgId` on this connection
  ## and returns its future.
  ##
  ## Raises `ValueError` when `msgId` does not identify a message with a
  ## thunk on the peer's dispatcher.
  template invalidIdError: untyped =
    raise newException(ValueError,
      "RLPx message with an invalid id " & $msgId &
      " on a connection supporting " & peer.dispatcher.describeProtocols)

  # `msgId` comes from the remote peer: it may be out of range or may hit
  # a slot that no protocol claimed (a nil entry in the dispatcher table).
  # Both cases must be rejected before the entry is dereferenced.
  if msgId < 0 or msgId >= peer.dispatcher.messages.len: invalidIdError()
  let msgInfo = peer.dispatcher.messages[msgId]
  if msgInfo.isNil: invalidIdError()

  let thunk = msgInfo.thunk
  if thunk == nil: invalidIdError()

  return thunk(peer, msgId, msgData)
proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
  ## Propagates a failure of the send operation `sendFut` to the
  ## response future `resFut` of the same request.
  sendFut.addCallback() do(arg: pointer):
    # The response future may have been completed already (for example by
    # the request timeout) by the time the send operation fails; failing
    # it again would be a double completion, so it is left untouched.
    if not sendFut.error.isNil and not resFut.finished:
      resFut.fail(sendFut.error)
proc sendMsg*(peer: Peer, data: Bytes) {.async.} =
  ## Encrypts the RLP-encoded message `data` and writes it to the peer's
  ## transport. When built with `useSnappy` and the peer has Snappy
  ## enabled, the payload is compressed before encryption.
  ##
  ## If the write fails, the peer is disconnected with `TcpError` and the
  ## exception is re-raised.
  trace "sending msg", peer, msg = getMsgName(peer, rlpFromBytes(data).read(int))

  when defined(useSnappy):
    var cipherText =
      if peer.snappyEnabled:
        encryptMsg(snappy.compress(data), peer.secretsState)
      else:
        encryptMsg(data, peer.secretsState)
  else:
    var cipherText = encryptMsg(data, peer.secretsState)

  try:
    discard await peer.transport.write(cipherText)
  except:
    await peer.disconnect(TcpError)
    raise
proc send*[Msg](peer: Peer, msg: Msg): Future[void] =
  ## Serializes the message record `msg` (per-connection message id first,
  ## then the record fields) and sends it with `sendMsg`.
  var writer = initRlpWriter()
  writer.append perPeerMsgId(peer, Msg)
  writer.appendRecordType(msg, Msg.rlpFieldsCount > 1)
  result = sendMsg(peer, writer.finish)
proc registerRequest*(peer: Peer,
                      timeout: int,
                      responseFuture: FutureBase,
                      responseMsgId: int): int =
  ## Records an outstanding request and returns its freshly allocated
  ## request id (the peer's `lastReqId` after incrementing it).
  ##
  ## The request is queued under `responseMsgId` and a timer is armed
  ## that invokes the response's `requestResolver` with a nil message
  ## once the timeout elapses.
  peer.lastReqId += 1
  let newReqId = peer.lastReqId

  let deadline = fastEpochTime() + uint64(timeout)
  let req = OutstandingRequest.init(id = newReqId,
                                    future = responseFuture,
                                    timeoutAt = deadline)
  peer.outstandingRequests[responseMsgId].addLast req

  assert(not peer.dispatcher.isNil)
  let resolveRequest = peer.dispatcher.messages[responseMsgId].requestResolver

  proc onDeadline(udata: pointer) =
    resolveRequest(nil, responseFuture)

  addTimer(deadline, onDeadline, nil)
  return newReqId
# Matches a received response message with one of the requests queued in
# `peer.outstandingRequests[msgId]` and resolves that request's future.
#
# - `msgId`: per-connection id of the response message.
# - `msg`:   pointer to the decoded response record; passed through to the
#            message's `requestResolver`.
# - `reqId`: request id carried by the response, or -1 for protocols
#            without request ids (the oldest pending request is used then).
#
# Requests whose futures are already finished are dropped from the queue
# along the way. When no pending request matches, a debug message is logged.
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
logScope:
msg = peer.dispatcher.messages[msgId].name
msgContents = peer.dispatcher.messages[msgId].printer(msg)
receivedReqId = reqId
remotePeer = peer.remote
# Invokes the resolver registered for this response type on `future`.
template resolve(future) =
(peer.dispatcher.messages[msgId].requestResolver)(msg, future)
# Shorthand for the queue of requests awaiting this response type.
template outstandingReqs: auto =
peer.outstandingRequests[msgId]
if reqId == -1:
# XXX: This is a response from an ETH-like protocol that doesn't feature
# request IDs. Handling the response is quite tricky here because this may
# be a late response to an already timed out request or a valid response
# from a more recent one.
#
# We can increase the robustness by recording enough features of the
# request so we can recognize the matching response, but this is not very
# easy to do because our peers are allowed to send partial responses.
#
# A more generally robust approach is to maintain a set of the wanted
# data items and then to periodically look for items that have been
# requested long time ago, but are still missing. New requests can be
# issued for such items potentially from another random peer.
# Count the finished requests at the front of the queue ...
var expiredRequests = 0
for req in outstandingReqs:
if not req.future.finished: break
inc expiredRequests
# ... and drop them before picking the oldest pending one.
outstandingReqs.shrink(fromFirst = expiredRequests)
if outstandingReqs.len > 0:
let oldestReq = outstandingReqs.popFirst
resolve oldestReq.future
else:
debug "late or duplicate reply for a RLPx request"
else:
# TODO: This is not completely sound because we are still using a global
# `reqId` sequence (the problem is that we might get a response ID that
# matches a request ID for a different type of request). To make the code
# correct, we can use a separate sequence per response type, but we have
# to first verify that the other Ethereum clients are supporting this
# correctly (because then, we'll be reusing the same reqIds for different
# types of requests). Alternatively, we can assign a separate interval in
# the `reqId` space for each type of response.
# A request id we have never handed out cannot match anything.
if reqId > peer.lastReqId:
warn "RLPx response without a matching request"
return
var idx = 0
while idx < outstandingReqs.len:
template req: auto = outstandingReqs()[idx]
if req.future.finished:
assert req.timeoutAt < fastEpochTime()
# Here we'll remove the expired request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromLast = 1)
# This was the last item, so we don't have any
# more work to do:
return
# NOTE(review): after a swap above, `req` is the former last element;
# it is only compared by id here and is not re-checked for expiry.
if req.id == reqId:
resolve req.future
# Here we'll remove the found request by swapping
# it with the last one in the deque (if necessary):
if idx != outstandingReqs.len - 1:
req = outstandingReqs.popLast
else:
outstandingReqs.shrink(fromLast = 1)
return
inc idx
debug "late or duplicate reply for a RLPx request"
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
  ## Awaits the next complete RLPx message in the TCP stream.
  ##
  ## Returns the message id together with an `Rlp` reader positioned right
  ## after the id. The peer is disconnected with `BreachOfProtocol` (and
  ## `PeerDisconnected` is raised) when the frame cannot be decrypted, is
  ## larger than `maxMsgSize`, cannot be decompressed, or does not start
  ## with a readable message id.
  var headerBytes: array[32, byte]
  await peer.transport.readExactly(addr headerBytes[0], 32)

  var msgSize: int
  if decryptHeaderAndGetMsgSize(peer.secretsState,
                                headerBytes, msgSize) != RlpxStatus.Success:
    await peer.disconnectAndRaise(BreachOfProtocol,
                                  "Cannot decrypt RLPx frame header")

  trace "waiting for message bytes", peer, msgSize

  if msgSize > maxMsgSize:
    await peer.disconnectAndRaise(BreachOfProtocol,
                                  "RLPx message exceeds maximum size")

  let remainingBytes = encryptedLength(msgSize) - 32
  # TODO: Migrate this to a thread-local seq
  # JACEK:
  #  or pass it in, allowing the caller to choose - they'll likely be in a
  #  better position to decide if buffer should be reused or not. this will
  #  also be useful for chunked messages where part of the buffer may have
  #  been processed and needs filling in
  var encryptedBytes = newSeq[byte](remainingBytes)
  await peer.transport.readExactly(addr encryptedBytes[0], len(encryptedBytes))

  let decryptedMaxLength = decryptedLength(msgSize)
  var
    decryptedBytes = newSeq[byte](decryptedMaxLength)
    decryptedBytesCount = 0

  if decryptBody(peer.secretsState, encryptedBytes, msgSize,
                 decryptedBytes, decryptedBytesCount) != RlpxStatus.Success:
    await peer.disconnectAndRaise(BreachOfProtocol,
                                  "Cannot decrypt RLPx frame body")

  decryptedBytes.setLen(decryptedBytesCount)

  when defined(useSnappy):
    # Decompress only when Snappy is enabled for this particular peer.
    # This is the same condition `sendMsg` uses for compressing; keying it
    # on the local node's protocol version instead would also try to
    # decompress frames from peers that send uncompressed data.
    if peer.snappyEnabled:
      decryptedBytes = snappy.uncompress(decryptedBytes)
      if decryptedBytes.len == 0:
        await peer.disconnectAndRaise(BreachOfProtocol,
                                      "Snappy uncompress encountered malformed data")

  var rlp = rlpFromBytes(decryptedBytes.toRange)

  try:
    let msgId = rlp.read(int)
    return (msgId, rlp)
  except RlpError:
    await peer.disconnectAndRaise(BreachOfProtocol,
                                  "Cannot read RLPx message id")
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
  ## Reads a `MsgType` value from `r`.
  ## In non-release builds a failed read is logged (with the peer and the
  ## type name) before the exception is re-raised; release builds read
  ## without the extra logging.
  let tmp = r  # copy kept for the (currently disabled) debugging output below
  when not defined(release):
    try:
      return r.read(MsgType)
    except:
      # echo "Failed rlp.read:", tmp.inspect
      error "Failed rlp.read",
            peer = peer,
            msg = MsgType.name,
            exception = getCurrentExceptionMsg()
            # dataHex = r.rawData.toSeq().toHex()
      raise
  else:
    return r.read(MsgType)
proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
  ## Reads messages from the peer until one of type `MsgType` arrives and
  ## returns it decoded. Other messages are dropped with a warning.
  ##
  ## Raises `PeerDisconnected` when the peer sends a disconnect message.
  ## When the wanted message has an invalid body, the peer is disconnected
  ## with `BreachOfProtocol` and `PeerDisconnected` is raised.
  let wantedId = peer.perPeerMsgId(MsgType)
  while true:
    var (nextMsgId, nextMsgData) = await peer.recvMsg()
    if nextMsgId == wantedId:
      try:
        return checkedRlpRead(peer, nextMsgData, MsgType)
      except RlpError:
        await peer.disconnectAndRaise(BreachOfProtocol,
                                      "Invalid RLPx message body")
    elif nextMsgId == 1: # p2p.disconnect
      raisePeerDisconnected("Unexpected disconnect",
                            DisconnectionReason nextMsgData.listElem(0).toInt(uint32))
    else:
      # The id was chosen by the remote peer and the dispatcher may not be
      # set up yet, so the name is looked up through `getMsgName` (which
      # checks both) instead of indexing the dispatcher table directly.
      warn "Dropped RLPX message",
           msg = peer.getMsgName(nextMsgId)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
  ## This procs awaits a specific RLPx message.
  ## Any messages received while waiting will be dispatched to their
  ## respective handlers. The designated message handler will also run
  ## to completion before the future returned by `nextMsg` is resolved.
  ##
  ## If somebody is already waiting for the same message, the existing
  ## future is returned instead of creating a new one.
  let wantedId = peer.perPeerMsgId(MsgType)
  let existing = peer.awaitedMessages[wantedId]
  if existing.isNil:
    newFuture result
    peer.awaitedMessages[wantedId] = result
  else:
    result = Future[MsgType](existing)
proc dispatchMessages*(peer: Peer) {.async.} =
  ## Main receive loop of a connection: reads messages one by one and
  ## hands each to its handler thunk.
  ##
  ## The loop ends when the peer sends a disconnect message (the transport
  ## is closed) or when a handler fails with `RlpError` (the peer is
  ## disconnected with `BreachOfProtocol`). After a handler has completed,
  ## a future registered through `nextMsg` for that message is resolved.
  while true:
    var (msgId, msgData) = await peer.recvMsg()

    trace "received msg ", peer, msg = getMsgName(peer, msgId)
          # rpl = msgData.inspect

    if msgId == 1: # p2p.disconnect
      await peer.transport.closeWait()
      debug "remote peer disconnected", peer,
            reason = msgData.listElem(0).toInt(uint32).DisconnectionReason
      break

    try:
      await peer.invokeThunk(msgId, msgData)
    except RlpError:
      error "endind dispatchMessages loop", peer, err = getCurrentExceptionMsg()
      await peer.disconnect(BreachOfProtocol)
      return

    let awaited = peer.awaitedMessages[msgId]
    if not awaited.isNil:
      let resolver = peer.dispatcher.messages[msgId].nextMsgResolver
      resolver(msgData, awaited)
      peer.awaitedMessages[msgId] = nil
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
  ## Yields a `(name, type)` pair for every formal parameter of the
  ## routine definition `n`, ignoring the first `skip` parameter groups.
  ## Parameters declared together (`a, b: int`) are yielded one by one,
  ## each with the shared type node.
  let formals = n.params
  # Index 0 of the formal params holds the return type, hence the `1 +`.
  for groupIdx in (1 + skip) .. formals.len - 1:
    let group = formals[groupIdx]
    # An ident-defs node ends with the type and the default value.
    let groupType = group[^2]
    for nameIdx in 0 .. group.len - 3:
      yield (group[nameIdx], groupType)
proc chooseFieldType(n: NimNode): NimNode =
  ## Examines the parameter types used in the message signature
  ## and selects the corresponding field type for use in the
  ## message object type (i.e. `p2p.hello`).
  ##
  ## For now, only openarray types are remapped to sequences.
  ## Any other type node is returned as is (not copied).
  let isOpenArray = n.kind == nnkBracketExpr and eqIdent(n[0], "openarray")
  if not isOpenArray:
    return n

  # Work on a copy so that the original signature stays untouched.
  result = n.copyNimTree
  result[0] = ident("seq")
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
  ## Untyped per-peer state object of protocol `proto`
  ## (looked up by the protocol's index).
  result = peer.protocolStates[proto.index]
proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
  ## Checks whether a Peer supports a particular protocol
  ## (i.e. the protocol has a message-id offset on this connection).
  result = protocolOffset(peer, Protocol) != -1
template state*(peer: Peer, Protocol: type): untyped =
  ## Returns the state object of a particular protocol for a
  ## particular connection, cast to the protocol's `State` type.
  bind getState
  cast[Protocol.State](
    getState(peer, Protocol.protocolInfo))
proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef =
  ## Untyped node-wide state object of protocol `proto`
  ## (looked up by the protocol's index).
  result = node.protocolStates[proto.index]
template protocolState*(node: EthereumNode, Protocol: type): untyped =
  ## Returns the node-wide state object of a particular protocol,
  ## cast to the protocol's `NetworkState` type.
  bind getNetworkState
  cast[Protocol.NetworkState](
    getNetworkState(node, Protocol.protocolInfo))
template networkState*(connection: Peer, Protocol: type): untyped =
  ## Returns the network state object of a particular protocol for a
  ## particular connection (the state of the peer's `network` node).
  protocolState(connection.network, Protocol)
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) =
  ## Default, do-nothing initializer for protocol state objects.
  ## `createPeerState` and `createNetworkState` call `initProtocolState`
  ## through `mixin`, so a more specific overload can replace it.
  discard
proc createPeerState[ProtocolState](peer: Peer): RootRef =
  ## Allocates a `ProtocolState` object, runs `initProtocolState` on it
  ## for the given peer and returns it as a `RootRef`.
  mixin initProtocolState
  var fresh = new ProtocolState
  initProtocolState(fresh, peer)
  result = cast[RootRef](fresh)
proc createNetworkState[NetworkState](network: EthereumNode): RootRef =
  ## Allocates a `NetworkState` object, runs `initProtocolState` on it
  ## for the given node and returns it as a `RootRef`.
  mixin initProtocolState
  var fresh = new NetworkState
  initProtocolState(fresh, network)
  result = cast[RootRef](fresh)
proc popTimeoutParam(n: NimNode): NimNode =
  ## If the last parameter of the routine definition `n` is named
  ## `timeout`, removes it from the signature and returns its definition;
  ## otherwise leaves `n` alone and returns nil.
  ## A `timeout` parameter without a default value is a compile-time error.
  let formals = n.params
  let lastParam = formals[^1]
  if not eqIdent(lastParam[0], "timeout"):
    return nil

  if lastParam[2].kind == nnkEmpty:
    macros.error "You must specify a default value for the `timeout` parameter", lastParam

  formals.del(formals.len - 1)
  result = lastParam
proc verifyStateType(t: NimNode): NimNode =
  ## Validates a state type given to the protocol macro.
  ##
  ## `t` is expected to be the `getType` of a type parameter, whose second
  ## child is the described type. Returns that type node when it is a
  ## `ref` type, or nil when it is the `nil` type (meaning "no state").
  ## Anything else is reported as a compile-time error.
  result = t[1]
  if result.kind == nnkSym and $result == "nil":
    return nil
  if result.kind != nnkBracketExpr or $result[0] != "ref":
    # `repr` is used for the message because `$` is only defined for
    # identifier-like nodes; applying it to e.g. a bracket expression
    # would abort with an unrelated error instead of this one.
    macros.error(result.repr & " must be a ref type")
# Generates all the code of one RLPx (sub-)protocol from its declarative
# definition in `body`.
#
# Recognized statements in `body`:
# - a plain proc definition: declares a message; ids are assigned
#   consecutively starting from 0.
# - `nextID n`: continues the id assignment from `n`.
# - `requestResponse:` with a block of two or more procs: the last proc is
#   the response message, all preceding ones are requests answered by it.
# - `onPeerConnected do ...` / `onPeerDisconnected do ...`: lifted into
#   async procs that become the protocol's handshake step and
#   disconnection handler.
# - comments are ignored; anything else is a compile-time error.
#
# For every message, `addMsgHandler` emits a record type `<name>Obj`, an
# async `<name>_thunk` proc that decodes the received RLP (and awaits the
# user handler, if the proc has a body), a public sender proc returning a
# `Future`, and a `registerMsg` call.
#
# The result consists of: the generated types, a global `<name>Protocol`
# variable created with `newProtocol`, the `protocolInfo` accessor template,
# the sender procs, the receiver procs, the message registrations, and
# finally calls to `setEventHandlers` and `registerProtocol`.
#
# Parameters:
# - `name`, `shortName`: protocol name and its 3-letter abbreviation
#   (`shortName` defaults to `name`; its length is asserted to be 3).
# - `version`: a version of 0 marks the base protocol (not a sub-protocol).
# - `useRequestIds`: whether request/response messages carry a `reqId`.
# - `timeout`: default request timeout for requests without their own
#   `timeout` parameter.
# - `*Decorator`: optional pragmas added to the generated procs.
# - `peerState`, `networkState`: optional `ref` types for per-peer and
#   per-node protocol state.
macro rlpxProtocolImpl(name: static[string],
version: static[uint],
body: untyped,
useRequestIds: static[bool] = true,
timeout: static[int] = defaultReqTimeout,
shortName: static[string] = "",
outgoingRequestDecorator: untyped = nil,
incomingRequestDecorator: untyped = nil,
incomingRequestThunkDecorator: untyped = nil,
incomingResponseDecorator: untyped = nil,
incomingResponseThunkDecorator: untyped = nil,
peerState = type(nil),
networkState = type(nil)): untyped =
## The macro used to define RLPx sub-protocols. See README.
var
# XXX: deal with a Nim bug causing the macro params to be
# zero when they are captured by a closure:
outgoingRequestDecorator = outgoingRequestDecorator
incomingRequestDecorator = incomingRequestDecorator
incomingRequestThunkDecorator = incomingRequestThunkDecorator
incomingResponseDecorator = incomingResponseDecorator
incomingResponseThunkDecorator = incomingResponseThunkDecorator
useRequestIds = useRequestIds
version = version
defaultTimeout = timeout
nextId = 0
protoName = name
shortName = if shortName.len > 0: shortName else: protoName
outTypes = newNimNode(nnkStmtList)
outSendProcs = newNimNode(nnkStmtList)
outRecvProcs = newNimNode(nnkStmtList)
outProcRegistrations = newNimNode(nnkStmtList)
protoNameIdent = ident(protoName)
resultIdent = ident "result"
perProtocolMsgId = ident"perProtocolMsgId"
protocol = ident(protoName & "Protocol")
isSubprotocol = version > 0'u
peerState = verifyStateType peerState.getType
networkState = verifyStateType networkState.getType
handshake = newNilLit()
disconnectHandler = newNilLit()
Option = bindSym "Option"
# XXX: Binding the int type causes instantiation failure for some reason
# Int = bindSym "int"
Int = ident "int"
Peer = bindSym "Peer"
append = bindSym "append"
createNetworkState = bindSym "createNetworkState"
createPeerState = bindSym "createPeerState"
finish = bindSym "finish"
initRlpWriter = bindSym "initRlpWriter"
enterList = bindSym "enterList"
messagePrinter = bindSym "messagePrinter"
newProtocol = bindSym "newProtocol"
nextMsgResolver = bindSym "nextMsgResolver"
read = bindSym "read"
registerRequest = bindSym "registerRequest"
requestResolver = bindSym "requestResolver"
resolveResponseFuture = bindSym "resolveResponseFuture"
rlpFromBytes = bindSym "rlpFromBytes"
checkedRlpRead = bindSym "checkedRlpRead"
sendMsg = bindSym "sendMsg"
startList = bindSym "startList"
writeMsgId = bindSym "writeMsgId"
getState = bindSym "getState"
getNetworkState = bindSym "getNetworkState"
perPeerMsgId = bindSym "perPeerMsgId"
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert shortName.len == 3
# Adds `decorator` as a pragma to proc `p`, unless no decorator was given.
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit: p.addPragma decorator
proc augmentUserHandler(userHandlerProc: NimNode, msgId = -1, msgKind = rlpxNotification) =
## Turns a regular proc definition into an async proc and adds
## the helpers for accessing the peer and network protocol states.
case msgKind
of rlpxRequest: userHandlerProc.applyDecorator incomingRequestDecorator
of rlpxResponse: userHandlerProc.applyDecorator incomingResponseDecorator
else: discard
userHandlerProc.addPragma ident"async"
# We allow the user handler to use `openarray` params, but we turn
# those into sequences to make the `async` pragma happy.
for i in 1 ..< userHandlerProc.params.len:
var param = userHandlerProc.params[i]
param[^2] = chooseFieldType(param[^2])
var userHandlerDefinitions = newStmtList()
if msgId >= 0:
userHandlerDefinitions.add quote do:
const `perProtocolMsgId` = `msgId`
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if peerState != nil:
userHandlerDefinitions.add quote do:
template state(p: `Peer`): `peerState` =
cast[`peerState`](`getState`(p, `protocol`))
if networkState != nil:
userHandlerDefinitions.add quote do:
template networkState(p: `Peer`): `networkState` =
cast[`networkState`](`getNetworkState`(p.network, `protocol`))
userHandlerProc.body.insert 0, userHandlerDefinitions
proc liftEventHandler(doBlock: NimNode, handlerName: string): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
var fn = newTree(nnkProcDef)
doBlock.copyChildrenTo(fn)
result = genSym(nskProc, protoName & handlerName)
fn.name = result
augmentUserHandler fn
outRecvProcs.add fn
# Generates everything needed for one message definition `n` with the
# protocol-relative id `msgId` and returns the ident of its record type.
proc addMsgHandler(msgId: int, n: NimNode,
msgKind = rlpxNotification,
responseMsgId = -1,
responseRecord: NimNode = nil): NimNode =
if n[0].kind == nnkPostfix:
macros.error("rlpxProcotol procs are public by default. " &
"Please remove the postfix `*`.", n)
let
msgIdent = n.name
msgName = $n.name
hasReqIds = useRequestIds and msgKind in {rlpxRequest, rlpxResponse}
var
paramCount = 0
userPragmas = n.pragma
# variables used in the sending procs
msgRecipient = ident"msgRecipient"
reqTimeout: NimNode
rlpWriter = ident"writer"
appendParams = newNimNode(nnkStmtList)
reqId = ident"reqId"
perPeerMsgIdVar = ident"perPeerMsgId"
# variables used in the receiving procs
msgSender = ident"msgSender"
receivedRlp = ident"rlp"
receivedMsg = ident"msg"
readParams = newNimNode(nnkStmtList)
readParamsPrelude = newNimNode(nnkStmtList)
callResolvedResponseFuture = newNimNode(nnkStmtList)
# nodes to store the user-supplied message handling proc if present
userHandlerProc: NimNode = nil
userHandlerCall: NimNode = nil
awaitUserHandler = newStmtList()
# a record type associated with the message
msgRecord = newIdentNode(msgName & "Obj")
msgRecordFields = newTree(nnkRecList)
msgRecordBody = newTree(nnkObjectTy,
newEmptyNode(),
newEmptyNode(),
msgRecordFields)
result = msgRecord
if hasReqIds:
# Messages using request Ids
readParamsPrelude.add quote do:
let `reqId` = `read`(`receivedRlp`, int)
case msgKind
of rlpxNotification: discard
of rlpxRequest:
# If the request proc has a default timeout specified, remove it from
# the signature for now so we can generate the `thunk` proc without it.
# The parameter will be added back later only to the sender proc.
# When the timeout is not specified, we use a default one.
reqTimeout = popTimeoutParam(n)
if reqTimeout == nil:
reqTimeout = newTree(nnkIdentDefs,
ident"timeout",
Int, newLit(defaultTimeout))
let reqToResponseOffset = responseMsgId - msgId
let responseMsgId = quote do: `perPeerMsgIdVar` + `reqToResponseOffset`
# Each request is registered so we can resolve it when the response
# arrives. There are two types of protocols: LES-like protocols use
# explicit `reqId` sent over the wire, while the ETH wire protocol
# assumes there is one outstanding request at a time (if there are
# multiple requests we'll resolve them in FIFO order).
let registerRequestCall = newCall(registerRequest, msgRecipient,
reqTimeout[0],
resultIdent,
responseMsgId)
if hasReqIds:
appendParams.add quote do:
newFuture `resultIdent`
let `reqId` = `registerRequestCall`
`append`(`rlpWriter`, `reqId`)
else:
appendParams.add quote do:
newFuture `resultIdent`
discard `registerRequestCall`
of rlpxResponse:
let reqIdVal = if hasReqIds: `reqId` else: newLit(-1)
callResolvedResponseFuture.add quote do:
`resolveResponseFuture`(`msgSender`,
`perPeerMsgId`(`msgSender`, `msgRecord`),
addr(`receivedMsg`),
`reqIdVal`)
if hasReqIds:
appendParams.add newCall(append, rlpWriter, reqId)
if n.body.kind != nnkEmpty:
# implement the receiving thunk proc that deserializes the
# message parameters and calls the user proc:
userHandlerProc = n.copyNimTree
userHandlerProc.name = genSym(nskProc, msgName)
augmentUserHandler userHandlerProc, msgId, msgKind
# This is the call to the user supplied handler. Here we add only the
# initial peer param, while the rest of the params will be added later.
userHandlerCall = newCall(userHandlerProc.name, msgSender)
if hasReqIds:
userHandlerProc.params.insert(2, newIdentDefs(reqId, ident"int"))
userHandlerCall.add reqId
# When there is a user handler, it must be awaited in the thunk proc.
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall("await", userHandlerCall)
outRecvProcs.add(userHandlerProc)
for param, paramType in n.typedParams(skip = 1):
inc paramCount
# This is a fragment of the sending proc that
# serializes each of the passed parameters:
appendParams.add newCall(append, rlpWriter, param)
# Each message has a corresponding record type.
# Here, we create its fields one by one:
msgRecordFields.add newTree(nnkIdentDefs,
newTree(nnkPostfix, ident("*"), param), # The fields are public
chooseFieldType(paramType), # some types such as openarray
# are automatically remapped
newEmptyNode())
# The received RLP data is deserialized to a local variable of
# the message-specific type. This is done field by field here:
let msgNameLit = newLit(msgName)
readParams.add quote do:
`receivedMsg`.`param` = `checkedRlpRead`(`msgSender`, `receivedRlp`, `paramType`)
# If there is user message handler, we'll place a call to it by
# unpacking the fields of the received message:
if userHandlerCall != nil:
userHandlerCall.add newDotExpr(receivedMsg, param)
if paramCount > 1:
readParamsPrelude.add newCall(enterList, receivedRlp)
let thunkName = ident(msgName & "_thunk")
var thunkProc = quote do:
proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecord`
`readParamsPrelude`
`readParams`
`awaitUserHandler`
`callResolvedResponseFuture`
for p in userPragmas: thunkProc.addPragma p
case msgKind
of rlpxRequest: thunkProc.applyDecorator incomingRequestThunkDecorator
of rlpxResponse: thunkProc.applyDecorator incomingResponseThunkDecorator
else: discard
thunkProc.addPragma ident"async"
outRecvProcs.add thunkProc
outTypes.add quote do:
# This is a type featuring a single field for each message param:
type `msgRecord`* = `msgRecordBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgIdent`*(T: type `protoNameIdent`): type = `msgRecord`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgId*(T: type `msgRecord`): int = `msgId`
template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent`
# The original proc definition is reused (and modified in place)
# as the public sender proc.
var msgSendProc = n
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
# Add a timeout parameter for all request procs
case msgKind
of rlpxRequest:
msgSendProc.params.add reqTimeout
of rlpxResponse:
if useRequestIds:
msgSendProc.params.insert 2, newIdentDefs(reqId, ident"int")
else: discard
# We change the return type of the sending proc to a Future.
# If this is a request proc, the future will return the response record.
let rt = if msgKind != rlpxRequest: ident"void"
else: newTree(nnkBracketExpr, Option, responseRecord)
msgSendProc.params[0] = newTree(nnkBracketExpr, ident("Future"), rt)
let msgBytes = ident"msgBytes"
let finalizeRequest = quote do:
let `msgBytes` = `finish`(`rlpWriter`)
var sendCall = newCall(sendMsg, msgRecipient, msgBytes)
let senderEpilogue = if msgKind == rlpxRequest:
# In RLPx requests, the returned future was allocated here and passed
# to `registerRequest`. It's already assigned to the result variable
# of the proc, so we just wait for the sending operation to complete
# and we return in a normal way. (the waiting is done, so we can catch
# any possible errors).
quote: `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`)
else:
# In normal RLPx messages, we are returning the future returned by the
# `sendMsg` call.
quote: return `sendCall`
let `perPeerMsgIdValue` = if isSubprotocol:
newCall(perPeerMsgId, msgRecipient, protoNameIdent, perProtocolMsgId)
else:
perProtocolMsgId
if paramCount > 1:
# In case there are more than 1 parameter,
# the params must be wrapped in a list:
appendParams = newStmtList(
newCall(startList, rlpWriter, newLit(paramCount)),
appendParams)
# Make the send proc public
msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name)
# let paramCountNode = newLit(paramCount)
msgSendProc.body = quote do:
var `rlpWriter` = `initRlpWriter`()
let `perProtocolMsgId` = `msgId`
let `perPeerMsgIdVar` = `perPeerMsgIdValue`
`append`(`rlpWriter`, `perPeerMsgIdVar`)
`appendParams`
`finalizeRequest`
`senderEpilogue`
if msgKind == rlpxRequest:
msgSendProc.applyDecorator outgoingRequestDecorator
outSendProcs.add msgSendProc
outProcRegistrations.add(
newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(msgId),
newStrLitNode($n.name),
thunkName,
newTree(nnkBracketExpr, messagePrinter, msgRecord),
newTree(nnkBracketExpr, requestResolver, msgRecord),
newTree(nnkBracketExpr, nextMsgResolver, msgRecord)))
outTypes.add quote do:
# Create a type acting as a pseudo-object representing the protocol
# (e.g. p2p)
type `protoNameIdent`* = object
if peerState != nil:
outTypes.add quote do:
template State*(P: type `protoNameIdent`): type = `peerState`
if networkState != nil:
outTypes.add quote do:
template NetworkState*(P: type `protoNameIdent`): type = `networkState`
# Walk the statements of the protocol definition.
for n in body:
case n.kind
of {nnkCall, nnkCommand}:
if eqIdent(n[0], "nextID"):
# By default message IDs are assigned in increasing order
# `nextID` can be used to skip some of the numeric slots
if n.len == 2 and n[1].kind == nnkIntLit:
nextId = n[1].intVal.int
else:
macros.error("nextID expects a single int value", n)
elif eqIdent(n[0], "requestResponse"):
# `requestResponse` can be given a block of 2 or more procs.
# The last one is considered to be a response message, while
# all preceding ones are requests triggering the response.
# The system makes sure to automatically insert a hidden `reqId`
# parameter used to discriminate the individual messages.
block processReqResp:
if n.len == 2 and n[1].kind == nnkStmtList:
var procs = newSeq[NimNode](0)
for def in n[1]:
if def.kind == nnkProcDef:
procs.add(def)
if procs.len > 1:
let responseMsgId = nextId + procs.len - 1
let responseRecord = addMsgHandler(responseMsgId,
procs[^1],
msgKind = rlpxResponse)
for i in 0 .. procs.len - 2:
discard addMsgHandler(nextId + i, procs[i],
msgKind = rlpxRequest,
responseMsgId = responseMsgId,
responseRecord = responseRecord)
inc nextId, procs.len
# we got all the way to here, so everything is fine.
# break the block so it doesn't reach the error call below
break processReqResp
macros.error("requestResponse expects a block with at least two proc definitions")
elif eqIdent(n[0], "onPeerConnected"):
handshake = liftEventHandler(n[1], "Handshake")
elif eqIdent(n[0], "onPeerDisconnected"):
disconnectHandler = liftEventHandler(n[1], "PeerDisconnect")
else:
macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkProcDef:
discard addMsgHandler(nextId, n)
inc nextId
of nnkCommentStmt:
discard
else:
macros.error("illegal syntax in a RLPx protocol definition", n)
# State initializers passed to `newProtocol` (nil when no state type is set).
let peerInit = if peerState == nil: newNilLit()
else: newTree(nnkBracketExpr, createPeerState, peerState)
let netInit = if networkState == nil: newNilLit()
else: newTree(nnkBracketExpr, createNetworkState, networkState)
# Assemble the generated code.
result = newNimNode(nnkStmtList)
result.add outTypes
result.add quote do:
# One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`shortName`, `version`, `peerInit`, `netInit`)
# The protocol run-time data is available as a pseudo-field
# (e.g. `p2p.protocolInfo`)
template protocolInfo*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
result.add outSendProcs, outRecvProcs, outProcRegistrations
result.add quote do:
setEventHandlers(`protocol`, `handshake`, `disconnectHandler`)
result.add newCall(bindSym("registerProtocol"), protocol)
# When this module is compiled as the main module, print the generated code.
when isMainModule: echo repr(result)
# echo repr(result)
macro rlpxProtocol*(protocolOptions: untyped, body: untyped): untyped =
  ## Front-end for RLPx protocol definitions.
  ##
  ## Takes the call-like `protocolOptions` node, replaces its callee with
  ## `rlpxProtocolImpl` and appends two named arguments to it: `name`
  ## (the stringified original callee) and `body` (the definition block).
  # The name has to be captured before the callee slot is overwritten.
  let
    nameArg = newTree(nnkExprEqExpr, ident("name"),
                      newLit($(protocolOptions[0])))
    bodyArg = newTree(nnkExprEqExpr, ident("body"), body)

  result = protocolOptions
  result[0] = bindSym"rlpxProtocolImpl"
  result.add(nameArg)
  result.add(bodyArg)
# Definition of the `p2p` protocol through the `rlpxProtocol` macro.
# The order of the proc declarations matters: by default the macro assigns
# message IDs in increasing order as it walks the body.
rlpxProtocol p2p(version = 0):
  # Declared without a body.
  proc hello(peer: Peer,
             version: uint,
             clientId: string,
             capabilities: seq[Capability],
             listenPort: uint,
             nodeId: array[RawPublicKeySize, byte])

  # Declared without a body.
  proc sendDisconnectMsg(peer: Peer, reason: DisconnectionReason)

  proc ping(peer: Peer) =
    # Answer with a `pong`; whatever `pong` returns is discarded.
    discard peer.pong()

  proc pong(peer: Peer) =
    # Nothing to do when a `pong` arrives.
    discard
proc removePeer(network: EthereumNode, peer: Peer) =
  ## Drops `peer` from the node's peer pool (when the node has one) and
  ## notifies every pool observer that installed an `onPeerDisconnected`
  ## callback.
  if network.peerPool == nil:
    return

  network.peerPool.connectedNodes.del(peer.remote)
  for observer in network.peerPool.observers.values:
    if isNil(observer.onPeerDisconnected):
      continue
    observer.onPeerDisconnected(peer)
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] =
  ## Invokes the disconnect handler of every active protocol that has one
  ## and returns a future combining all of the resulting futures.
  var pending = newSeqOfCap[Future[void]](rlpxProtocols.len)

  for protocol in peer.dispatcher.activeProtocols:
    if protocol.disconnectHandler == nil:
      continue
    pending.add((protocol.disconnectHandler)(peer, reason))

  result = all(pending)
proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} =
  ## Moves `peer` through `Disconnecting` to `Disconnected`, running the
  ## protocol disconnect handlers and removing the peer from its network.
  ## Does nothing when the peer is already disconnecting or disconnected.
  if peer.connectionState in {Disconnecting, Disconnected}:
    return

  peer.connectionState = Disconnecting
  try:
    # TODO: investigate the failure here
    # The trailing `and false` keeps sending the disconnect message
    # switched off for now.
    if not peer.transport.closed and false:
      await peer.sendDisconnectMsg(reason)
  finally:
    # The cleanup runs regardless of how the `try` body ended.
    if not peer.dispatcher.isNil:
      await callDisconnectHandlers(peer, reason)
    peer.connectionState = Disconnected
    removePeer(peer.network, peer)
proc validatePubKeyInHello(msg: p2p.hello, pubKey: PublicKey): bool =
  ## Returns true when a public key can be recovered from `msg.nodeId`
  ## and that key equals `pubKey`.
  var recovered: PublicKey
  if recoverPublicKey(msg.nodeId, recovered) != EthKeysStatus.Success:
    return false
  result = recovered == pubKey
proc performSubProtocolHandshakes(peer: Peer) {.async.} =
  ## Starts the handshake of every active protocol that defines one, waits
  ## for all of them and then marks the peer as `Connected`.
  var pending = newSeqOfCap[Future[void]](rlpxProtocols.len)

  for protocol in peer.dispatcher.activeProtocols:
    if protocol.handshake == nil:
      continue
    pending.add((protocol.handshake)(peer))

  await all(pending)
  peer.connectionState = Connected
proc checkUselessPeer(peer: Peer) {.inline.} =
  ## Raises `UselessPeerError` when the peer's dispatcher ended up with no
  ## protocols at all.
  if peer.dispatcher.numProtocols != 0:
    return
  # XXX: Send disconnect + UselessPeer
  raise newException(UselessPeerError, "Useless peer")
proc initPeerState*(peer: Peer, capabilities: openarray[Capability]) =
  ## Selects a dispatcher for `capabilities` and resets the per-peer
  ## bookkeeping: request queues, awaited-message slots, the request id
  ## counter and the state object of every active protocol.
  ## Raises `UselessPeerError` (via `checkUselessPeer`) when the dispatcher
  ## has no protocols.
  peer.dispatcher = getDispatcher(peer.network, capabilities)
  checkUselessPeer(peer)

  # The dispatcher has determined our message ID sequence; one slot is
  # allocated per message ID (some of the slots will stay unused).
  let slotCount = peer.dispatcher.messages.len

  # Queues tracking the responses still owed to our requests.
  newSeq(peer.outstandingRequests, slotCount)
  for i in 0 ..< slotCount:
    peer.outstandingRequests[i] = initDeque[OutstandingRequest]()

  # Book-keeping for potentially concurrent calls to `nextMsg`.
  newSeq(peer.awaitedMessages, slotCount)
  peer.lastReqId = 0

  # Create the state of each active protocol that provides an initializer.
  newSeq(peer.protocolStates, rlpxProtocols.len)
  for protocol in peer.dispatcher.activeProtocols:
    let makeState = protocol.peerStateInitializer
    if makeState == nil:
      continue
    peer.protocolStates[protocol.index] = makeState(peer)
proc postHelloSteps(peer: Peer, h: p2p.hello): Future[void] =
  ## Runs everything that follows a received `hello`: initializes the peer
  ## state from the announced capabilities, starts message dispatching and
  ## returns the future of the sub-protocol handshakes.
  initPeerState(peer, h.capabilities)

  # Start dispatching; if that loop ever fails, disconnect the peer.
  var dispatchLoop = peer.dispatchMessages()
  dispatchLoop.callback = proc(p: pointer) {.gcsafe.} =
    if dispatchLoop.failed:
      asyncCheck peer.disconnect(ClientQuitting)

  result = performSubProtocolHandshakes(peer)
template `^`(arr): auto =
  ## Passes a stack array as an open array, using the variable named
  ## `<arr>Len` at the call site as the number of valid elements.
  toOpenArray(arr, 0, `arr Len` - 1)
proc check(status: AuthStatus) =
  ## Raises an `Exception` carrying the stringified status unless `status`
  ## is `AuthStatus.Success`.
  if status == AuthStatus.Success:
    return
  raise newException(Exception, "Error: " & $status)
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte],
                     p: Peer) =
  ## Derives the connection secrets from the handshake and the exchanged
  ## auth/ack messages and installs them into `p.secretsState`.
  ##
  ## Raises (via `check`) when `getSecrets` does not report success.
  ## The temporary `secrets` value is wiped with `burnMem` on every exit
  ## path, including when an exception is raised.
  var secrets: ConnectionSecret
  try:
    check hs.getSecrets(authMsg, ackMsg, secrets)
    initSecretState(secrets, p.secretsState)
  finally:
    # Never leave key material behind, even on the error path.
    burnMem(secrets)
proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
  ## Opens an outgoing RLPx connection from `node` to `remote`.
  ##
  ## Steps: connect the transport, send the auth message, read and decode
  ## the ack message, set up the secrets, exchange `hello` messages and run
  ## the post-hello steps.
  ##
  ## Returns the connected peer, or `nil` when any step fails. Failures are
  ## logged and never propagated to the caller; the transport is closed if
  ## it had been opened.
  new result
  result.network = node
  result.remote = remote

  let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
  var ok = false
  try:
    result.transport = await connect(ta)

    when defined(useSnappy):
      var handshake = newHandshake({Initiator, EIP8}, int(node.protocolVersion))
    else:
      var handshake = newHandshake({Initiator, EIP8})
    handshake.host = node.keys

    # `authMsgLen` must keep this exact name: `^authMsg` below looks up the
    # variable `<array name>Len`.
    var authMsg: array[AuthMessageMaxEIP8, byte]
    var authMsgLen = 0
    check authMessage(handshake, remote.node.pubkey, authMsg, authMsgLen)

    # Wait for the write to finish so that a failure surfaces here instead
    # of being dropped together with an un-awaited future.
    let sent = await result.transport.write(addr authMsg[0], authMsgLen)
    if sent != authMsgLen:
      raise newException(TransportIncompleteError,
                         "Incomplete write of the auth message")

    let initialSize = handshake.expectedLength
    var ackMsg = newSeqOfCap[byte](1024)
    ackMsg.setLen(initialSize)
    await result.transport.readExactly(addr ackMsg[0], len(ackMsg))

    var ret = handshake.decodeAckMessage(ackMsg)
    if ret == AuthStatus.IncompleteError:
      # The handshake now expects a different length: read the remainder
      # and decode again.
      ackMsg.setLen(handshake.expectedLength)
      await result.transport.readExactly(addr ackMsg[initialSize],
                                         len(ackMsg) - initialSize)
      ret = handshake.decodeAckMessage(ackMsg)
    check ret

    when defined(useSnappy):
      result.snappyEnabled = handshake.version.int >= devp2pSnappyVersion

    initSecretState(handshake, ^authMsg, ackMsg, result)

    # if handshake.remoteHPubkey != remote.node.pubKey:
    #   raise newException(Exception, "Remote pubkey is wrong")

    when defined(useSnappy):
      asyncCheck result.hello(handshake.version,
                              node.clientId,
                              node.rlpxCapabilities,
                              uint(node.address.tcpPort),
                              node.keys.pubkey.getRaw())
    else:
      asyncCheck result.hello(devp2pVersion,
                              node.clientId,
                              node.rlpxCapabilities,
                              uint(node.address.tcpPort),
                              node.keys.pubkey.getRaw())

    var response = await result.waitSingleMsg(p2p.hello)

    if not validatePubKeyInHello(response, remote.node.pubKey):
      warn "Remote nodeId is not its public key" # XXX: Do we care?

    await postHelloSteps(result, response)
    ok = true
  except PeerDisconnected as e:
    if e.reason != TooManyPeers:
      debug "Unexpected disconnect during rlpxConnect", reason = e.reason
  except TransportIncompleteError:
    debug "Connection dropped in rlpxConnect", remote
  except UselessPeerError:
    debug "Useless peer ", peer = remote
  except RlpTypeMismatch:
    # Some peers report capabilities with names longer than 3 chars. We ignore
    # those for now. Maybe we should allow this though.
    debug "Rlp error in rlpxConnect"
  except:
    # Kept as a bare `except`: `check` raises plain `Exception` values.
    info "Exception in rlpxConnect", remote,
         exc = getCurrentException().name,
         err = getCurrentExceptionMsg()

  if not ok:
    if not isNil(result.transport):
      result.transport.close()
    result = nil
proc rlpxAccept*(node: EthereumNode,
                 transport: StreamTransport): Future[Peer] {.async.} =
  ## Completes the responder side of an RLPx connection on an already
  ## accepted `transport`.
  ##
  ## Steps: read and decode the auth message, send the ack message, set up
  ## the secrets, exchange `hello` messages and run the post-hello steps.
  ##
  ## Returns the connected peer. On any exception the error is logged, the
  ## transport is closed and `nil` is returned.
  new result
  result.transport = transport
  result.network = node

  var handshake = newHandshake({Responder})
  handshake.host = node.keys

  try:
    let initialSize = handshake.expectedLength
    var authMsg = newSeqOfCap[byte](1024)

    authMsg.setLen(initialSize)
    await transport.readExactly(addr authMsg[0], len(authMsg))
    var ret = handshake.decodeAuthMessage(authMsg)
    if ret == AuthStatus.IncompleteError: # Eip8 auth message is likely
      # Read the remainder according to the updated expected length and
      # decode again.
      authMsg.setLen(handshake.expectedLength)
      await transport.readExactly(addr authMsg[initialSize],
                                  len(authMsg) - initialSize)
      ret = handshake.decodeAuthMessage(authMsg)
    check ret

    when defined(useSnappy):
      result.snappyEnabled = handshake.version >= devp2pSnappyVersion.uint
      handshake.version = uint8(node.protocolVersion)

    # `ackMsgLen` must keep this exact name: `^ackMsg` below looks up the
    # variable `<array name>Len`.
    var ackMsg: array[AckMessageMaxEIP8, byte]
    var ackMsgLen: int
    check handshake.ackMessage(ackMsg, ackMsgLen)

    # Wait for the write to finish so that a failure surfaces here instead
    # of being dropped together with an un-awaited future.
    let sent = await transport.write(addr ackMsg[0], ackMsgLen)
    if sent != ackMsgLen:
      raise newException(TransportIncompleteError,
                         "Incomplete write of the ack message")

    initSecretState(handshake, authMsg, ^ackMsg, result)

    let listenPort = transport.localAddress().port

    when defined(useSnappy):
      let peerProtocolVersion =
        if result.snappyEnabled: devp2pSnappyVersion.uint
        else: devp2pVersion.uint
      await result.hello(peerProtocolVersion, node.clientId,
                         node.rlpxCapabilities, listenPort.uint,
                         node.keys.pubkey.getRaw())
    else:
      await result.hello(devp2pVersion, node.clientId,
                         node.rlpxCapabilities, listenPort.uint,
                         node.keys.pubkey.getRaw())

    var response = await result.waitSingleMsg(p2p.hello)
    if not validatePubKeyInHello(response, handshake.remoteHPubkey):
      warn "A Remote nodeId is not its public key" # XXX: Do we care?

    # The remote node is described by the key seen in the handshake and the
    # transport's remote address; the same port is used for TCP and UDP.
    let remote = transport.remoteAddress()
    let address = Address(ip: remote.address, tcpPort: remote.port,
                          udpPort: remote.port)
    result.remote = newNode(initEnode(handshake.remoteHPubkey, address))

    await postHelloSteps(result, response)
  except:
    # Kept as a bare `except`: `check` raises plain `Exception` values.
    error "Exception in rlpxAccept",
          err = getCurrentExceptionMsg(),
          stackTrace = getCurrentException().getStackTrace()
    transport.close()
    result = nil
when isMainModule:
  when false:
    # The assignments below can be used to investigate if the RLPx procs
    # are considered GcSafe. The short answer is that they aren't, because
    # they dispatch into user code that might use the GC.
    #
    # NOTE(review): this section is compiled out by `when false`. The
    # `GcSafeAccept` signature below takes (transport, myKeys), while
    # `rlpxAccept` in this file takes (node, transport) - confirm and
    # update before re-enabling.
    type
      GcSafeDispatchMsg = proc (peer: Peer, msgId: int, msgData: var Rlp)
      GcSafeRecvMsg = proc (peer: Peer):
        Future[tuple[msgId: int, msgData: Rlp]] {.gcsafe.}
      GcSafeAccept = proc (transport: StreamTransport, myKeys: KeyPair):
        Future[Peer] {.gcsafe.}
    var
      dispatchMsgPtr = invokeThunk
      recvMsgPtr: GcSafeRecvMsg = recvMsg
      acceptPtr: GcSafeAccept = rlpxAccept