From eb72c6042fc5fb23b05fdb6f566e326bc3d0ea00 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 6 Aug 2018 12:53:03 +0300 Subject: [PATCH] Alternative approach of Ethereum P2P networking. --- eth_p2p/eth.nim | 348 ++++++++++++++ eth_p2p/peer.nim | 1008 +++++++++++++++++++++++++++++++++++++++++ eth_p2p/protocols.nim | 314 +++++++++++++ tests/testpeer.nim | 149 ++++++ 4 files changed, 1819 insertions(+) create mode 100644 eth_p2p/eth.nim create mode 100644 eth_p2p/peer.nim create mode 100644 eth_p2p/protocols.nim create mode 100644 tests/testpeer.nim diff --git a/eth_p2p/eth.nim b/eth_p2p/eth.nim new file mode 100644 index 0000000..63a988b --- /dev/null +++ b/eth_p2p/eth.nim @@ -0,0 +1,348 @@ +# +# Ethereum P2P +# (c) Copyright 2018 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# + +import asyncdispatch2, stint, rlp, eth_common, chronicles +import protocols, peer + +const + # Max number of items we can ask for in ETH requests. These are the values + # used in geth and if we ask for more than this the peers will disconnect + # from us. + MaxStateFetch* = 384 + MaxBodiesFetch* = 128 + MaxReceiptsFetch* = 256 + MaxHeadersFetch* = 192 + +const + MsgStatus* = 0x00 + MsgNewBlockHashes* = 0x01 + MsgTransactions* = 0x02 + # eth/61 + MsgGetBlockHashes* = 0x03 + MsgBlockHashes* = 0x04 + MsgGetBlocks* = 0x05 + MsgBlocks* = 0x06 + MsgNewBlock* = 0x07 + MsgBlockHashesFromNumber* = 0x08 + # eth/62 + MsgGetBlockHeaders* = 0x03 + MsgBlockHeaders* = 0x04 + MsgGetBlockBodies* = 0x05 + MsgBlockBodies* = 0x06 + # eth/63 + MsgGetNodeData* = 0x0D + MsgNodeData* = 0x0E + MsgGetReceipts* = 0x0F + MsgReceipts* = 0x10 + + + +const + EthereumCap61* = initECap("eth", 61) + EthereumCap62* = initECap("eth", 62) + EthereumCap63* = initECap("eth", 63) + +proc ethGetCmd*(epcap: EPeerCap, cmd: int): int = + ## Checks if specific message is supported by capability/protocol ``epcap`` + ## and returns (zero based) specific to protocol message id. + ## If `cmd` identifier is not supported by specific protocol ``epcap`` -1 will + ## be returned. + result = -1 + let cmdId = epcap.protoId(cmd) + if epcap.cap == EthereumCap61: + if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions, + MsgGetBlockHashes, MsgBlockHashes, MsgGetBlocks, + MsgBlocks, MsgNewBlock, MsgBlockHashesFromNumber}: + result = cmdId + elif epcap.cap == EthereumCap62: + if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions, + MsgGetBlockHeaders, MsgBlockHeaders, MsgGetBlockBodies, + MsgBlockBodies, MsgNewBlock}: + result = cmdId + elif epcap.cap == EthereumCap63: + if cmdId in {MsgStatus, MsgNewBlockHashes, MsgTransactions, + MsgGetBlockHeaders, MsgBlockHeaders, MsgGetBlockBodies, + MsgBlockBodies, MsgNewBlock, MsgGetNodeData, MsgNodeData, + MsgGetReceipts, MsgReceipts}: + result = cmdId + else: + discard + +proc sendStatus*(peer: Peer, cap: EPeerCap, networkId: int, + tdifficulty: UInt256, bestHash: Hash256, + genesisHash: Hash256): Future[bool] {.async.} = + ## Send `Status` message to remote peer + if peer.state notin {ConnectionState.None, Connected}: return false + + let eindex = peer.supports([EthereumCap61, EthereumCap62, EthereumCap63]) + doAssert(eindex >= 0, "Peer do not support eth status()") + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgStatus) + writer.append(int(cmdId)) + writer.startList(5) + writer.append(int(cap.version)) + writer.append(int(networkId)) + writer.append(tdifficulty) + writer.append(bestHash) + writer.append(genesisHash) + debug "Sending Status message", peer = $peer, version = $cap.version + result = await peer.sendMessage(writer.finish()) + +proc sendNewBlockHashes*(peer: Peer, + hashes: seq[Hash256]): Future[bool] {.async.} = + if peer.state != Connected: return false + + let eindex = peer.supports(EthereumCap61) + doAssert(eindex >= 0, "Peer do not support NewBlockHashes()") + let cap = peer.caps[eindex] + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgNewBlockHashes) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for hash in hashes: + writer.append(hash) + debug "Sending NewBlockHashes message", peer = $peer, version = $cap.version + result = await peer.sendMessage(writer.finish()) + +proc sendNewBlockHashes*(peer: Peer, + bhashes: seq[tuple[hash: Hash256, num: UInt256]]): Future[bool] {.async.} = + if peer.state != Connected: return false + + let eindex = peer.supports(EthereumCap62) + doAssert(eindex >= 0, "Peer do not support NewBlockHashes()") + let cap = peer.caps[eindex] + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgNewBlockHashes) + writer.append(int(cmdId)) + writer.startList(len(bhashes)) + for item in bhashes: + writer.startList(2) + writer.append(item.hash) + writer.append(item.num) + debug "Sending NewBlockHashes message", peer = $peer, version = $cap.version + result = await peer.sendMessage(writer.finish()) + +proc sendBlockHashes*(peer: Peer, + hashes: seq[Hash256]): Future[bool] {.async.} = + if peer.state != Connected: return false + + let eindex = peer.supports(EthereumCap61) + doAssert(eindex >= 0, "Peer do not support NewBlockHashes()") + let cap = peer.caps[eindex] + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgBlockHashes) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for hash in hashes: + writer.append(hash) + debug "Sending BlockHashes message", peer = $peer, version = $cap.version + result = await peer.sendMessage(writer.finish()) + +# Missing 0x01:sendNewBlockHashes eth/61 +# Missing 0x02:sendTransactions eth/61 +# Missing 0x06:sendBlocks eth/61 +# Missing 0x07:newBlock eth/61 + +# Missing 0x04:sendBlockHeaders eth/62 +# Missing 0x06:sendBlockBodies eth/62 + +# Missing 0x0E:sendNodeDat eth/63 +# Missing 0x10:sendReceipts eth/63 + +template sendReceive(peer: Peer, epcap: EPeerCap, sendMsg: BytesRange, + msgId: int, name: string): EthereumMessage = + let startTime = fastEpochTime() + var msg = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + var sendfut = peer.sendMessage(sendMsg) + yield sendfut + let res = sendfut.read() + if res: + var fut = newFuture[EthereumMessage](name) + peer.subscribe(epcap, msgId, fut) + try: + var msgfut = wait(fut, peer.responseTimeout) + yield msgfut + msg = msgfut.read() + msg.elapsed = int(fastEpochTime() - startTime) + peer.unsubscribe(epcap, msgId) + except AsyncTimeoutError: + msg = EthereumMessage(id: MsgTimeout, data: zeroBytesRlp) + msg + +proc getBlockHashesFromNumber*(peer: Peer, number: UInt256, + maxBlocks: int): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap61) + doAssert(eindex >= 0, "Peer do not support getblockHashesFromNumber()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgBlockHashesFromNumber) + writer.append(int(cmdId)) + writer.startList(2) + writer.append(number) + writer.append(maxBlocks) + debug "Sending BlockHashesFromNumber message", peer = $peer, + version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlockHashes, + "eth.getBlockHashesFromNumber") + +proc getBlockHashes*(peer: Peer, hash: Hash256, + maxBlocks: int): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap61) + doAssert(eindex >= 0, "Peer do not support getBlockHashes()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetBlockHashes) + writer.append(int(cmdId)) + writer.startList(2) + writer.append(hash) + writer.append(maxBlocks) + debug "Sending GetBlockHashes message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlockHashes, + "eth.getBlockHashes") + +proc getBlocks*(peer: Peer, + hashes: seq[Hash256]): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap61) + doAssert(eindex >= 0, "Peer do not support getBlocks()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgBlocks) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for item in hashes: + writer.append(item) + debug "Sending GetBlocks message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlocks, "eth.getBlocks") + +proc getBlockHeaders*(peer: Peer, blok: UInt256, maxHeaders: int, skip: int, + reverse: bool): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap62) + doAssert(eindex >= 0, "Peer do not support getBlockHeaders()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetBlockHeaders) + writer.append(int(cmdId)) + writer.startList(4) + writer.append(blok) + writer.append(maxHeaders) + writer.append(skip) + if reverse: + writer.append(int(1)) + else: + writer.append(int(0)) + debug "Sending GetBlockHeaders message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlockHeaders, + "eth.getBlockHeaders") + +proc getBlockHeaders*(peer: Peer, blok: Hash256, maxHeaders: int, skip: int, + reverse: bool): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap62) + doAssert(eindex >= 0, "Peer do not support getBlockHeaders()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetBlockHeaders) + writer.append(int(cmdId)) + writer.startList(4) + writer.append(blok) + writer.append(maxHeaders) + writer.append(skip) + if reverse: + writer.append(int(1)) + else: + writer.append(int(0)) + debug "Sending GetBlockHeaders message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlockHeaders, + "eth.getBlockHeaders") + +proc getBlockBodies*(peer: Peer, + hashes: seq[Hash256]): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap62) + doAssert(eindex >= 0, "Peer do not support getBlockBodies()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetBlockHeaders) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for item in hashes: + writer.append(item) + debug "Sending GetBlockBodies message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgBlockBodies, + "eth.getBlockBodies") + +proc getNodeData*(peer: Peer, + hashes: seq[Hash256]): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap63) + doAssert(eindex >= 0, "Peer do not support getNodeData()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetNodeData) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for item in hashes: + writer.append(item) + debug "Sending GetNodeData message", peer = $peer, version = $cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgNodeData, + "eth.getNodeData") + +proc getReceipts*(peer: Peer, + hashes: seq[Hash256]): Future[EthereumMessage] {.async.} = + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + if peer.state != Connected: return + + let eindex = peer.supports(EthereumCap63) + doAssert(eindex >= 0, "Peer do not support getReceipts()") + let cap = peer.caps[eindex] + + var writer = initRlpWriter() + let cmdId = cap.cmdId(MsgGetNodeData) + writer.append(int(cmdId)) + writer.startList(len(hashes)) + for item in hashes: + writer.append(item) + debug "Sending GetReceipts message", peer = $peer, + version = $cap.cap.version + + result = peer.sendReceive(cap, writer.finish(), MsgReceipts, + "eth.getReceipts") diff --git a/eth_p2p/peer.nim b/eth_p2p/peer.nim new file mode 100644 index 0000000..44cdb1f --- /dev/null +++ b/eth_p2p/peer.nim @@ -0,0 +1,1008 @@ +# +# Ethereum P2P +# (c) Copyright 2018 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +# + +## This is alternative approach for Ethereum Networking. +## +## Approach is based on extending ``Peer`` functionality depending on protocols +## it supports. With such approach you can easily support LES and ETH protocols +## in one peer instance. +## +## Two mechanisms are introduced to handle incoming Ethereum frames, one is +## called ``subscribers`` and can be used via procedures +## ``subscribe/unsubscribe`` to watch for specific Ethereum message, and +## another is ``closure interfaces``, which can be used to implement logic code +## to process incoming messages. Closure interfaces are registered with global +## ``EthereumNode`` object and will be instantiated for every peer. +## +## Peer is taking care of disconnects, and can notify ``PeerPool`` about its +## death via `liveFuture`. So now you don't need to sleep in cycle in +## ``PeerPool`` code. ``PeerPool`` just needs to wait for peer's futures and +## it will know when one of the peers got disconnected and need to be replaced. +## +## All networking procedures are is taking care of timeouts: +## 1) Connection timeout +## 2) Authentication timeout +## 3) Handshake timeout +## 4) Request->Response timeout +## +## Introduced ``PeerMetrics`` object which will gather information about number +## of bytes received from specific peer. With information from ``PeerMetrics`` +## you can make some kind of QOS to disconnect useless or slow peers. +## +## Every ``request->response`` message also store time of operation inside of +## message type ``EthereumMessage``, you can obtain it via ``elapsed`` field. +## +## **Subscribers**. +## +## Using subscribers you can register listener for any message in stream, but +## its more suitable to handle `request->response` pairs messages such as +## GetBlockHashes/BlockHashes, GetBlocks/Blocks, GetBlockHeaders/BlockHeaders, +## GetNodeData/NodeData, GetReceipts/Receipts. So if specification requires to +## handle `request->response` sequence, its easier to bundle both operations +## in one procedure, which first sends `request` and then subscribes a Future +## to response message id. If remote peer disconnected or sent +## malformed/incorrect message all subscribers will be notified with `MsgBad` +## empty message. +## +## You can see examples of `subscribers` usage in `eth.nim` code +## +## **Closure interfaces**. +## +## Interfaces is one more way to handle messages stream, you can use interfaces +## to handle all messages from peer's stream, but its more suitable to handle +## `announce` messages (like Transactions or NewBlock) and to handle requests +## from remote peer (GetBlockHashes, GetBlocks, GetBlockHeaders, GetBlockBodies, +## GetNodeData, GetReceipts). +## If remote peer disconnected or sent malformed/incorrect message interface +## will be notified with `MsgBad` empty message. +## In interface's `run` function you are running loop: +## +## .. code-block::nim +## proc run(peer: Peer) {.async.} = +## while true: +## # Waiting for message from remote `peer` and for protocol `epcap`. +## var msg = await peer.getMessage(epcap) +## if msg.id == MsgBad: +## # Remote peer sent malformed message or get disconnected without +## # reason. +## break +## elif msg.id == MsgDisconnect: +## # Remote peer sent `Disconnect` message with a reason. +## break +## else: +## # Here we can get any message specific exactly to this protocol. +## # ethGetCmd() is procedure which performs check of message id +## # according to protocol `epcap`. And if passed message is passed +## # all checks, then zero based protocol message id will be returned +## # (which will be equal to message id in specification). +## let ethid = epcap.ethGetCmd(msg.id) +## if ethid == -1: +## # Received message with id, which is not related to protocol +## await peer.disconnect(BreachOfProtocol) +## break +## else: +## if ethid == MsgGetBlockHeaders: +## # Received GetBlockHeaders +## discard +## elif ethid == MsgGetBlockBodies: +## # Received GetBlockBodies. +## discard +## elif ethid == MsgGetNodeData: +## # Received GetNodeData. +## discard +## elif ethid == MsgGetReceipts: +## # received GetReceipts. +## discard +## +## Every incoming message from remote peer will be delivered first to +## subscribers, and if there no subscribers for specific message id, +## it will be added to interface's message queue, so protocol interface can +## obtain it via ``getMessage()`` call. +## +## You can see examples of closure interfaces usage in `tests/testpeer.nim`. + +import asyncdispatch2, eth_keys, ranges, rlp, nimcrypto, stint, eth_common +import protocols, kademlia, rlpxcrypt, auth, enode, chronicles + +const + PeerRecvBufferInitialSize* = 1024 ## Initial receiving buffer size for Peer. + PeerSendBufferInitialSize* = 1024 ## Initial sending buffer size for Peer. + ConnectTimeout* = 10000 ## Connection timeout in milliseconds. + AuthenticationTimeout* = 10000 ## Authentication timeout in milliseconds. + HandshakeTimeout* = 10000 ## devP2P handshake timeout in milliseconds. + ResponseTimeout* = 60000 ## Initial peer's response waiting timeout + MaxUInt24 = (not uint32(0)) shl 8 ## Maximum size of Ethereum devP2P frame. + +const + MsgTimeout* = -2 + MsgBad* = -1 + MsgHello* = 0 + MsgDisconnect* = 1 + MsgPing* = 2 + MsgPong* = 3 + +type + PeerMetrics* = object + ## Peer metrics used to calculate peer's performance (can be used in + ## PeerPool to not use slow peers while sync) + startTime*: uint64 ## Time in milliseconds when connection happens + bytesCount*: uint64 ## Number of bytes received from peer + + ConnectionState* = enum + ## Peer's connection state + None, ## Authenticating/Handshaking. + Connected, ## Connection established. + Disconnecting, ## `Disconnect` message was sent. + Disconnected ## Connection dropped. + + EthereumMessage* = object + ## Ethereum devP2P message + id*: int ## Message ID. + elapsed*: int ## Time in milliseconds of request-response operation. + data*: Rlp ## Message RLP frame + + PeerFlags* = enum + ## Peer's flags + Incoming, ## Peer was accepted. + Outgoing ## Peer was connected. + + DisconnectReason* = enum + ## Ethereum devP2P disconnect reasons. + DisconnectRequested, + TcpError, + BreachOfProtocol, + UselessPeer, + TooManyPeers, + AlreadyConnected, + IncompatibleProtocolVersion, + NullNodeIdentityReceived, + ClientQuitting, + UnexpectedIdentity, + SelfConnection, + MessageTimeout, + UnknownError12, + UnknownError13, + UnknownError14, + UnknownError15, + SubprotocolReason, + UnknownError + + Peer* = ref object + ## Peer's object + transp*: StreamTransport ## peer's underlying transport + node*: Node ## peer's p2p address + sbuffer*: seq[byte] ## sending buffer + rbuffer*: seq[byte] ## receiving buffer + queues*: seq[AsyncQueue[EthereumMessage]] ## sub-protocol message queues + ifaces: seq[EInterface] ## sub-protocol interfaces + secrets*: SecretState ## peer's cryptographic secrets + state*: ConnectionState ## current state + flags*: set[PeerFlags] ## peer's flags + version*: int ## peer's devP2P version + clientId*: string ## peer's client identifier + remotePort*: Port ## peer's TCP port to conect + subscribers: array[256, Future[EthereumMessage]] ## peer's subscribers array + allcaps*: ECapList ## peer's all capabilities list + caps*: EPeerCapList ## peer's synchronized caps list + liveFuture*: Future[void] ## peer's live future + responseTimeout*: int ## peer's initial response timeout + metrics*: PeerMetrics ## peer's metrics + + EthereumNode* = ref object + netver*: int ## devP2P version + clientId*: string ## client identifier + port*: Port ## TCP port to connect + caps*: ECapList ## available capabilities + protocols*: seq[EProtocol] ## available interfaces + keys*: KeyPair ## security keys + network*: int ## network id + + PeerException* = object of Exception + PeerAddressException* = object of Exception + PeerWriteIncomplete = object of PeerException + + EInterface* = ref object of RootRef + ## Sub-protocol interface object definition + handshake*: proc(peer: Peer): Future[bool] + ## Callback which will be called to perform sub-protocol handshake. + ## Callback must return ``true`` to signal that handshake was completed + ## successfully or ``false`` on error. + run*: proc(peer: Peer): Future[void] + ## Callback which will be called to run message loop. + liveFuture*: Future[void] + ## Interface live future, which will be completed when ``run`` will be + ## finished. + + EInterfaceProc* = proc(en: EthereumNode, peer: Peer, + proto: EPeerCap): EInterface + ## Protocol interface initialization function + + EProtocol = object + cap: ECap + init: EInterfaceProc + +proc checkIncomplete(w, e: int) = + if w != e: + raise newException(PeerWriteIncomplete, "Write operation incomplete!") + +proc `$`*(peer: Peer): string = + ## Return string representation of peer ``peer``. + result = $peer.node + +proc getReason*(reason: int): DisconnectReason = + ## Convert integer reason code to ``DisconnectReason`` enum. + if reason < int(low(DisconnectReason)) or + reason > int(high(DisconnectReason)): + result = UnknownError + else: + result = DisconnectReason(reason) + +proc toIndex*(peer: Peer, cmdid: int): int = + ## Get index in peer's protocols sequence for message with id ``cmdid``. + result = -1 + for i in 0..= proto.offset and cmdid < proto.offset + protoLength(proto.cap): + result = i + break + +proc sendMessage*(peer: Peer, data: BytesRange): Future[bool] {.async.} = + ## Sends RLP encoded message ``data`` to peer ``peer``. Returns ``true`` if + ## message was successfully sent, and ``false`` otherwise. + var header: RlpxHeader + result = true + if uint32(len(data)) <= MaxUInt24: + # write the frame size in the first 3 bytes of the header + let length = len(data) + header[0] = byte((length shr 16) and 0xFF) + header[1] = byte((length shr 8) and 0xFF) + header[2] = byte(length and 0xFF) + peer.sbuffer.setLen(encryptedLength(length)) + let res = encrypt(peer.secrets, header, data.toOpenArray, peer.sbuffer) + if res != RlpxStatus.Success: + debug "Failed to encrypt message", peer = $peer, error = $res, + size = $len(data) + result = false + else: + debug "RLPx message size exceeds limit", peer = $peer, size = $len(data) + result = false + + if result: + try: + let cnt = await peer.transp.write(peer.sbuffer) + if cnt != len(peer.sbuffer): + result = false + except: + debug "Failed to send message", peer = $peer, size = $len(data) + result = false + +proc recvMessage*(peer: Peer): Future[EthereumMessage] {.async.} = + ## Receive first incoming Ethereum frame message from networking stream of + ## peer's `peer`. + ## Returns message with id ``MsgBad``, if there problems with connection or + ## decrypting of incoming message. + var + header: array[32, byte] + success = true + + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + + try: + await peer.transp.readExactly(addr header[0], 32) + except TransportIncompleteError: + debug "Remote peer disconnected", peer = $peer + peer.state = Disconnected + success = false + except TransportOsError: + debug "Networking error", peer = $peer, msg = getCurrentExceptionMsg() + peer.state = Disconnected + success = false + + if not success: return + + var msgSize: int + if decryptHeaderAndGetMsgSize(peer.secrets, header, + msgSize) != RlpxStatus.Success: + return + + peer.rbuffer.setLen(encryptedLength(msgSize) - RlpHeaderLength - RlpMacLength) + + try: + await peer.transp.readExactly(addr peer.rbuffer[0], len(peer.rbuffer)) + except TransportIncompleteError: + debug "Remote peer disconnected", peer = $peer + peer.state = Disconnected + success = false + except TransportOsError: + debug "Networking error", peer = $peer, msg = getCurrentExceptionMsg() + peer.state = Disconnected + success = false + + if not success: return + + let decryptedMaxLength = decryptedLength(msgSize) + var decryptedBytes = newSeq[byte](decryptedMaxLength) + var decryptedLength = 0 + + if decryptBody(peer.secrets, peer.rbuffer, msgSize, + decryptedBytes, decryptedLength) != RlpxStatus.Success: + return + + decryptedBytes.setLen(decryptedLength) + try: + var data = rlpFromBytes(decryptedBytes.toRange()) + result.id = data.read(int) + result.data = data + peer.metrics.bytesCount += uint64(decryptedLength) + except: + debug "Could not decode RLP message", peer = $peer, + msg = getCurrentExceptionMsg() + result = EthereumMessage(id: MsgBad, data: zeroBytesRlp) + + return + +proc supports*(peer: Peer, cap: ECap): int = + ## Returns index of capability in peer's list of synchronized capabilities. + ## If capability is not supported ``-1`` will be returned. + result = -1 + for i in 0..= 3) + assert(version >= 0 and version <= 255) + result = ECap((int(name[0]) shl 24) or (int(name[1]) shl 16) or + (int(name[2]) shl 8) or (int(version) and 0xFF)) + +proc initECap*(rcap: RlpCap): ECap = + ## Create new Ethereum Capability using RLP serialized capability. + assert(rcap.version >= 0 and rcap.version <= 255) + result = ECap((int(rcap.name[0]) shl 24) or (int(rcap.name[1]) shl 16) or + (int(rcap.name[2]) shl 8) or (int(rcap.version) and 0xFF)) + +proc initRCap*(ecap: ECap): RlpCap = + ## Create RLP serialized Ethereum Capability from internal representation of + ## Ethereum Capability ``ecap``. + result.name[0] = chr((int(ecap) shr 24) and 0xFF) + result.name[1] = chr((int(ecap) shr 16) and 0xFF) + result.name[2] = chr((int(ecap) shr 8) and 0xFF) + result.version = int(ecap) and 0xFF + +proc hash*(ecap: ECap): Hash {.inline.} = + ## Calculate ``Hash`` for Ethereum Capability ``ecap``. + result = Hash(ecap) + +proc `$`*(ecap: ECap): string = + ## Get string representation of Ethereum Capability ``ecap``. + result = newStringOfCap(8) + result.setLen(4) + result[3] = '/' + result[2] = chr((int(ecap) shr 8) and 0xFF) + result[1] = chr((int(ecap) shr 16) and 0xFF) + result[0] = chr((int(ecap) shr 24) and 0xFF) + result.add($(int(ecap) and 0xFF)) + +proc `$`*(rcap: RlpCap): string = + ## Get string representation of RLP serialized Ethereum Capability ``rcap``. + result = newStringOfCap(8) + result.setLen(4) + result[3] = '/' + result[2] = rcap.name[2] + result[1] = rcap.name[1] + result[0] = rcap.name[0] + result.add($rcap.version) + +proc cmpProto*(ecap1, ecap2: ECap): int = + ## Compare protocols of Ethereum Capabilities ``ecap1`` and ``ecap2``. + result = ((int(ecap1) shr 8) and 0xFFFFFF) - + ((int(ecap2) shr 8) and 0xFFFFFF) + +proc cmpVersion*(ecap1, ecap2: ECap): int = + ## Compare versions of Ethereum Capabilities ``ecap1`` and ``ecap2``. + result = (int(ecap1) and 0xFF) - (int(ecap2) and 0xFF) + +proc version*(ecap: ECap): int = + ## Get version of Ethereum Capability ``ecap`` as integer. + result = (int(ecap) and 0xFF) + +proc protocol*(ecap: ECap): string = + ## Get protocol of Ethereum Capability ``ecap`` as string. + result = newString(3) + result[0] = chr((int(ecap) shr 24) and 0xFF) + result[1] = chr((int(ecap) shr 16) and 0xFF) + result[2] = chr((int(ecap) shr 8) and 0xFF) + +proc protocol*(epcap: EPeerCap): string {.inline.} = + ## Get protocol of Ethereum Capability ``epcap`` as string. + result = epcap.cap.protocol() + +proc version*(epcap: EPeerCap): int {.inline.} = + ## Get version of Ethereum Capability ``ecap`` as integer. + result = epcap.cap.version() + +proc `==`*(x: ECap, y: ECap): bool {.borrow.} + ## Compare Ethereum Capabilities ``x`` and ``y``. + +type + EProtocol* = object + cap*: ECap + length*: int + +const + EthereumProtocols* = [ + EProtocol(cap: initECap("eth", 61), length: 9), + EProtocol(cap: initECap("eth", 62), length: 8), + EProtocol(cap: initECap("eth", 63), length: 16), + EProtocol(cap: initECap("les", 1), length: 15), + EProtocol(cap: initECap("les", 2), length: 21) + ] + +proc protoLength*(cap: ECap): int = + ## Get number of commands for Ethereum Capability ``cap``. + for item in EthereumProtocols: + if item.cap == cap: + result = item.length + break + +proc cmp*(x, y: ECap): int = + ## Comparison function for sorting Ethereum Capabilities. + if x == y: return 0 + if int(x) < int(y): return -1 + return 1 + +proc sync*(secap, ecap: ECapList): EPeerCapList = + ## Synchronize local and remote lists of Ethereum Capabilities, and calculate + ## protocol commands' offsets. + ## + ## Please note, that ``secap`` list must be sorted! + result = newSeq[EPeerCap]() + var curindex = 0 + for cap1 in secap: + for cap2 in ecap: + if cap1 == cap2: + if len(result) == 0: + # first added protocol has 0x10 offset + result.add(EPeerCap(cap: cap1, offset: 16, index: curindex)) + inc(curindex) + else: + let prev = result[^1] + if cmpProto(prev.cap, cap1) == 0: + if cmpVersion(prev.cap, cap1) < 0: + # replacing same protocol with most recent version + result[^1] = EPeerCap(cap: cap1, offset: prev.offset) + else: + # adding new protocol with offset + let offset = prev.offset + protoLength(prev.cap) + result.add(EPeerCap(cap: cap1, offset: offset, index: curindex)) + inc(curindex) + +proc register*(lcap: var ECapList, cap: ECap) = + ## Registers Ethereum Capability ``cap`` in list ``lcap``. + ## + ## Procedure keeps list ``lcap`` sorted. + if len(lcap) == 0: + lcap.add(cap) + else: + for item in lcap: + if item == cap: + return + lcap.add(cap) + sort(lcap, cmp) + +proc register*(lcap: var ECapList, caps: openarray[ECap]) = + ## Registers array of Ethereum Capabilities ``caps`` in list ``lcap``. + ## + ## Procedure keeps ``lcap`` sorted. + for item in caps: + lcap.register(item) + +proc unregister*(lcap: var ECapList, cap: ECap) = + ## Unregister Ethereum Capability ``cap`` from list ``lcap``. + ## + ## Procedure keeps list ``lcap`` sorted. + var scap: seq[ECap] + for item in lcap: + if item != cap: + scap.add(item) + if len(scap) != len(lcap): + shallowCopy(lcap, scap) + +proc unregister*(lcap: var ECapList, caps: openarray[ECap]) = + ## Unregister array of Ethereum Capabilities from list ``lcap``. + ## + ## Procedure keeps list ``lcap`` sorted. + for item in caps: + lcap.unregister(item) + +proc newECapList*(caps: openarray[ECap]): ECapList = + ## Create new Ethereum Capabilities list and populate it with capabilities + ## from ``caps``. + result = newSeq[ECap]() + result.register(caps) + +proc newECapList*(cap: ECap): ECapList = + ## Create new Ethereum Capabilities list and register capability ``cap`` + ## in it. + result = newSeq[ECap]() + result.register(cap) + +proc `$`*(lcap: ECapList): string = + ## Get string representation of Ethereum Capabilities list ``lcap``. + result = "" + for item in lcap: + if len(result) > 0: + result.add(", ") + result.add($item) + else: + result.add($item) + +proc `$`*(pcap: EPeerCapList): string = + ## Get string representation of synchronized remote peer capabilities + ## ``pcap``. + result = "" + for item in pcap: + if len(result) > 0: + result.add(", ") + result.add($item.cap) + result.add(" (") + result.add($item.offset) + result.add("/") + result.add($item.index) + result.add(")") + else: + result.add($item.cap) + result.add(" (") + result.add($item.offset) + result.add(" / ") + result.add($item.index) + result.add(")") + +proc `$`*(rcap: RlpCapList): string = + ## Get string representation of RLP serialized Ethereum Capabilities list. + result = "" + for item in rcap: + if len(result) > 0: + result.add(", ") + result.add($item) + else: + result.add($item) + +proc cmdId*(epcap: EPeerCap, cmdid: int): int {.inline.} = + ## Get actual command id of command with ``cmdid`` using data from + ## synchronized peer capability ``epcap``. + result = cmdid + epcap.offset + +proc cmdId*(pcap: EPeerCapList, proto: string, cmdid: int): int = + ## Get actual command id from protocol with name ``proto`` and list of + ## synchronized peer capabilities ``pcap``. + result = cmdid + var cap = initECap(proto, 0) + for item in pcap: + if cmpProto(item.cap, cap) == 0: + result += item.offset + break + +proc protoId*(epcap: EPeerCap, cmdid: int): int {.inline.} = + ## Get sub-protocol specific ``message id`` (zero based) from peer's cmd id. + result = cmdid - epcap.offset + +proc fromRlp*(lrlpcap: openarray[RlpCap]): ECapList = + ## Convert list of RLP serialized Ethereum Capabilities to list of Ethereum + ## Capabilities. + result = newECapList() + for item in lrlpcap: + result.register(initECap(item)) + +proc toRlp*(lcap: openarray[ECap]): RlpCapList = + ## Convert list of Ethereum Capabilities ``lcap`` to list of RLP serialized + ## Ethereum Capabilities. + result = newSeq[RlpCap]() + for item in lcap: + result.add(initRCap(item)) + +when isMainModule: + var lcaplist = newECapList() + var rcaplist = newECapList() + + var a = initECap("eth", 61) + var b = initECap("eth", 62) + var c = initECap("eth", 63) + var d = initECap("les", 1) + var e = initECap("les", 2) + var f = initECap("par", 1) + var g = initECap("par", 2) + + lcaplist.register([a, b, c, d]) + rcaplist.register([e, d, d, c, b, a, f, g]) + + echo "local ", lcaplist + echo "remote ", rcaplist + + echo sync(lcaplist, rcaplist) diff --git a/tests/testpeer.nim b/tests/testpeer.nim new file mode 100644 index 0000000..0e52013 --- /dev/null +++ b/tests/testpeer.nim @@ -0,0 +1,149 @@ +import asyncdispatch2, eth_keys, eth_common, rlp, chronicles, nimcrypto, stint +import peer, protocols, eth, enode + +const + NodeKey = "1b5b2a9c891067139c2aac53f66a84e2888ce494407a21c662dc546150e7e170" + # enode://425f2261ef52010ed833bdbebbc67c36dfc208c0330e2c248fadef3feeb291c677265289bf64437055116b7d1dc3f78be5122d0041f021c01a501b876c664d4f@[::]:30303 + ENodeAddress = "enode://410a034fbd91e872cbe52a5fb5bec1f030d4239dab35efbdf53b6a3f09f42a84965f0d5c76c44d42b791339ff249675ec7fc656d69d10cb95b315a1136f6634e@192.168.2.10:30303" + GenesisHash = "D4E56740F876AEF8C010B86A40D5F56745A118D0906A34E69AEC8C0DB1CB8FA3" + +proc getInterface(en: EthereumNode, peer: Peer, epcap: EPeerCap): EInterface = + var + version: int + network: int + totalDifficulty: UInt256 + bestHash: Hash256 + genesisHash: Hash256 + + var currentTotalDifficulty = 0.u256 + var currentBestHash: Hash256 + var currentGenesisHash: Hash256 + + var hash = fromHex(GenesisHash) + copyMem(addr currentGenesisHash, addr hash[0], 32) + + proc handshake(peer: Peer): Future[bool] {.async.} = + # Sending `Status` message to remote peer. + let res = await peer.sendStatus(epcap, en.network, currentTotalDifficulty, + currentBestHash, currentGenesisHash) + if not res: return false + + # Waiting for `Status` message from remote peer. + var msg = await peer.getMessage(epcap) + + # Converting synchronized command id back to protocol message id. + var ethid = epcap.ethGetCmd(msg.id) + + if msg.id == MsgBad or msg.id == MsgDisconnect: + ## Received message is rather incorrect or disconnect. + result = false + else: + if ethid == MsgStatus: + result = true + # Decoding `Status` message frame. + if (not msg.data.isList()) or (msg.data.listLen() != 5): + debug "Malformed status message received", peer = $peer, + isList = msg.data.isList(), + listLength = msg.data.listLen() + result = false + + if not result: return + + try: + msg.data.enterList() + version = msg.data.read(int) + network = msg.data.read(int) + totalDifficulty = msg.data.read(UInt256) + bestHash = msg.data.read(Hash256) + genesisHash = msg.data.read(Hash256) + except: + debug "Malformed status message received", peer = $peer + result = false + + if not result: return + + # Verification of received data from `Status` message frame. + if version != epcap.version(): + debug "Sub-protocol version did not match", peer = $peer, + remoteVersion = $version, + localVersion = $epcap.version + await peer.disconnect(UselessPeer) + return false + + # Remote network id must be equal to our network id + if network != en.network: + debug "Different network id specified", peer = $peer, + remoteNetwork = network, + localNetwork = en.network + await peer.disconnect(UselessPeer) + return false + + # Remote Genesis must be equal to our Genesis + if genesisHash != currentGenesisHash: + debug "Genesis hash did not match", peer = $peer, + remoteGenesis = $genesisHash, + localGenesis = $currentGenesisHash + await peer.disconnect(UselessPeer) + return false + + debug "Ethereum Protocol started", peer = $peer, + version = $epcap.version + result = true + else: + # There must be no other messages, except `Status` message. + debug "Incorrect message received", peer = $peer, msgId = $msg.id + result = false + + proc run(peer: Peer) {.async.} = + while true: + var msg = await peer.getMessage(epcap) + if msg.id == MsgBad: + # Remote peer sent malformed message or get disconnected without reason. + # You don't need to close peer here, this is just notification so you + # can break your cycle. + debug "Sub-protocol received notification", peer = $peer + break + elif msg.id == MsgDisconnect: + # Remote peer send `Disconnect` message with a reason. + # You don't need to close peer here, this is just notification so you + # can break your cycle. + debug "Sub-protocol received disconnect notification", peer = $peer + break + else: + # Here we can get any message specific exactly to this protocol. + let ethid = epcap.ethGetCmd(msg.id) + if ethid == -1: + # Received message with id, which is not related to protocol + debug "Sub-protocol received incorrect message", peer = $peer, + msgid = $msg.id + # peer.disconnect() will do `peer.close()` for us + await peer.disconnect(BreachOfProtocol) + break + else: + debug "Sub-protocol received message", peer = $peer, + msgid = $msg.id, + ethid = $ethid + if ethid == MsgGetBlockHeaders: + debug "Received MsgGetBlockHeaders", peer = $peer + elif ethid == MsgGetBlockBodies: + debug "Received MsgGetBlockBodies", peer = $peer + elif ethid == MsgGetNodeData: + debug "Received MsgGetNodeData", peer = $peer + elif ethid == MsgGetReceipts: + debug "Received MsgGetReceipts", peer = $peer + + new result + result.handshake = handshake + result.run = run + +proc test() {.async.} = + var en = newEthereumNode(1, initPrivateKey(NodeKey)) + en.registerProtocol(initECap("eth", 63), getInterface) + let peer = await connect(en, ENodeAddress) + await sleepAsync(1000) + var time = await peer.ping() + echo "PONG RECEIVED in ", time, "ms" + await sleepAsync(1000000) + +when isMainModule: + waitFor test()