diff --git a/README.md b/README.md index e2584de..96b12d7 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,129 @@ # nim-eth-p2p [![Build Status](https://travis-ci.org/status-im/nim-eth-p2p.svg?branch=master)](https://travis-ci.org/status-im/nim-eth-p2p) [![Build status](https://ci.appveyor.com/api/projects/status/i4txsa2pdyaahmn0/branch/master?svg=true)](https://ci.appveyor.com/project/cheatfate/nim-eth-p2p/branch/master) -Nim Ethereum P2P protocol implementation +[[Nim]] Ethereum P2P protocol implementation + +## RLPx + +[RLPx](https://github.com/ethereum/devp2p/blob/master/rlpx.md) is the +high-level protocol for exchanging messages between peers in the Ethereum +network. Most of the client code of this library should not be concerned +with the implementation details of the underlying protocols and should use +the high-level APIs described in this section. + +To obtain a RLPx connection, use the proc `rlpxConnect` supplying the +id of another node in the network. On success, the proc will return a +`Peer` object representing the connection. Each of the RLPx sub-protocols +consists of a set of strongly-typed messages, which are represented by +this library as regular Nim procs that can be executed over the `Peer` +object (more on this later). + +### Defining RLPx sub-protocols + +The sub-protocols are defined with the `rlpxProtocol` macro. It will accept +a 3-letter identifier for the protocol and the current protocol version: + +Here is how the [DevP2P wire protocol](https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol) might look like: + +``` nim +rlpxProtocol p2p, 0: + proc hello(peer: Peer, + version: uint, + clientId: string, + capabilities: openarray[Capability], + listenPort: uint, + nodeId: P2PNodeId) = + peer.id = nodeId + peer.dispatcher = getDispatcher(capabilities) + + proc disconnect(peer: Peer, reason: DisconnectionReason) + + proc ping(peer: Peer) + + proc pong(peer: Peer) = + echo "received pong from ", peer.id +``` + +#### Sending messages + +To send a particular message to a particular peer, just call the +corresponding proc over the `Peer` object: + +``` nim +peer.hello(4, "Nimbus 1.0", ...) +peer.ping() +``` + +#### Receiving messages + +Once a connection is established, incoming messages in RLPx may appear in +arbitrary order, because the sub-protocols may be multiplexed over a single +underlying connection. For this reason, the library assumes that the incoming +messages will be dispatched automatically to their corresponding handlers, +appearing in the protocol definition. The protocol implementations are expected +to maintain a state and to act like a state machine handling the incoming messages. +To achieve this, each protocol may define a `State` object that can be accessed as +a `state` field of the `Peer` object: + +``` nim +rlpxProtocol abc, 1: + type State = object + receivedMsgsCount: int + + proc incomingMessage(p: Peer) = + p.state.receivedMsgsCount += 1 + +``` + +Sometimes, you'll need to access the state of another protocol. To do this, +specify the protocol identifier to the `state` accessor: + +``` nim + echo "ABC protocol messages: ", peer.state(abc).receivedMsgCount +``` + +While the state machine approach is the recommended way of implementing +sub-protocols, sometimes in imperative code it may be easier to wait for +a particular response message after sending a certain request. + +This is enabled by the helper proc `nextMsg`: + +``` nim +proc handshakeExample(peer: Peer) {.async.} = + ... + # send a hello message + peer.hello(...) + + # wait for a matching hello response + let response = await peer.nextMsg(p2p.hello) + echo response.clientId # print the name of the Ethereum client + # used by the other peer (Geth, Parity, Nimbus, etc) +``` + +There are few things to note in the above example: + +1. The `rlpxProtocol` definition created a pseudo-variable named after the + protocol holding various properties of the protocol. + +2. Each message defined in the protocol received a corresponding type name, + matching the message name (e.g. `p2p.hello`). This type will have fields + matching the parameter names of the message. If the messages has `openarray` + params, these will be remapped to `seq` types. + +By default, `nextMsg` will still automatically dispatch all messages different +from the awaited one, but you can prevent this behavior by specifying the extra +flag `discardOthers = true`. + +### Checking the other peer's supported sub-protocols + +Upon establishing a connection, RLPx will automatically negotiate the list of +mutually supported protocols by the peers. To check whether a particular peer +supports a particular sub-protocol, use the following code: + +``` nim +if peer.supports(les): # `les` is the identifier of the light clients sub-protocol + peer.getReceipts(nextReqId(), neededReceipts()) + +``` ## License diff --git a/ethp2p/rlpx.nim b/ethp2p/rlpx.nim index dbb0320..81141c4 100644 --- a/ethp2p/rlpx.nim +++ b/ethp2p/rlpx.nim @@ -1,19 +1,30 @@ import - macros, sets, algorithm, async, asyncnet, hashes, rlp, ecc, - ethereum_types, kademlia, discovery, auth + macros, sets, algorithm, async, asyncnet, asyncfutures, + hashes, rlp, ranges/ptr_arith, eth_keys, ethereum_types, + kademlia, discovery, auth type + P2PNodeId = MDigest[512] + + ConnectionState = enum + None, + Connected, + Disconnecting, + Disconnected + Peer* = ref object - id: NodeId # XXX: not fillet yed + id: P2PNodeId # XXX: not fillet yed socket: AsyncSocket dispatcher: Dispatcher # privKey: AesKey networkId: int sessionSecrets: ConnectionSecret + connectionState: ConnectionState + protocolStates: seq[RootRef] MessageHandler* = proc(x: Peer, data: var Rlp) - MessageDesc* = object + MessageInfo* = object id*: int name*: string thunk*: MessageHandler @@ -24,10 +35,10 @@ type name*: CapabilityName version*: int - Protocol* = ref object + ProtocolInfo* = ref object name*: CapabilityName version*: int - messages*: seq[MessageDesc] + messages*: seq[MessageInfo] index: int # the position of the protocol in the # ordered list of supported protocols @@ -64,9 +75,10 @@ const maxUInt24 = (not uint32(0)) shl 8 var - gProtocols = newSeq[Protocol](0) + gProtocols = newSeq[ProtocolInfo](0) + gCapabilities = newSeq[Capability](0) gDispatchers = initSet[Dispatcher]() - devp2p: Protocol + devp2p: ProtocolInfo # Dispatcher # @@ -90,7 +102,7 @@ proc describeProtocols(d: Dispatcher): string = if result.len != 0: result.add(',') for c in gProtocols[i].name: result.add(c) -proc getDispatcher(otherPeerCapabilities: var openarray[Capability]): Dispatcher = +proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher = # XXX: sub-optimal solution until progress is made here: # https://github.com/nim-lang/Nim/issues/7457 # We should be able to find an existing dispatcher without allocating a new one @@ -130,10 +142,10 @@ proc getDispatcher(otherPeerCapabilities: var openarray[Capability]): Dispatcher gDispatchers.incl result -# Protocol +# Protocol info objects # -proc newProtocol(name: string, version: int): Protocol = +proc newProtocol(name: string, version: int): ProtocolInfo = new result result.name[0] = name[0] result.name[1] = name[1] @@ -141,24 +153,26 @@ proc newProtocol(name: string, version: int): Protocol = result.version = version result.messages = @[] -proc nameStr*(p: Protocol): string = +proc nameStr*(p: ProtocolInfo): string = result = newStringOfCap(3) for c in p.name: result.add(c) -proc cmp*(lhs, rhs: Protocol): int {.inline.} = +proc cmp*(lhs, rhs: ProtocolInfo): int {.inline.} = for i in 0..2: if lhs.name[i] != rhs.name[i]: return int16(lhs.name[i]) - int16(rhs.name[i]) return 0 -proc registerMessage(protocol: var Protocol, - id: int, name: string, thunk: MessageHandler) = - protocol.messages.add MessageDesc(id: id, name: name, thunk: thunk) +proc registerMsg(protocol: var ProtocolInfo, + id: int, name: string, thunk: MessageHandler) = + protocol.messages.add MessageInfo(id: id, name: name, thunk: thunk) -proc registerProtocol(protocol: Protocol) = +proc registerProtocol(protocol: ProtocolInfo) = # XXX: This can be done at compile-time in the future if protocol.version > 0: - gProtocols.insert(protocol, lowerBound(gProtocols, protocol)) + let pos = lowerBound(gProtocols, protocol) + gProtocols.insert(protocol, pos) + gCapabilities.insert(Capability(name: protocol.name, version: protocol.version), pos) for i in 0 ..< gProtocols.len: gProtocols[i].index = i else: @@ -173,34 +187,37 @@ proc append*(rlpWriter: var RlpWriter, hash: KeccakHash) = proc read*(rlp: var Rlp, T: typedesc[KeccakHash]): T = result.data = rlp.read(type(result.data)) -proc append*(rlpWriter: var RlpWriter, p: Protocol) = - append(rlpWriter, (p.nameStr, p.version)) - -proc read*(rlp: var Rlp, T: type Protocol): Protocol = - let cap = rlp.read(Capability) - for p in gProtocols: - if p.name == cap.name and p.version == cap.version: - return p - # XXX: This shouldn't return nil probably, but rather - # an empty Protocol object - return nil - # Message composition and encryption # -proc writeMessageId(p: Protocol, msgId: int, peer: Peer, rlpOut: var RlpWriter) = +proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) = let baseMsgId = peer.dispatcher.protocolOffsets[p.index] if baseMsgId == -1: raise newException(UnsupportedProtocol, p.nameStr & " is not supported by peer " & $peer.id) rlpOut.append(baseMsgId + msgId) +proc dispatchMsg(peer: Peer, msgId: int, msgData: var Rlp) = + template invalidIdError: untyped = + raise newException(ValueError, + "RLPx message with an invalid id " & $msgId & + " on a connection supporting " & peer.dispatcher.describeProtocols) + + if msgId >= peer.dispatcher.thunks.len: invalidIdError() + let thunk = peer.dispatcher.thunks[msgId] + if thunk == nil: invalidIdError() + + thunk(peer, msgData) + proc updateMac(mac: var openarray[byte], key: openarray[byte], bytes: openarray[byte]) = # XXX TODO: implement this discard +type + RlpxHeader = array[32, byte] + proc send(p: Peer, data: BytesRange) = - var header: array[32, byte] + var header: RlpxHeader if data.len > int(maxUInt24): raise newException(OverflowError, "RLPx message size exceeds limit") @@ -242,24 +259,47 @@ proc send(p: Peer, data: BytesRange) = return header_ciphertext + header_mac + frame_ciphertext + frame_mac """ -proc dispatchMessage(connection: Peer, msg: BytesRange) = - # This proc dispatches an already decrypted message +proc getMsgLen(header: RlpxHeader): int = + 32 - var rlp = rlpFromBytes(msg) +proc fullRecvInto(s: AsyncSocket, buffer: pointer, bufferLen: int) {.async.} = + # XXX: This should be a library function + var receivedBytes = 0 + while receivedBytes < bufferLen: + receivedBytes += await s.recvInto(buffer.shift(receivedBytes), + bufferLen - receivedBytes) + +proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = + ## This procs awaits the next complete RLPx message in the TCP stream + + var header: RlpxHeader + await peer.socket.fullRecvInto(header.baseAddr, sizeof(header)) + + let msgLen = header.getMsgLen + var msgData = newSeq[byte](msgLen).toRange + await peer.socket.fullRecvInto(msgData.baseAddr, msgData.len) + + var rlp = rlpFromBytes(msgData) let msgId = rlp.read(int) + return (msgId, rlp) - template invalidIdError: untyped = - raise newException(ValueError, - "RLPx message with an invalid id " & $msgId & - " on a connection supporting " & connection.dispatcher.describeProtocols) +proc nextMsg*(peer: Peer, MsgType: typedesc, + discardOthers = false): Future[MsgType] {.async.} = + ## This procs awaits a specific RLPx message. + ## By default, other messages will be automatically dispatched + ## to their responsive handlers unless `discardOthers` is set to + ## true. This may be useful when the protocol requires a very + ## specific response to a given request. Use with caution. + const wantedId = MsgType.msgId - if msgId >= connection.dispatcher.thunks.len: invalidIdError() - let thunk = connection.dispatcher.thunks[msgId] - if thunk == nil: invalidIdError() + while true: + var (nextMsgId, nextMsgData) = await peer.recvMsg() + if nextMsgId == wantedId: + return nextMsgData.read(MsgType) + elif not discardOthers: + peer.dispatchMsg(nextMsgId, nextMsgData) - thunk(connection, rlp) - -iterator typedParams(n: PNimrodNode, skip = 0): (PNimrodNode, PNimrodNode) = +iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) = for i in (1 + skip) ..< n.params.len: let paramNodes = n.params[i] let paramType = paramNodes[^2] @@ -267,12 +307,36 @@ iterator typedParams(n: PNimrodNode, skip = 0): (PNimrodNode, PNimrodNode) = for j in 0 .. < (paramNodes.len-2): yield (paramNodes[j], paramType) -macro rlpxProtocol*(name: static[string], +proc chooseFieldType(n: NimNode): NimNode = + ## Examines the parameter types used in the message signature + ## and selects the corresponding field type for use in the + ## message object type (i.e. `p2p.hello`). + ## + ## For now, only openarray types are remapped to sequences. + result = n + if n.kind == nnkBracketExpr and + n[0].kind == nnkIdent and + $n[0].ident == "openarray": + result = n.copyNimTree + result[0] = newIdentNode("seq") + +proc getState(peer: Peer, proto: ProtocolInfo): RootRef = + peer.protocolStates[proto.index] + +template state*(connection: Peer, Protocol: typedesc): untyped = + ## Returns the state object of a particular protocol for a + ## particular connection. + cast[ref Protocol.State](connection.getState(Protocol.info)) + +macro rlpxProtocol*(protoIdentifier: untyped, version: static[int], body: untyped): untyped = + ## The macro used to defined RLPx sub-protocols. See README. var + protoName = $protoIdentifier + protoNameIdent = newIdentNode(protoName) nextId = BiggestInt 0 - protocol = genSym(nskVar) + protocol = genSym(nskVar, protoName & "Proto") newProtocol = bindSym "newProtocol" rlpFromBytes = bindSym "rlpFromBytes" read = bindSym "read" @@ -281,12 +345,23 @@ macro rlpxProtocol*(name: static[string], append = bindSym "append" send = bindSym "send" Peer = bindSym "Peer" - writeMessageId = bindSym "writeMessageId" + writeMsgId = bindSym "writeMsgId" isSubprotocol = version > 0 + stateType: NimNode = nil + + # By convention, all Ethereum protocol names must be abbreviated to 3 letters + assert protoName.len == 3 result = newNimNode(nnkStmtList) result.add quote do: - var `protocol` = `newProtocol`(`name`, `version`) + # One global variable per protocol holds the protocol run-time data + var `protocol` = `newProtocol`(`protoName`, `version`) + + # Create a type actining as a pseudo-object representing the protocol (e.g. p2p) + type `protoNameIdent`* = object + + # The protocol run-time data is available as a pseudo-field (e.g. `p2p.info`) + template info*(P: type `protoNameIdent`): ProtocolInfo = `protocol` for n in body: case n.kind @@ -298,9 +373,22 @@ macro rlpxProtocol*(name: static[string], error("nextID expects a single int value", n) else: error(repr(n) & " is not a recognized call in RLPx protocol definitions", n) + of nnkTypeSection: + if n.len == 1 and n[0][0].kind == nnkIdent and $n[0][0].ident == "State": + stateType = genSym(nskType, protoName & "State") + n[0][0] = stateType + result.add n + # Create a pseudo-field for the protocol State type (e.g. `p2p.State`) + result.add quote do: + template State*(P: type `protoNameIdent`): typedesc = `stateType` + else: + error("The only type that can be defined inside a RLPx protocol is the protocol's State type.") + of nnkProcDef: inc nextId - let name = n.name.ident + let + msgIdent = n.name.ident + msgName = $msgIdent var thunkName = newNilLit() @@ -309,13 +397,14 @@ macro rlpxProtocol*(name: static[string], peer = genSym(nskParam, "peer") if n.body.kind != nnkEmpty: - # implement receiving thunk + # implement the receiving thunk proc that deserialzed the + # message parameters and calls the user proc: var nCopy = n.copyNimTree rlp = genSym(nskParam, "rlp") connection = genSym(nskParam, "connection") - nCopy.name = genSym(nskProc, $name) + nCopy.name = genSym(nskProc, msgName) var callUserProc = newCall(nCopy.name, connection) var readParams = newNimNode(nnkStmtList) @@ -333,24 +422,56 @@ macro rlpxProtocol*(name: static[string], callUserProc.add deserializedParam - thunkName = newIdentNode($name & "_thunk") + thunkName = newIdentNode(msgName & "_thunk") var thunk = quote do: proc `thunkName`(`connection`: `Peer`, `rlp`: var Rlp) = `readParams` `callUserProc` + if stateType != nil: + # Define a local accessor for the current protocol state + # inside each handler (e.g. peer.state.foo = bar) + var localStateAccessor = quote: + template state(connection: `Peer`): ref `stateType` = + cast[ref `stateType`](connection.getState(`protocol`)) + + nCopy.body.insert 0, localStateAccessor + result.add nCopy, thunk + var + msgType = genSym(nskType, msgName & "Obj") + msgTypeFields = newTree(nnkRecList) + msgTypeBody = newTree(nnkObjectTy, + newEmptyNode(), + newEmptyNode(), + msgTypeFields) + # implement sending proc for param, paramType in n.typedParams(skip = 1): appendParams.add quote do: `append`(`rlpWriter`, `param`) + msgTypeFields.add newTree(nnkIdentDefs, + param, chooseFieldType(paramType), newEmptyNode()) + + result.add quote do: + # This is a type featuring a single field for each message param: + type `msgType`* = `msgTypeBody` + + # Add a helper template for accessing the message type: + # e.g. p2p.hello: + template `msgIdent`*(T: type `protoNameIdent`): typedesc = `msgType` + + # Add a helper template for obtaining the message Id for + # a particular message type: + template msgId*(T: type `msgType`): int = `nextId` + # XXX TODO: check that the first param has the correct type n.params[1][0] = peer let writeMsgId = if isSubprotocol: - quote: `writeMessageId`(`protocol`, `nextId`, `peer`, `rlpWriter`) + quote: `writeMsgId`(`protocol`, `nextId`, `peer`, `rlpWriter`) else: quote: `append`(`rlpWriter`, `nextId`) @@ -361,7 +482,7 @@ macro rlpxProtocol*(name: static[string], `send`(`peer`, `finish`(`rlpWriter`)) result.add n - result.add newCall(bindSym("registerMessage"), + result.add newCall(bindSym("registerMsg"), protocol, newIntLitNode(nextId), newStrLitNode($n.name), @@ -371,7 +492,7 @@ macro rlpxProtocol*(name: static[string], error("illegal syntax in a RLPx protocol definition", n) result.add newCall(bindSym("registerProtocol"), protocol) - echo repr(result) + when isMainModule: echo repr(result) type DisconnectionReason* = enum @@ -389,16 +510,15 @@ type MessageTimeout, SubprotocolReason = 0x10 -rlpxProtocol("p2p", 0): - +rlpxProtocol p2p, 0: proc hello(peer: Peer, version: uint, clientId: string, - capabilities: openarray[Protocol], + capabilities: openarray[Capability], listenPort: uint, - nodeId: MDigest[512] - ) = - discard + nodeId: P2PNodeId) = + peer.id = nodeId + peer.dispatcher = getDispatcher(capabilities) proc disconnect(peer: Peer, reason: DisconnectionReason) @@ -407,6 +527,8 @@ rlpxProtocol("p2p", 0): proc pong(peer: Peer) = discard +import typetraits + proc rlpxConnect*(myKeys: KeyPair, remoteKey: PublicKey, address: Address): Future[Peer] {.async.} = # TODO: Make sure to close the socket in case of exception @@ -438,29 +560,42 @@ proc rlpxConnect*(myKeys: KeyPair, remoteKey: PublicKey, var ackMsg: array[AckMessageMaxEIP8, byte] let ackMsgLen = handshake.ackSize(encrypt = encryptionEnabled) - let receivedBytes = await result.socket.recvInto(addr ackMsg, ackMsgLen) - - if receivedBytes != ackMsgLen: - # XXX: this handling is not perfect, we should probably retry until the - # correct number of bytes are read! - raise newException(MalformedMessageError, "AuthAck message has incorrect size") + await result.socket.fullRecvInto(addr ackMsg, ackMsgLen) check handshake.decodeAckMessage(^ackMsg) check handshake.getSecrets(^authMsg, ^ackMsg, result.sessionSecrets) var - # XXX: TODO - nodeId: MDigest[512] + # XXX: TODO: get these from somewhere + nodeId: P2PNodeId listeningPort = uint 0 - hello(result, baseProtocolVersion, clienId, gProtocols, listeningPort, nodeId) + hello(result, baseProtocolVersion, clienId, gCapabilities, listeningPort, nodeId) + + var response = await result.nextMsg(p2p.hello, discardOthers = true) + result.dispatcher = getDispatcher(response.capabilities) + result.id = response.nodeId + result.connectionState = Connected + newSeq(result.protocolStates, gProtocols.len) + # XXX: initialize the sub-protocol states when isMainModule: import rlp - rlpxProtocol("test", 1): + rlpxProtocol aaa, 1: + type State = object + peerName: string + + proc hi(p: Peer, name: string) = + p.state.peerName = name + + rlpxProtocol bbb, 1: + type State = object + messages: int + proc foo(p: Peer, s: string, a, z: int) = - echo s + p.state.messages += 1 + echo p.state(aaa).peerName proc bar(p: Peer, i: int, s: string) diff --git a/ethp2p/rlpx_protocols/eth.nim b/ethp2p/rlpx_protocols/eth.nim index 30ebf37..fc5e243 100644 --- a/ethp2p/rlpx_protocols/eth.nim +++ b/ethp2p/rlpx_protocols/eth.nim @@ -12,7 +12,7 @@ type header: BlockHeader body {.rlpInline.}: BlockBody -rlpxProtocol("eth", 63): +rlpxProtocol eth, 63: proc status(p: Peer, protocolVersion, networkId, td: P, bestHash, genesisHash: KeccakHash) = discard diff --git a/ethp2p/rlpx_protocols/les.nim b/ethp2p/rlpx_protocols/les.nim index a128345..cde6bbb 100644 --- a/ethp2p/rlpx_protocols/les.nim +++ b/ethp2p/rlpx_protocols/les.nim @@ -30,7 +30,7 @@ type status*: TransactionStatus data*: Blob -rlpxProtocol("les", 2): +rlpxProtocol les, 2: ## Handshake ##