WIP refactor the rlpxProtocol macro

* Make all handlers async

* Added a new `requestResponse` construct that allows you to wait
  for the results of a particular request

* Introduced an optional `NetworkState` type for the protocol

* The semantics of `nextMsg` has changed. See the notes in the README
This commit is contained in:
Zahary Karadjov 2018-07-06 04:19:08 +03:00
parent 66ff278f33
commit fac8bbd917
6 changed files with 387 additions and 209 deletions

View File

@ -1,6 +1,3 @@
p2p_discobus is licensed under the Apache License version 2
Copyright (c) 2018 Status Research & Development GmbH
-----------------------------------------------------
Apache License Apache License
Version 2.0, January 2004 Version 2.0, January 2004

View File

@ -1,7 +1,3 @@
p2p_discobus is licensed under the MIT License
Copyright (c) 2018 Status Research & Development GmbH
-----------------------------------------------------
The MIT License (MIT) The MIT License (MIT)
Copyright (c) 2018 Status Research & Development GmbH Copyright (c) 2018 Status Research & Development GmbH

View File

@ -117,9 +117,11 @@ There are few things to note in the above example:
matching the parameter names of the message. If the messages has `openarray` matching the parameter names of the message. If the messages has `openarray`
params, these will be remapped to `seq` types. params, these will be remapped to `seq` types.
By default, `nextMsg` will still automatically dispatch all messages different The future returned by `nextMsg` will be resolved only after the handler of the
from the awaited one, but you can prevent this behavior by specifying the extra designated message has been fully executed (so you can count on any side effects
flag `discardOthers = true`. produced by the handler to have taken place). If there are multiple outstanding
calls to `nextMsg`, they will complete together. Any other messages received in
the meantime will still be dispatched to their respective handlers.
### Checking the other peer's supported sub-protocols ### Checking the other peer's supported sub-protocols

View File

