Document the RLPx public APIs

This commit is contained in:
Zahary Karadjov 2018-04-13 15:59:08 +03:00
parent 6ee66c3d00
commit 8c79997672
4 changed files with 334 additions and 76 deletions

125
README.md
View File

@ -1,6 +1,129 @@
# nim-eth-p2p [![Build Status](https://travis-ci.org/status-im/nim-eth-p2p.svg?branch=master)](https://travis-ci.org/status-im/nim-eth-p2p) [![Build status](https://ci.appveyor.com/api/projects/status/i4txsa2pdyaahmn0/branch/master?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-eth-p2p/branch/master)
Nim Ethereum P2P protocol implementation
[[Nim]] Ethereum P2P protocol implementation
## RLPx
[RLPx](https://github.com/ethereum/devp2p/blob/master/rlpx.md) is the
high-level protocol for exchanging messages between peers in the Ethereum
network. Most of the client code of this library should not be concerned
with the implementation details of the underlying protocols and should use
the high-level APIs described in this section.
To obtain a RLPx connection, use the proc `rlpxConnect` supplying the
id of another node in the network. On success, the proc will return a
`Peer` object representing the connection. Each of the RLPx sub-protocols
consists of a set of strongly-typed messages, which are represented by
this library as regular Nim procs that can be executed over the `Peer`
object (more on this later).
### Defining RLPx sub-protocols
The sub-protocols are defined with the `rlpxProtocol` macro. It will accept
a 3-letter identifier for the protocol and the current protocol version:
Here is how the [DevP2P wire protocol](https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol) might look like:
``` nim
rlpxProtocol p2p, 0:
proc hello(peer: Peer,
version: uint,
clientId: string,
capabilities: openarray[Capability],
listenPort: uint,
nodeId: P2PNodeId) =
peer.id = nodeId
peer.dispatcher = getDispatcher(capabilities)
proc disconnect(peer: Peer, reason: DisconnectionReason)
proc ping(peer: Peer)
proc pong(peer: Peer) =
echo "received pong from ", peer.id
```
#### Sending messages
To send a particular message to a particular peer, just call the
corresponding proc over the `Peer` object:
``` nim
peer.hello(4, "Nimbus 1.0", ...)
peer.ping()
```
#### Receiving messages
Once a connection is established, incoming messages in RLPx may appear in
arbitrary order, because the sub-protocols may be multiplexed over a single
underlying connection. For this reason, the library assumes that the incoming
messages will be dispatched automatically to their corresponding handlers,
appearing in the protocol definition. The protocol implementations are expected
to maintain a state and to act like a state machine handling the incoming messages.
To achieve this, each protocol may define a `State` object that can be accessed as
a `state` field of the `Peer` object:
``` nim
rlpxProtocol abc, 1:
type State = object
receivedMsgsCount: int
proc incomingMessage(p: Peer) =
p.state.receivedMsgsCount += 1
```
Sometimes, you'll need to access the state of another protocol. To do this,
specify the protocol identifier to the `state` accessor:
``` nim
echo "ABC protocol messages: ", peer.state(abc).receivedMsgCount
```
While the state machine approach is the recommended way of implementing
sub-protocols, sometimes in imperative code it may be easier to wait for
a particular response message after sending a certain request.
This is enabled by the helper proc `nextMsg`:
``` nim
proc handshakeExample(peer: Peer) {.async.} =
...
# send a hello message
peer.hello(...)
# wait for a matching hello response
let response = await peer.nextMsg(p2p.hello)
echo response.clientId # print the name of the Ethereum client
# used by the other peer (Geth, Parity, Nimbus, etc)
```
There are few things to note in the above example:
1. The `rlpxProtocol` definition created a pseudo-variable named after the
protocol holding various properties of the protocol.
2. Each message defined in the protocol received a corresponding type name,
matching the message name (e.g. `p2p.hello`). This type will have fields
matching the parameter names of the message. If the messages has `openarray`
params, these will be remapped to `seq` types.
By default, `nextMsg` will still automatically dispatch all messages different
from the awaited one, but you can prevent this behavior by specifying the extra
flag `discardOthers = true`.
### Checking the other peer's supported sub-protocols
Upon establishing a connection, RLPx will automatically negotiate the list of
mutually supported protocols by the peers. To check whether a particular peer
supports a particular sub-protocol, use the following code:
``` nim
if peer.supports(les): # `les` is the identifier of the light clients sub-protocol
peer.getReceipts(nextReqId(), neededReceipts())
```
## License

View File

@ -1,19 +1,30 @@
import
macros, sets, algorithm, async, asyncnet, hashes, rlp, ecc,
ethereum_types, kademlia, discovery, auth
macros, sets, algorithm, async, asyncnet, asyncfutures,
hashes, rlp, ranges/ptr_arith, eth_keys, ethereum_types,
kademlia, discovery, auth
type
P2PNodeId = MDigest[512]
ConnectionState = enum
None,
Connected,
Disconnecting,
Disconnected
Peer* = ref object
id: NodeId # XXX: not fillet yed
id: P2PNodeId # XXX: not fillet yed
socket: AsyncSocket
dispatcher: Dispatcher
# privKey: AesKey
networkId: int
sessionSecrets: ConnectionSecret
connectionState: ConnectionState
protocolStates: seq[RootRef]
MessageHandler* = proc(x: Peer, data: var Rlp)
MessageDesc* = object
MessageInfo* = object
id*: int
name*: string
thunk*: MessageHandler
@ -24,10 +35,10 @@ type
name*: CapabilityName
version*: int
Protocol* = ref object
ProtocolInfo* = ref object
name*: CapabilityName
version*: int
messages*: seq[MessageDesc]
messages*: seq[MessageInfo]
index: int # the position of the protocol in the
# ordered list of supported protocols
@ -64,9 +75,10 @@ const
maxUInt24 = (not uint32(0)) shl 8
var
gProtocols = newSeq[Protocol](0)
gProtocols = newSeq[ProtocolInfo](0)
gCapabilities = newSeq[Capability](0)
gDispatchers = initSet[Dispatcher]()
devp2p: Protocol
devp2p: ProtocolInfo
# Dispatcher
#
@ -90,7 +102,7 @@ proc describeProtocols(d: Dispatcher): string =
if result.len != 0: result.add(',')
for c in gProtocols[i].name: result.add(c)
proc getDispatcher(otherPeerCapabilities: var openarray[Capability]): Dispatcher =
proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher =
# XXX: sub-optimal solution until progress is made here:
# https://github.com/nim-lang/Nim/issues/7457
# We should be able to find an existing dispatcher without allocating a new one
@ -130,10 +142,10 @@ proc getDispatcher(otherPeerCapabilities: var openarray[Capability]): Dispatcher
gDispatchers.incl result
# Protocol
# Protocol info objects
#
proc newProtocol(name: string, version: int): Protocol =
proc newProtocol(name: string, version: int): ProtocolInfo =
new result
result.name[0] = name[0]
result.name[1] = name[1]
@ -141,24 +153,26 @@ proc newProtocol(name: string, version: int): Protocol =
result.version = version
result.messages = @[]
proc nameStr*(p: Protocol): string =
proc nameStr*(p: ProtocolInfo): string =
result = newStringOfCap(3)
for c in p.name: result.add(c)
proc cmp*(lhs, rhs: Protocol): int {.inline.} =
proc cmp*(lhs, rhs: ProtocolInfo): int {.inline.} =
for i in 0..2:
if lhs.name[i] != rhs.name[i]:
return int16(lhs.name[i]) - int16(rhs.name[i])
return 0
proc registerMessage(protocol: var Protocol,
id: int, name: string, thunk: MessageHandler) =
protocol.messages.add MessageDesc(id: id, name: name, thunk: thunk)
proc registerMsg(protocol: var ProtocolInfo,
id: int, name: string, thunk: MessageHandler) =
protocol.messages.add MessageInfo(id: id, name: name, thunk: thunk)
proc registerProtocol(protocol: Protocol) =
proc registerProtocol(protocol: ProtocolInfo) =
# XXX: This can be done at compile-time in the future
if protocol.version > 0:
gProtocols.insert(protocol, lowerBound(gProtocols, protocol))
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
gCapabilities.insert(Capability(name: protocol.name, version: protocol.version), pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
else:
@ -173,34 +187,37 @@ proc append*(rlpWriter: var RlpWriter, hash: KeccakHash) =
proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T =
result.data = rlp.read(type(result.data))
proc append*(rlpWriter: var RlpWriter, p: Protocol) =
append(rlpWriter, (p.nameStr, p.version))
proc read*(rlp: var Rlp, T: type Protocol): Protocol =
let cap = rlp.read(Capability)
for p in gProtocols:
if p.name == cap.name and p.version == cap.version:
return p
# XXX: This shouldn't return nil probably, but rather
# an empty Protocol object
return nil
# Message composition and encryption
#
proc writeMessageId(p: Protocol, msgId: int, peer: Peer, rlpOut: var RlpWriter) =
proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) =
let baseMsgId = peer.dispatcher.protocolOffsets[p.index]
if baseMsgId == -1:
raise newException(UnsupportedProtocol,
p.nameStr & " is not supported by peer " & $peer.id)
rlpOut.append(baseMsgId + msgId)
proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) =
template invalidIdError: untyped =
raise newException(ValueError,
"RLPx message with an invalid id " & $msgId &
" on a connection supporting " & peer.dispatcher.describeProtocols)
if msgId >= peer.dispatcher.thunks.len: invalidIdError()
let thunk = peer.dispatcher.thunks[msgId]
if thunk == nil: invalidIdError()
thunk(peer, msgData)
proc updateMac(mac: var openarray[byte], key: openarray[byte], bytes: openarray[byte]) =
# XXX TODO: implement this
discard
type
RlpxHeader = array[32, byte]
proc send(p: Peer, data: BytesRange) =
var header: array[32, byte]
var header: RlpxHeader
if data.len > int(maxUInt24):
raise newException(OverflowError, "RLPx message size exceeds limit")
@ -242,24 +259,47 @@ proc send(p: Peer, data: BytesRange) =
return header_ciphertext + header_mac + frame_ciphertext + frame_mac
"""
proc dispatchMessage(connection: Peer, msg: BytesRange) =
# This proc dispatches an already decrypted message
proc getMsgLen(header: RlpxHeader): int =
32
var rlp = rlpFromBytes(msg)
proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} =
# XXX: This should be a library function
var receivedBytes = 0
while receivedBytes < bufferLen:
receivedBytes += await s.recvInto(buffer.shift(receivedBytes),
bufferLen - receivedBytes)
proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
## This procs awaits the next complete RLPx message in the TCP stream
var header: RlpxHeader
await peer.socket.fullRecvInto(header.baseAddr, sizeof(header))
let msgLen = header.getMsgLen
var msgData = newSeq[byte](msgLen).toRange
await peer.socket.fullRecvInto(msgData.baseAddr, msgData.len)
var rlp = rlpFromBytes(msgData)
let msgId = rlp.read(int)
return (msgId, rlp)
template invalidIdError: untyped =
raise newException(ValueError,
"RLPx message with an invalid id " & $msgId &
" on a connection supporting " & connection.dispatcher.describeProtocols)
proc nextMsg*(peer: Peer, MsgType: typedesc,
discardOthers = false): Future[MsgType] {.async.} =
## This procs awaits a specific RLPx message.
## By default, other messages will be automatically dispatched
## to their responsive handlers unless `discardOthers` is set to
## true. This may be useful when the protocol requires a very
## specific response to a given request. Use with caution.
const wantedId = MsgType.msgId
if msgId >= connection.dispatcher.thunks.len: invalidIdError()
let thunk = connection.dispatcher.thunks[msgId]
if thunk == nil: invalidIdError()
while true:
var (nextMsgId, nextMsgData) = await peer.recvMsg()
if nextMsgId == wantedId:
return nextMsgData.read(MsgType)
elif not discardOthers:
peer.dispatchMsg(nextMsgId, nextMsgData)
thunk(connection, rlp)
iterator typedParams(n: PNimrodNode, skip = 0): (PNimrodNode, PNimrodNode) =
iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) =
for i in (1 + skip) ..< n.params.len:
let paramNodes = n.params[i]
let paramType = paramNodes[^2]
@ -267,12 +307,36 @@ iterator typedParams(n: PNimrodNode, skip = 0): (PNimrodNode, PNimrodNode) =
for j in 0 .. < (paramNodes.len-2):
yield (paramNodes[j], paramType)
macro rlpxProtocol*(name: static[string],
proc chooseFieldType(n: NimNode): NimNode =
## Examines the parameter types used in the message signature
## and selects the corresponding field type for use in the
## message object type (i.e. `p2p.hello`).
##
## For now, only openarray types are remapped to sequences.
result = n
if n.kind == nnkBracketExpr and
n[0].kind == nnkIdent and
$n[0].ident == "openarray":
result = n.copyNimTree
result[0] = newIdentNode("seq")
proc getState(peer: Peer, proto: ProtocolInfo): RootRef =
peer.protocolStates[proto.index]
template state*(connection: Peer, Protocol: typedesc): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
cast[ref Protocol.State](connection.getState(Protocol.info))
macro rlpxProtocol*(protoIdentifier: untyped,
version: static[int],
body: untyped): untyped =
## The macro used to defined RLPx sub-protocols. See README.
var
protoName = $protoIdentifier
protoNameIdent = newIdentNode(protoName)
nextId = BiggestInt 0
protocol = genSym(nskVar)
protocol = genSym(nskVar, protoName & "Proto")
newProtocol = bindSym "newProtocol"
rlpFromBytes = bindSym "rlpFromBytes"
read = bindSym "read"
@ -281,12 +345,23 @@ macro rlpxProtocol*(name: static[string],
append = bindSym "append"
send = bindSym "send"
Peer = bindSym "Peer"
writeMessageId = bindSym "writeMessageId"
writeMsgId = bindSym "writeMsgId"
isSubprotocol = version > 0
stateType: NimNode = nil
# By convention, all Ethereum protocol names must be abbreviated to 3 letters
assert protoName.len == 3
result = newNimNode(nnkStmtList)
result.add quote do:
var `protocol` = `newProtocol`(`name`, `version`)
# One global variable per protocol holds the protocol run-time data
var `protocol` = `newProtocol`(`protoName`, `version`)
# Create a type actining as a pseudo-object representing the protocol (e.g. p2p)
type `protoNameIdent`* = object
# The protocol run-time data is available as a pseudo-field (e.g. `p2p.info`)
template info*(P: type `protoNameIdent`): ProtocolInfo = `protocol`
for n in body:
case n.kind
@ -298,9 +373,22 @@ macro rlpxProtocol*(name: static[string],
error("nextID expects a single int value", n)
else:
error(repr(n) & " is not a recognized call in RLPx protocol definitions", n)
of nnkTypeSection:
if n.len == 1 and n[0][0].kind == nnkIdent and $n[0][0].ident == "State":
stateType = genSym(nskType, protoName & "State")
n[0][0] = stateType
result.add n
# Create a pseudo-field for the protocol State type (e.g. `p2p.State`)
result.add quote do:
template State*(P: type `protoNameIdent`): typedesc = `stateType`
else:
error("The only type that can be defined inside a RLPx protocol is the protocol's State type.")
of nnkProcDef:
inc nextId
let name = n.name.ident
let
msgIdent = n.name.ident
msgName = $msgIdent
var
thunkName = newNilLit()
@ -309,13 +397,14 @@ macro rlpxProtocol*(name: static[string],
peer = genSym(nskParam, "peer")
if n.body.kind != nnkEmpty:
# implement receiving thunk
# implement the receiving thunk proc that deserialzed the
# message parameters and calls the user proc:
var
nCopy = n.copyNimTree
rlp = genSym(nskParam, "rlp")
connection = genSym(nskParam, "connection")
nCopy.name = genSym(nskProc, $name)
nCopy.name = genSym(nskProc, msgName)
var callUserProc = newCall(nCopy.name, connection)
var readParams = newNimNode(nnkStmtList)
@ -333,24 +422,56 @@ macro rlpxProtocol*(name: static[string],
callUserProc.add deserializedParam
thunkName = newIdentNode($name & "_thunk")
thunkName = newIdentNode(msgName & "_thunk")
var thunk = quote do:
proc `thunkName`(`connection`: `Peer`, `rlp`: var Rlp) =
`readParams`
`callUserProc`
if stateType != nil:
# Define a local accessor for the current protocol state
# inside each handler (e.g. peer.state.foo = bar)
var localStateAccessor = quote:
template state(connection: `Peer`): ref `stateType` =
cast[ref `stateType`](connection.getState(`protocol`))
nCopy.body.insert 0, localStateAccessor
result.add nCopy, thunk
var
msgType = genSym(nskType, msgName & "Obj")
msgTypeFields = newTree(nnkRecList)
msgTypeBody = newTree(nnkObjectTy,
newEmptyNode(),
newEmptyNode(),
msgTypeFields)
# implement sending proc
for param, paramType in n.typedParams(skip = 1):
appendParams.add quote do:
`append`(`rlpWriter`, `param`)
msgTypeFields.add newTree(nnkIdentDefs,
param, chooseFieldType(paramType), newEmptyNode())
result.add quote do:
# This is a type featuring a single field for each message param:
type `msgType`* = `msgTypeBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgIdent`*(T: type `protoNameIdent`): typedesc = `msgType`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgId*(T: type `msgType`): int = `nextId`
# XXX TODO: check that the first param has the correct type
n.params[1][0] = peer
let writeMsgId = if isSubprotocol:
quote: `writeMessageId`(`protocol`, `nextId`, `peer`, `rlpWriter`)
quote: `writeMsgId`(`protocol`, `nextId`, `peer`, `rlpWriter`)
else:
quote: `append`(`rlpWriter`, `nextId`)
@ -361,7 +482,7 @@ macro rlpxProtocol*(name: static[string],
`send`(`peer`, `finish`(`rlpWriter`))
result.add n
result.add newCall(bindSym("registerMessage"),
result.add newCall(bindSym("registerMsg"),
protocol,
newIntLitNode(nextId),
newStrLitNode($n.name),
@ -371,7 +492,7 @@ macro rlpxProtocol*(name: static[string],
error("illegal syntax in a RLPx protocol definition", n)
result.add newCall(bindSym("registerProtocol"), protocol)
echo repr(result)
when isMainModule: echo repr(result)
type
DisconnectionReason* = enum
@ -389,16 +510,15 @@ type
MessageTimeout,
SubprotocolReason = 0x10
rlpxProtocol("p2p", 0):
rlpxProtocol p2p, 0:
proc hello(peer: Peer,
version: uint,
clientId: string,
capabilities: openarray[Protocol],
capabilities: openarray[Capability],
listenPort: uint,
nodeId: MDigest[512]
) =
discard
nodeId: P2PNodeId) =
peer.id = nodeId
peer.dispatcher = getDispatcher(capabilities)
proc disconnect(peer: Peer, reason: DisconnectionReason)
@ -407,6 +527,8 @@ rlpxProtocol("p2p", 0):
proc pong(peer: Peer) =
discard
import typetraits
proc rlpxConnect*(myKeys: KeyPair, remoteKey: PublicKey,
address: Address): Future[Peer] {.async.} =
# TODO: Make sure to close the socket in case of exception
@ -438,29 +560,42 @@ proc rlpxConnect*(myKeys: KeyPair, remoteKey: PublicKey,
var ackMsg: array[AckMessageMaxEIP8, byte]
let ackMsgLen = handshake.ackSize(encrypt = encryptionEnabled)
let receivedBytes = await result.socket.recvInto(addr ackMsg, ackMsgLen)
if receivedBytes != ackMsgLen:
# XXX: this handling is not perfect, we should probably retry until the
# correct number of bytes are read!
raise newException(MalformedMessageError, "AuthAck message has incorrect size")
await result.socket.fullRecvInto(addr ackMsg, ackMsgLen)
check handshake.decodeAckMessage(^ackMsg)
check handshake.getSecrets(^authMsg, ^ackMsg, result.sessionSecrets)
var
# XXX: TODO
nodeId: MDigest[512]
# XXX: TODO: get these from somewhere
nodeId: P2PNodeId
listeningPort = uint 0
hello(result, baseProtocolVersion, clienId, gProtocols, listeningPort, nodeId)
hello(result, baseProtocolVersion, clienId, gCapabilities, listeningPort, nodeId)
var response = await result.nextMsg(p2p.hello, discardOthers = true)
result.dispatcher = getDispatcher(response.capabilities)
result.id = response.nodeId
result.connectionState = Connected
newSeq(result.protocolStates, gProtocols.len)
# XXX: initialize the sub-protocol states
when isMainModule:
import rlp
rlpxProtocol("test", 1):
rlpxProtocol aaa, 1:
type State = object
peerName: string
proc hi(p: Peer, name: string) =
p.state.peerName = name
rlpxProtocol bbb, 1:
type State = object
messages: int
proc foo(p: Peer, s: string, a, z: int) =
echo s
p.state.messages += 1
echo p.state(aaa).peerName
proc bar(p: Peer, i: int, s: string)

View File

@ -12,7 +12,7 @@ type
header: BlockHeader
body {.rlpInline.}: BlockBody
rlpxProtocol("eth", 63):
rlpxProtocol eth, 63:
proc status(p: Peer, protocolVersion, networkId, td: P,
bestHash, genesisHash: KeccakHash) =
discard

View File

@ -30,7 +30,7 @@ type
status*: TransactionStatus
data*: Blob
rlpxProtocol("les", 2):
rlpxProtocol les, 2:
## Handshake
##