@ -20,16 +20,22 @@ type
Disconnecting, Disconnecting,
Disconnected Disconnected
Network* = ref object
id: int
protocolStates: seq[RootRef]
Peer* = ref object Peer* = ref object
transp: StreamTransport transp: StreamTransport
dispatcher: Dispatcher dispatcher: Dispatcher
networkId: int networkId: int
nextRequestId: int
network: Network
secretsState: SecretState secretsState: SecretState
connectionState: ConnectionState connectionState: ConnectionState
protocolStates: seq[RootRef] protocolStates: seq[RootRef]
remote*: Node remote*: Node
MessageHandler* = proc(x: Peer, data: var Rlp) MessageHandler* = proc(x: Peer, data: Rlp): Future[void]
MessageInfo* = object MessageInfo* = object
id*: int id*: int
@ -65,6 +71,11 @@ type
protocolOffsets: seq[int] protocolOffsets: seq[int]
thunks: seq[MessageHandler] thunks: seq[MessageHandler]
RlpxMessageKind = enum
rlpxNotification,
rlpxRequest,
rlpxResponse
UnsupportedProtocol* = object of Exception UnsupportedProtocol* = object of Exception
# This is raised when you attempt to send a message from a particular # This is raised when you attempt to send a message from a particular
# protocol to a peer that doesn't support the protocol. # protocol to a peer that doesn't support the protocol.
@ -74,7 +85,6 @@ type
const const
baseProtocolVersion = 4 baseProtocolVersion = 4
# TODO: Usage of this variables causes GCSAFE problems.
var var
gProtocols: seq[ProtocolInfo] gProtocols: seq[ProtocolInfo]
gCapabilities: seq[Capability] gCapabilities: seq[Capability]
@ -198,13 +208,10 @@ proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T =
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer,
rlpOut: var RlpWriter) = rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index] let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
if baseMsgId == -1: doAssert baseMsgId != -1
raise newException(UnsupportedProtocol,
p.nameStr & " is not supported by peer " &
$peer.remote.id)
rlpOut.append(baseMsgId + msgId) rlpOut.append(baseMsgId + msgId)
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
template invalidIdError: untyped = template invalidIdError: untyped =
raise newException(ValueError, raise newException(ValueError,
"RLPx message with an invalid id " & $msgId & "RLPx message with an invalid id " & $msgId &
@ -214,14 +221,17 @@ proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
let thunk = peer.dispatcher.thunks[msgId] let thunk = peer.dispatcher.thunks[msgId]
if thunk == nil: invalidIdError() if thunk == nil: invalidIdError()
thunk(peer, msgData) return thunk(peer, msgData)
proc send(p: Peer, data: BytesRange) {.async.} = proc sendMsg(p: Peer, data: BytesRange): Future[int] =
# var rlp = rlpFromBytes(data) # var rlp = rlpFromBytes(data)
# echo "sending: ", rlp.read(int) # echo "sending: ", rlp.read(int)
# echo "payload: ", rlp.inspect # echo "payload: ", rlp.inspect
var cipherText = encryptMsg(data, p.secretsState) var cipherText = encryptMsg(data, p.secretsState)
var res = await p.transp.write(cipherText) return p.transp.write(cipherText)
proc sendRequest(p: Peer, data: BytesRange, ResponseType: type): Future[ResponseType] =
discard
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream ## This procs awaits the next complete RLPx message in the TCP stream
@ -235,7 +245,12 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
return (-1, zeroBytesRlp) return (-1, zeroBytesRlp)
let remainingBytes = encryptedLength(msgSize) - 32 let remainingBytes = encryptedLength(msgSize) - 32
# XXX: Migrate this to a thread-local seq # TODO: Migrate this to a thread-local seq
# JACEK:
# or pass it in, allowing the caller to choose - they'll likely be in a
# better position to decide if buffer should be reused or not. this will
# also be useuful for chunked messages where part of the buffer may have
# been processed and needs filling in
var encryptedBytes = newSeq[byte](remainingBytes) var encryptedBytes = newSeq[byte](remainingBytes)
await peer.transp.readExactly(addr encryptedBytes[0], len(encryptedBytes)) await peer.transp.readExactly(addr encryptedBytes[0], len(encryptedBytes))
@ -253,24 +268,34 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
let msgId = rlp.read(int) let msgId = rlp.read(int)
return (msgId, rlp) return (msgId, rlp)
proc nextMsg*(peer: Peer, MsgType: typedesc, proc waitSingleMsg(peer: Peer, MsgType: typedesc): Future[MsgType] {.async.} =
discardOthers = false): Future[MsgType] {.async.} = const wantedId = MsgType.msgId
while true:
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == wantedId:
return nextMsgData.read(MsgType)
proc nextMsg*(peer: Peer, MsgType: typedesc): Future[MsgType] {.async.} =
## This procs awaits a specific RLPx message. ## This procs awaits a specific RLPx message.
## By default, other messages will be automatically dispatched ## Any messages received while waiting will be dispatched to their
## to their responsive handlers unless `discardOthers` is set to ## respective handlers. The designated message handler will also run
## true. This may be useful when the protocol requires a very ## to completion before the future returned by `nextMsg` is resolved.
## specific response to a given request. Use with caution.
const wantedId = MsgType.msgId const wantedId = MsgType.msgId
while true: while true:
var (nextMsgId, nextMsgData) = await peer.recvMsg() var (nextMsgId, nextMsgData) = await peer.recvMsg()
# echo "got msg(", nextMsgId, "): ", nextMsgData.inspect # echo "got msg(", nextMsgId, "): ", nextMsgData.inspect
if nextMsgData.listLen != 0:
nextMsgData = nextMsgData.listElem(0)
await peer.dispatchMsg(nextMsgId, nextMsgData)
if nextMsgId == wantedId: if nextMsgId == wantedId:
return nextMsgData.read(MsgType) return nextMsgData.read(MsgType)
elif not discardOthers:
if nextMsgData.listLen != 0: proc registerRequest(peer: Peer, responseFuture: FutureBase): uint =
nextMsgData = nextMsgData.listElem(0) discard
peer.dispatchMsg(nextMsgId, nextMsgData)
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqID: uint) =
discard
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) = iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
for i in (1 + skip) ..< n.params.len: for i in (1 + skip) ..< n.params.len:
@ -299,6 +324,14 @@ template state*(connection: Peer, Protocol: typedesc): untyped =
## particular connection. ## particular connection.
cast[ref Protocol.State](connection.getState(Protocol.protocolInfo)) cast[ref Protocol.State](connection.getState(Protocol.protocolInfo))
proc getNetworkState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.network.protocolStates[proto.index]
template networkState*(connection: Peer, Protocol: typedesc): untyped =
## Returns the network state object of a particular protocol for a
## particular connection.
cast[ref Protocol.NetworkState](connection.getNetworkState(Protocol.protocolInfo))
macro rlpxProtocol*(protoIdentifier: untyped, macro rlpxProtocol*(protoIdentifier: untyped,
version: static[int], version: static[int],
body: untyped): untyped = body: untyped): untyped =
@ -306,25 +339,218 @@ macro rlpxProtocol*(protoIdentifier: untyped,
var var
protoName = $protoIdentifier protoName = $protoIdentifier
protoNameIdent = newIdentNode(protoName) protoNameIdent = newIdentNode(protoName)
nextId = BiggestInt 0 resultIdent = newIdentNode "result"
protocol = genSym(nskVar, protoName & "Proto") protocol = genSym(nskVar, protoName & "Proto")
newProtocol = bindSym "newProtocol" newProtocol = bindSym "newProtocol"
rlpFromBytes = bindSym "rlpFromBytes" rlpFromBytes = bindSym "rlpFromBytes"
read = bindSym "read" read = bindSym "read"
initRlpWriter = bindSym "initRlpWriter" initRlpWriter = bindSym "initRlpWriter"
startList = bindSym "startList"
finish = bindSym "finish" finish = bindSym "finish"
append = bindSym "append" append = bindSym "append"
send = bindSym "send" sendMsg = bindSym "sendMsg"
sendRequest = bindSym "sendRequest"
Peer = bindSym "Peer" Peer = bindSym "Peer"
writeMsgId = bindSym "writeMsgId" writeMsgId = bindSym "writeMsgId"
resolveResponseFuture = bindSym "resolveResponseFuture"
registerRequest = bindSym "registerRequest"
isSubprotocol = version > 0 isSubprotocol = version > 0
stateType: NimNode = nil
msgThunksAndRegistrations = newNimNode(nnkStmtList) msgThunksAndRegistrations = newNimNode(nnkStmtList)
nextId = 0
finalOutput = newNimNode(nnkStmtList)
stateType: NimNode = nil
networkStateType: NimNode = nil
useRequestIds = true
# By convention, all Ethereum protocol names must be abbreviated to 3 letters # By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert protoName.len == 3 assert protoName.len == 3
result = newNimNode(nnkStmtList) proc addMsgHandler(msgId: int, n: NimNode,
msgKind = rlpxNotification,
responseMsgId = -1,
responseRecord: NimNode = nil): NimNode =
let
msgIdent = n.name
msgName = $n.name
var
paramCount = 0
# variables used in the sending procs
msgRecipient = genSym(nskParam, "msgRecipient")
rlpWriter = genSym(nskVar, "writer")
appendParams = newNimNode(nnkStmtList)
# variables used in the receiving procs
msgSender = genSym(nskParam, "msgSender")
receivedRlp = genSym(nskVar, "rlp")
receivedMsg = genSym(nskVar, "msg")
readParams = newNimNode(nnkStmtList)
callResolvedResponseFuture = newNimNode(nnkStmtList)
# nodes to store the user-supplied message handling proc if present
userHandlerProc: NimNode = nil
userHandlerCall: NimNode = nil
awaitUserHandler = newStmtList()
# a record type associated with the message
msgRecord = genSym(nskType, msgName & "Obj")
msgRecordFields = newTree(nnkRecList)
msgRecordBody = newTree(nnkObjectTy,
newEmptyNode(),
newEmptyNode(),
msgRecordFields)
result = msgRecord
case msgKind
of rlpxNotification: discard
of rlpxRequest:
# Each request is registered so we can resolve it when the response
# arrives. There are two types of protocols: LES-like protocols use
# explicit `reqID` sent over the wire, while the ETH wire protocol
# assumes there is one outstanding request at a time (if there are
# multiple requests we'll resolve them in FIFO order).
if useRequestIds:
inc paramCount
appendParams.add quote do:
`append`(`rlpWriter`, `registerRequest`(`msgRecipient`,
`resultIdent`,
`responseMsgId`))
else:
appendParams.add quote do:
discard `registerRequest`(`msgRecipient`,
`resultIdent`,
`responseMsgId`)
of rlpxResponse:
if useRequestIds:
var reqId = genSym(nskLet, "reqId")
# Messages using request Ids
readParams.add quote do:
let `reqId` = `read`(`receivedRlp`, uint)
callResolvedResponseFuture.add quote do:
`resolveResponseFuture`(`msgSender`, `msgId`, addr(`receivedMsg`), `reqId`)
else:
callResolvedResponseFuture.add quote do:
`resolveResponseFuture`(`msgSender`, `msgId`, addr(`receivedMsg`), -1)
if n.body.kind != nnkEmpty:
# implement the receiving thunk proc that deserialzed the
# message parameters and calls the user proc:
userHandlerProc = n.copyNimTree
userHandlerProc.name = genSym(nskProc, msgName)
userHandlerProc.addPragma newIdentNode"async"
# This is the call to the user supplied handled. Here we add only the
# initial peer param, while the rest of the params will be added later.
userHandlerCall = newCall(userHandlerProc.name, msgSender)
# When there is a user handler, it must be awaited in the thunk proc.
# Above, by default `awaitUserHandler` is set to a no-op statement list.
awaitUserHandler = newCall("await", userHandlerCall)
msgThunksAndRegistrations.add(userHandlerProc)
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if stateType != nil:
var localStateAccessor = quote:
template state(p: `Peer`): ref `stateType` =
cast[ref `stateType`](p.getState(`protocol`))
userHandlerProc.body.insert 0, localStateAccessor
if networkStateType != nil:
var networkStateAccessor = quote:
template networkState(p: `Peer`): ref `networkStateType` =
cast[ref `networkStateType`](p.getNetworkState(`protocol`))
userHandlerProc.body.insert 0, networkStateAccessor
for param, paramType in n.typedParams(skip = 1):
inc paramCount
# This is a fragment of the sending proc that
# serializes each of the passed parameters:
appendParams.add quote do:
`append`(`rlpWriter`, `param`)
# Each message has a corresponding record type.
# Here, we create its fields one by one:
msgRecordFields.add newTree(nnkIdentDefs,
param, chooseFieldType(paramType), newEmptyNode())
# The received RLP data is deserialized to a local variable of
# the message-specific type. This is done field by field here:
readParams.add quote do:
`receivedMsg`.`param` = `read`(`receivedRlp`, `paramType`)
# If there is user message handler, we'll place a call to it by
# unpacking the fields of the received message:
if userHandlerCall != nil:
userHandlerCall.add newDotExpr(receivedMsg, param)
let thunkName = newIdentNode(msgName & "_thunk")
msgThunksAndRegistrations.add quote do:
proc `thunkName`(`msgSender`: `Peer`, data: Rlp) {.async.} =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecord`
`readParams`
`awaitUserHandler`
`callResolvedResponseFuture`
finalOutput.add quote do:
# This is a type featuring a single field for each message param:
type `msgRecord`* = `msgRecordBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgIdent`*(T: type `protoNameIdent`): typedesc = `msgRecord`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgId*(T: type `msgRecord`): int = `msgId`
var msgSendProc = n
# TODO: check that the first param has the correct type
msgSendProc.params[1][0] = msgRecipient
# We change the return type of the proc to a Future.
# If this is a request proc, the future will return the response record.
let rt = if msgKind == rlpxRequest: responseRecord
else: newIdentNode("int")
msgSendProc.params[0] = newTree(nnkBracketExpr, newIdentNode("Future"), rt)
let writeMsgId = if isSubprotocol:
quote: `writeMsgId`(`protocol`, `msgId`, `msgRecipient`, `rlpWriter`)
else:
quote: `append`(`rlpWriter`, `msgId`)
let sendProc = if msgKind == rlpxRequest: sendRequest else: sendMsg
var sendCall = newCall(sendProc, msgRecipient, newCall(finish, rlpWriter))
if msgKind == rlpxRequest:
sendCall.add(responseRecord)
# let paramCountNode = newLit(paramCount)
msgSendProc.body = quote do:
var `rlpWriter` = `initRlpWriter`()
`writeMsgId`
`startList`(`rlpWriter`, `paramCount`)
`appendParams`
return `sendCall`
finalOutput.add msgSendProc
msgThunksAndRegistrations.add newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(msgId),
newStrLitNode($n.name),
thunkName)
result = finalOutput
result.add quote do: result.add quote do:
# One global variable per protocol holds the protocol run-time data # One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`protoName`, `version`) var `protocol` = `newProtocol`(`protoName`, `version`)
@ -338,135 +564,76 @@ macro rlpxProtocol*(protoIdentifier: untyped,
for n in body: for n in body:
case n.kind case n.kind
of {nnkCall, nnkCommand}: of {nnkCall, nnkCommand}:
if n.len == 2 and eqIdent(n[0], "nextID"): if eqIdent(n[0], "nextID"):
if n[1].kind == nnkIntLit: # By default message IDs are assigned in increasing order
nextId = n[1].intVal # `nextID` can be used to skip some of the numeric slots
if n.len == 2 and n[1].kind == nnkIntLit:
nextId = n[1].intVal.int
else: else:
error("nextID expects a single int value", n) error("nextID expects a single int value", n)
elif eqIdent(n[0], "requestResponse"):
# `requestResponse` can be given a block of 2 or more procs.
# The last one is considered to be a response message, while
# all preceeding ones are requests triggering the response.
# The system makes sure to automatically insert a hidden `reqID`
# parameter used to discriminate the individual messages.
block processReqResp:
if n.len == 2 and n[1].kind == nnkStmtList:
var procs = newSeq[NimNode](0)
for def in n[1]:
if def.kind == nnkProcDef:
procs.add(def)
if procs.len > 1:
let responseMsgId = nextId + procs.len - 1
let responseRecord = addMsgHandler(responseMsgId,
procs[^1],
msgKind = rlpxResponse)
for i in 0 .. procs.len - 2:
discard addMsgHandler(nextId + i, procs[i],
msgKind = rlpxRequest,
responseMsgId = responseMsgId,
responseRecord = responseRecord)
inc nextId, procs.len
# we got all the way to here, so everything is fine.
# break the block so it doesn't reach the error call below
break processReqResp
error("requestResponse expects a block with at least two proc definitions")
else: else:
error(repr(n) & " is not a recognized call in RLPx protocol definitions", n) error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkTypeSection:
if n.len == 1 and eqIdent(n[0][0], "State"): of nnkAsgn:
stateType = genSym(nskType, protoName & "State") if eqIdent(n[0], "useRequestIds"):
n[0][0] = stateType useRequestIds = $n[1] == "true"
result.add n
# Create a pseudo-field for the protocol State type (e.g. `p2p.State`)
result.add quote do:
template State*(P: type `protoNameIdent`): typedesc = `stateType`
else: else:
error("The only type that can be defined inside a RLPx protocol is the protocol's State type.") error(repr(n[0]) & " is not a recognized protocol option")
of nnkTypeSection:
result.add n
for typ in n:
if eqIdent(typ[0], "State"):
stateType = genSym(nskType, protoName & "State")
typ[0] = stateType
result.add quote do:
template State*(P: type `protoNameIdent`): typedesc =
`stateType`
elif eqIdent(typ[0], "NetworkState"):
networkStateType = genSym(nskType, protoName & "NetworkState")
typ[0] = networkStateType
result.add quote do:
template NetworkState*(P: type `protoNameIdent`): typedesc =
`networkStateType`
else:
error("The only type names allowed within a RLPx protocol definition are 'State' and 'NetworkState'")
of nnkProcDef: of nnkProcDef:
let discard addMsgHandler(nextId, n)
msgIdent = n.name
msgName = $n.name
var
thunkName = newNilLit()
rlpWriter = genSym(nskVar, "writer")
appendParams = newNimNode(nnkStmtList)
peer = genSym(nskParam, "peer")
if n.body.kind != nnkEmpty:
# implement the receiving thunk proc that deserialzed the
# message parameters and calls the user proc:
var
nCopy = n.copyNimTree
rlp = genSym(nskParam, "rlp")
connection = genSym(nskParam, "connection")
nCopy.name = genSym(nskProc, msgName)
var callUserProc = newCall(nCopy.name, connection)
var readParams = newNimNode(nnkStmtList)
for i in 2 ..< n.params.len: # we skip the return type and the
# first param of type Peer
let paramNodes = n.params[i]
let paramType = paramNodes[^2]
for j in 0 ..< paramNodes.len - 2:
var deserializedParam = genSym(nskLet)
readParams.add quote do:
let `deserializedParam` = `read`(`rlp`, `paramType`)
callUserProc.add deserializedParam
thunkName = newIdentNode(msgName & "_thunk")
var thunk = quote do:
proc `thunkName`(`connection`: `Peer`, `rlp`: var Rlp) =
`readParams`
`callUserProc`
if stateType != nil:
# Define a local accessor for the current protocol state
# inside each handler (e.g. peer.state.foo = bar)
var localStateAccessor = quote:
template state(connection: `Peer`): ref `stateType` =
cast[ref `stateType`](connection.getState(`protocol`))
nCopy.body.insert 0, localStateAccessor
msgThunksAndRegistrations.add(nCopy, thunk)
var
msgType = genSym(nskType, msgName & "Obj")
msgTypeFields = newTree(nnkRecList)
msgTypeBody = newTree(nnkObjectTy,
newEmptyNode(),
newEmptyNode(),
msgTypeFields)
var paramCount = 0
# implement sending proc
for param, paramType in n.typedParams(skip = 1):
inc paramCount
appendParams.add quote do:
`append`(`rlpWriter`, `param`)
msgTypeFields.add newTree(nnkIdentDefs,
param, chooseFieldType(paramType), newEmptyNode())
result.add quote do:
# This is a type featuring a single field for each message param:
type `msgType`* = `msgTypeBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgIdent`*(T: type `protoNameIdent`): typedesc = `msgType`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgId*(T: type `msgType`): int = `nextId`
# XXX TODO: check that the first param has the correct type
n.params[1][0] = peer
# echo n.params.treeRepr
n.params[0] = newTree(nnkBracketExpr,
newIdentNode("Future"), newIdentNode("void"))
let writeMsgId = if isSubprotocol:
quote: `writeMsgId`(`protocol`, `nextId`, `peer`, `rlpWriter`)
else:
quote: `append`(`rlpWriter`, `nextId`)
let paramCountNode = newLit(paramCount)
n.body = quote do:
var `rlpWriter` = `initRlpWriter`()
`writeMsgId`
`rlpWriter`.startList(`paramCountNode`)
`appendParams`
return `send`(`peer`, `finish`(`rlpWriter`))
result.add n
msgThunksAndRegistrations.add newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(nextId),
newStrLitNode($n.name),
thunkName)
inc nextId inc nextId
else: else:
error("illegal syntax in a RLPx protocol definition", n) error("illegal syntax in a RLPx protocol definition", n)
@ -494,7 +661,7 @@ rlpxProtocol p2p, 0:
proc hello(peer: Peer, proc hello(peer: Peer,
version: uint, version: uint,
clientId: string, clientId: string,
capabilities: openarray[Capability], capabilities: seq[Capability],
listenPort: uint, listenPort: uint,
nodeId: array[RawPublicKeySize, byte]) = nodeId: array[RawPublicKeySize, byte]) =
# peer.id = nodeId # peer.id = nodeId
@ -573,7 +740,7 @@ proc rlpxConnect*(remote: Node, myKeys: KeyPair, listenPort: Port,
discard result.hello(baseProtocolVersion, clientId, rlpxCapabilities, discard result.hello(baseProtocolVersion, clientId, rlpxCapabilities,
uint(listenPort), myKeys.pubkey.getRaw()) uint(listenPort), myKeys.pubkey.getRaw())
var response = await result.nextMsg(p2p.hello, discardOthers = true) var response = await result.waitSingleMsg(p2p.hello)
if not validatePubKeyInHello(response, remote.node.pubKey): if not validatePubKeyInHello(response, remote.node.pubKey):
warn "Remote nodeId is not its public key" # XXX: Do we care? warn "Remote nodeId is not its public key" # XXX: Do we care?
@ -610,7 +777,7 @@ proc rlpxAccept*(transp: StreamTransport, myKeys: KeyPair,
initSecretState(handshake, authMsg, ^ackMsg, result) initSecretState(handshake, authMsg, ^ackMsg, result)
var response = await result.nextMsg(p2p.hello, discardOthers = true) var response = await result.waitSingleMsg(p2p.hello)
let listenPort = transp.localAddress().port let listenPort = transp.localAddress().port
discard result.hello(baseProtocolVersion, clientId, discard result.hello(baseProtocolVersion, clientId,
rlpxCapabilities, listenPort.uint, rlpxCapabilities, listenPort.uint,
@ -643,6 +810,8 @@ when isMainModule:
type State = object type State = object
messages: int messages: int
useRequestIds = false
proc foo(p: Peer, s: string, a, z: int) = proc foo(p: Peer, s: string, a, z: int) =
p.state.messages += 1 p.state.messages += 1
echo p.state(aaa).peerName echo p.state(aaa).peerName

View File

@ -23,6 +23,8 @@ type
body {.rlpInline.}: BlockBody body {.rlpInline.}: BlockBody
rlpxProtocol eth, 63: rlpxProtocol eth, 63:
useRequestIds = false
proc status(p: Peer, protocolVersion, networkId, td: P, proc status(p: Peer, protocolVersion, networkId, td: P,
bestHash, genesisHash: KeccakHash) = bestHash, genesisHash: KeccakHash) =
discard discard
@ -33,32 +35,36 @@ rlpxProtocol eth, 63:
proc transactions(p: Peer, transactions: openarray[Transaction]) = proc transactions(p: Peer, transactions: openarray[Transaction]) =
discard discard
proc getBlockHeaders(p: Peer, hash: BlocksRequest) = requestResponse:
discard proc getBlockHeaders(p: Peer, hash: BlocksRequest) =
discard
proc blockHeaders(p: Peer, hashes: openarray[BlockHeader]) = proc blockHeaders(p: Peer, hashes: openarray[BlockHeader]) =
discard discard
proc getBlockBodies(p: Peer, hashes: openarray[KeccakHash]) = requestResponse:
discard proc getBlockBodies(p: Peer, hashes: openarray[KeccakHash]) =
discard
proc blockBodies(p: Peer, blocks: openarray[BlockBody]) = proc blockBodies(p: Peer, blocks: openarray[BlockBody]) =
discard discard
proc newBlock(p: Peer, bh: NewBlockAnnounce, totalDificulty: P) = proc newBlock(p: Peer, bh: NewBlockAnnounce, totalDificulty: P) =
discard discard
nextID 13 nextID 13
proc getNodeData(p: Peer, hashes: openarray[KeccakHash]) = requestResponse:
discard proc getNodeData(p: Peer, hashes: openarray[KeccakHash]) =
discard
proc nodeData(p: Peer, data: openarray[Blob]) = proc nodeData(p: Peer, data: openarray[Blob]) =
discard discard
proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) = requestResponse:
discard proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) =
discard
proc receipts(p: Peer, receipts: openarray[Receipt]) = proc receipts(p: Peer, receipts: openarray[Receipt]) =
discard discard

View File

@ -56,62 +56,70 @@ rlpxProtocol les, 2:
values: openarray[KeyValuePair], announceType: uint) = values: openarray[KeyValuePair], announceType: uint) =
discard discard
proc getBlockHeaders(p: Peer, reqID, BV: uint, req: BlocksRequest) = requestResponse:
discard proc getBlockHeaders(p: Peer, BV: uint, req: BlocksRequest) =
discard
proc blockHeaders(p: Peer, reqID, BV: uint, blocks: openarray[BlockHeaders]) = proc blockHeaders(p: Peer, BV: uint, blocks: openarray[BlockHeaders]) =
discard discard
## On-damand data retrieval ## On-damand data retrieval
## ##
proc getBlockBodies(p: Peer, reqID: uint, blocks: openarray[KeccakHash]) = requestResponse:
discard proc getBlockBodies(p: Peer, blocks: openarray[KeccakHash]) =
discard
proc blockBodies(p: Peer, reqID, BV: uint, bodies: openarray[BlockBody]) = proc blockBodies(p: Peer, BV: uint, bodies: openarray[BlockBody]) =
discard discard
proc getReceipts(p: Peer, reqID: uint, hashes: openarray[KeccakHash]) = requestResponse:
discard proc getReceipts(p: Peer, hashes: openarray[KeccakHash]) =
discard
proc receipts(p: Peer, reqID, BV: uint, receipts: openarray[Receipt]) = proc receipts(p: Peer, BV: uint, receipts: openarray[Receipt]) =
discard discard
proc getProofs(p: Peer, reqID: uint, proofs: openarray[ProofRequest]) = requestResponse:
discard proc getProofs(p: Peer, proofs: openarray[ProofRequest]) =
discard
proc proofs(p: Peer, reqID, BV: uint, proofs: openarray[Blob]) = proc proofs(p: Peer, BV: uint, proofs: openarray[Blob]) =
discard discard
proc getContractCodes(p: Peer, reqID: uint, requests: seq[ContractCodeRequest]) = requestResponse:
discard proc getContractCodes(p: Peer, requests: seq[ContractCodeRequest]) =
discard
proc contractCodes(p: Peer, reqID, BV: uint, results: seq[Blob]) = proc contractCodes(p: Peer, BV: uint, results: seq[Blob]) =
discard discard
nextID 15 nextID 15
proc getHeaderProofs(p: Peer, reqID: uint, requests: openarray[HeaderProofRequest]) = requestResponse:
discard proc getHeaderProofs(p: Peer, requests: openarray[HeaderProofRequest]) =
discard
proc headerProof(p: Peer, reqID, BV: uint, proofs: openarray[Blob]) = proc headerProof(p: Peer, BV: uint, proofs: openarray[Blob]) =
discard discard
proc getHelperTrieProofs(p: Peer, reqId: uint, requests: openarray[HelperTrieProofRequest]) = requestResponse:
discard proc getHelperTrieProofs(p: Peer, requests: openarray[HelperTrieProofRequest]) =
discard
proc helperTrieProof(p: Peer, reqId, BV: uint, nodes: seq[Blob], auxData: seq[Blob]) = proc helperTrieProof(p: Peer, BV: uint, nodes: seq[Blob], auxData: seq[Blob]) =
discard discard
## Transaction relaying and status retrieval ## Transaction relaying and status retrieval
## ##
proc sendTxV2(p: Peer, reqId: uint, transactions: openarray[Transaction]) = requestResponse:
discard proc sendTxV2(p: Peer, transactions: openarray[Transaction]) =
discard
proc getTxStatus(p: Peer, reqId: uint, transactions: openarray[Transaction]) = proc getTxStatus(p: Peer, transactions: openarray[Transaction]) =
discard discard
proc txStatus(p: Peer, reqId, BV: uint, transactions: openarray[TransactionStatusMsg]) = proc txStatus(p: Peer, BV: uint, transactions: openarray[TransactionStatusMsg]) =
discard discard