From 24d762da34176a17f66f8de732c8534ea9133e02 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Sun, 17 Jun 2018 13:21:06 +0300 Subject: [PATCH] Moved eth_p2p to asyncdispatch2. Fix some warnings at rlpx.nim. Commented debug echo in rlpx.nim. --- eth_p2p/auth.nim | 5 +- eth_p2p/discovery.nim | 67 ++++++-------- eth_p2p/ecies.nim | 5 +- eth_p2p/enode.nim | 3 +- eth_p2p/ethereum_types.nim | 3 +- eth_p2p/kademlia.nim | 17 ++-- eth_p2p/peer_pool.nim | 31 ++++--- eth_p2p/rlpx.nim | 181 +++++++++++++++++++------------------ eth_p2p/rlpxcrypt.nim | 5 +- eth_p2p/server.nim | 87 +++++++++--------- tests/tdiscovery.nim | 16 +++- tests/testauth.nim | 6 +- tests/testcrypt.nim | 7 +- tests/testecies.nim | 7 +- tests/testenode.nim | 4 +- tests/tserver.nim | 22 +++-- 16 files changed, 234 insertions(+), 232 deletions(-) diff --git a/eth_p2p/auth.nim b/eth_p2p/auth.nim index 4930432..853c2a6 100644 --- a/eth_p2p/auth.nim +++ b/eth_p2p/auth.nim @@ -11,9 +11,8 @@ ## This module implements Ethereum authentication import endians -import eth_keys, ecies, rlp -import nimcrypto/sysrand, nimcrypto/hash, nimcrypto/utils, nimcrypto/hmac -import nimcrypto/rijndael, nimcrypto/keccak, nimcrypto/sha2 +import eth_keys, nimcrypto, rlp +import ecies const SupportedRlpxVersion* = 4 diff --git a/eth_p2p/discovery.nim b/eth_p2p/discovery.nim index ca42e63..7bdb5bc 100644 --- a/eth_p2p/discovery.nim +++ b/eth_p2p/discovery.nim @@ -9,9 +9,9 @@ # from strutils import nil -import asyncnet, asyncdispatch, net, times, nativesockets, algorithm, logging +import times, algorithm, logging +import asyncdispatch2, eth_keys, ranges, stint, nimcrypto, rlp import kademlia, enode -import eth_keys, rlp, ranges, stint, nimcrypto export Node @@ -33,7 +33,6 @@ const "enode://6456719e7267e061161c88720287a77b80718d2a3a4ff5daeba614d029dc77601b75e32190aed1c9b0b9ccb6fac3bcf000f48e54079fa79e339c25d8e9724226@127.0.0.1:30301" ] - # UDP packet constants. MAC_SIZE = 256 div 8 # 32 SIG_SIZE = 520 div 8 # 65 @@ -48,7 +47,7 @@ type bootstrapNodes*: seq[Node] thisNode*: Node kademlia: KademliaProtocol[DiscoveryProtocol] - socket: AsyncSocket + transp: DatagramTransport CommandId = enum cmdPing = 1 @@ -98,20 +97,9 @@ proc expiration(): uint32 = # Wire protocol -proc sendTo*(socket: AsyncFD, data: seq[byte], ip: IpAddress, port: Port, - flags = {SocketFlag.SafeDisconn}) {.async.} = - var sa: Sockaddr_storage - var ln: Socklen - ip.toSockaddr(port, sa, ln) - try: - await sendTo(socket, unsafeAddr data[0], data.len, cast[ptr Sockaddr](addr sa), ln) - except: - error "sendTo failed: ", getCurrentExceptionMsg() - proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) = - asyncCheck d.socket.getFd().AsyncFD.sendTo(data, - n.node.address.ip, - n.node.address.udpPort) + let ta = initTAddress(n.node.address.ip, n.node.address.udpPort) + asyncCheck d.transp.sendTo(ta, data) proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] = let payload = rlp.encode((PROTO_VERSION, d.address, n.node.address, @@ -157,7 +145,9 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) = if nodes.len != 0: flush() -proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, bootstrapNodes: openarray[ENode]): DiscoveryProtocol = +proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, + bootstrapNodes: openarray[ENode] + ): DiscoveryProtocol = result.new() result.privKey = privKey result.address = address @@ -166,7 +156,8 @@ proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, bootstrapNodes result.thisNode = newNode(privKey.getPublicKey(), address) result.kademlia = newKademliaProtocol(result.thisNode, result) {.explain.} -proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) {.inline.} = +proc recvPing(d: DiscoveryProtocol, node: Node, + msgHash: MDigest[256]) {.inline.} = d.kademlia.recvPing(node, msgHash) proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} = @@ -174,7 +165,8 @@ proc recvPong(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} = let tok = rlp.listElem(1).toBytes().toSeq() d.kademlia.recvPong(node, tok) -proc recvNeighbours(d: DiscoveryProtocol, node: Node, payload: Bytes) {.inline.} = +proc recvNeighbours(d: DiscoveryProtocol, node: Node, + payload: Bytes) {.inline.} = let rlp = rlpFromBytes(payload.toRange) let neighboursList = rlp.listElem(0) let sz = neighboursList.listLen() @@ -245,28 +237,23 @@ proc receive(d: DiscoveryProtocol, a: Address, msg: Bytes) = else: error "Wrong msg mac from ", a -proc runListeningLoop(d: DiscoveryProtocol) {.async.} = - var buf = newSeq[byte](MaxDgramSize) - var saddr: Sockaddr_storage - var slen: Socklen - while not d.socket.isNil: - buf.setLen(MaxDgramSize) - slen = sizeof(saddr).Socklen - let received = await recvFromInto(d.socket.getFd().AsyncFD, addr buf[0], buf.len, cast[ptr SockAddr](addr saddr), addr slen) - buf.setLen(received) - try: - var port: Port - var ip: IpAddress - fromSockAddr(saddr, slen, ip, port) - d.receive(Address(ip: ip, udpPort: port, tcpPort: port), buf) - except: - error "receive failed: ", getCurrentExceptionMsg() +proc processClient(transp: DatagramTransport, + raddr: TransportAddress): Future[void] {.async, gcsafe.} = + var proto = getUserData[DiscoveryProtocol](transp) + var buf: seq[byte] + try: + # TODO: Maybe here better to use `peekMessage()` to avoid allocation, + # but `Bytes` object is just a simple seq[byte], and `ByteRange` object + # do not support custom length. + var buf = transp.getMessage() + let a = Address(ip: raddr.address, udpPort: raddr.port, tcpPort: raddr.port) + proto.receive(a, buf) + except: + error "receive failed: ", getCurrentExceptionMsg() proc open*(d: DiscoveryProtocol) = - d.socket = newAsyncSocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) - d.socket.bindAddr(port = d.address.udpPort) - - asyncCheck d.runListeningLoop() + let ta = initTAddress(d.address.ip, d.address.udpPort) + d.transp = newDatagramTransport(processClient, udata = d, local = ta) proc bootstrap*(d: DiscoveryProtocol) {.async.} = await d.kademlia.bootstrap(d.bootstrapNodes) diff --git a/eth_p2p/ecies.nim b/eth_p2p/ecies.nim index 6346ad2..c401ec8 100644 --- a/eth_p2p/ecies.nim +++ b/eth_p2p/ecies.nim @@ -10,10 +10,7 @@ ## This module implements ECIES method encryption/decryption. -import eth_keys -import nimcrypto/sha2, nimcrypto/hash, nimcrypto/hmac -import nimcrypto/rijndael, nimcrypto/utils, nimcrypto/sysrand -import nimcrypto/bcmode, nimcrypto/utils +import eth_keys, nimcrypto const emptyMac* = array[0, byte]([]) diff --git a/eth_p2p/enode.nim b/eth_p2p/enode.nim index 0085e70..a9af32e 100644 --- a/eth_p2p/enode.nim +++ b/eth_p2p/enode.nim @@ -8,7 +8,8 @@ # MIT license (LICENSE-MIT) # -import uri, eth_keys, strutils, net +import uri, strutils, net +import eth_keys type ENodeStatus* = enum diff --git a/eth_p2p/ethereum_types.nim b/eth_p2p/ethereum_types.nim index 055b8e2..1b5b234 100644 --- a/eth_p2p/ethereum_types.nim +++ b/eth_p2p/ethereum_types.nim @@ -8,8 +8,7 @@ # MIT license (LICENSE-MIT) # -import - rlp/types, nimcrypto/hash, stint +import rlp/types, nimcrypto/hash, stint export MDigest diff --git a/eth_p2p/kademlia.nim b/eth_p2p/kademlia.nim index 40793e3..c547075 100644 --- a/eth_p2p/kademlia.nim +++ b/eth_p2p/kademlia.nim @@ -8,14 +8,12 @@ # MIT license (LICENSE-MIT) # -import asyncdispatch, net, uri, logging, tables, hashes, times, algorithm, sets, - sequtils, random +import uri, logging, tables, hashes, times, algorithm, sets, sequtils, random from strutils import parseInt +import asyncdispatch2, eth_keys, stint, nimcrypto, enode export sets # TODO: This should not be needed, but compilation fails otherwise -import eth_keys, stint, nimcrypto, enode - type KademliaProtocol* [Wire] = ref object wire: Wire @@ -23,7 +21,7 @@ type routing: RoutingTable pongFutures: Table[seq[byte], Future[bool]] pingFutures: Table[Node, Future[bool]] - neighboursCallbacks: Table[Node, proc(n: seq[Node])] + neighboursCallbacks: Table[Node, proc(n: seq[Node]) {.gcsafe.}] NodeId* = UInt256 @@ -44,8 +42,8 @@ type const BUCKET_SIZE = 16 BITS_PER_HOP = 8 - REQUEST_TIMEOUT = 0.9 # timeout of message round trips - FIND_CONCURRENCY = 3 # parallel find node lookups + REQUEST_TIMEOUT = 900 # timeout of message round trips + FIND_CONCURRENCY = 3 # parallel find node lookups ID_SIZE = 256 proc toNodeId(pk: PublicKey): NodeId = @@ -246,7 +244,7 @@ proc updateRoutingTable(k: KademliaProtocol, n: Node) = asyncCheck k.bond(evictionCandidate) proc doSleep(p: proc()) {.async.} = - await sleepAsync(REQUEST_TIMEOUT * 1000) + await sleepAsync(REQUEST_TIMEOUT) p() template onTimeout(b: untyped) = @@ -266,7 +264,7 @@ proc waitPong(k: KademliaProtocol, n: Node, token: seq[byte]): Future[bool] = proc ping(k: KademliaProtocol, n: Node): seq[byte] = assert(n != k.thisNode) - k.wire.sendPing(n) + result = k.wire.sendPing(n) proc waitPing(k: KademliaProtocol, n: Node): Future[bool] = result = newFuture[bool]("waitPing") @@ -409,7 +407,6 @@ proc bootstrap*(k: KademliaProtocol, bootstrapNodes: seq[Node]) {.async.} = proc recvPong*(k: KademliaProtocol, n: Node, token: seq[byte]) = debug "<<< pong from ", n - let pingid = token & @(n.node.pubkey.data) var future: Future[bool] if k.pongFutures.take(pingid, future): diff --git a/eth_p2p/peer_pool.nim b/eth_p2p/peer_pool.nim index 9d2559a..7cf144b 100644 --- a/eth_p2p/peer_pool.nim +++ b/eth_p2p/peer_pool.nim @@ -8,8 +8,8 @@ # MIT license (LICENSE-MIT) # -import logging, tables, asyncdispatch, times, random -import eth_keys +import logging, tables, times, random +import eth_keys, asyncdispatch2 import discovery, rlpx, kademlia type @@ -32,9 +32,8 @@ const lookupInterval = 5 connectLoopSleepMs = 2000 - proc newPeerPool*(chainDb: AsyncChainDb, networkId: int, keyPair: KeyPair, - discovery: DiscoveryProtocol, minPeers = 10): PeerPool = + discovery: DiscoveryProtocol, minPeers = 10): PeerPool = result.new() result.keyPair = keyPair result.minPeers = minPeers @@ -74,7 +73,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = debug "Skipping ", remote, "; already connected to it" return nil - result = await rlpxConnect(p.keyPair, p.listenPort, remote) + result = await remote.rlpxConnect(p.keyPair, p.listenPort) # expected_exceptions = ( # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) @@ -95,15 +94,16 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = # return None proc lookupRandomNode(p: PeerPool) {.async.} = - # This method runs in the background, so we must catch OperationCancelled here otherwise - # asyncio will warn that its exception was never retrieved. + # This method runs in the background, so we must catch OperationCancelled + # ere otherwise asyncio will warn that its exception was never retrieved. try: discard await p.discovery.lookupRandom() except: # OperationCancelled discard p.lastLookupTime = epochTime() -proc getRandomBootnode(p: PeerPool): seq[Node] = @[p.discovery.bootstrapNodes.rand()] +proc getRandomBootnode(p: PeerPool): seq[Node] = + @[p.discovery.bootstrapNodes.rand()] proc peerFinished(p: PeerPool, peer: Peer) = ## Remove the given peer from our list of connected nodes. @@ -117,8 +117,8 @@ proc run(p: Peer, completionHandler: proc() = nil) {.async.} = proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = for node in nodes: - # TODO: Consider changing connect() to raise an exception instead of returning None, - # as discussed in + # TODO: Consider changing connect() to raise an exception instead of + # returning None, as discussed in # https://github.com/ethereum/py-evm/pull/139#discussion_r152067425 let peer = await p.connect(node) if not peer.isNil: @@ -143,8 +143,9 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} = await p.connectToNodes(p.nodesToConnect()) - # In some cases (e.g ROPSTEN or private testnets), the discovery table might be full of - # bad peers so if we can't connect to any peers we try a random bootstrap node as well. + # In some cases (e.g ROPSTEN or private testnets), the discovery table might + # be full of bad peers, so if we can't connect to any peers we try a random + # bootstrap node as well. if p.connectedNodes.len == 0: await p.connectToNodes(p.getRandomBootnode()) @@ -156,7 +157,8 @@ proc run(p: PeerPool) {.async.} = try: await p.maybeConnectToMorePeers() except: - # Most unexpected errors should be transient, so we log and restart from scratch. + # Most unexpected errors should be transient, so we log and restart from + # scratch. error "Unexpected error, restarting" dropConnections = true @@ -173,7 +175,8 @@ proc start*(p: PeerPool) = # @property # def peers(self) -> List[BasePeer]: # peers = list(self.connected_nodes.values()) -# # Shuffle the list of peers so that dumb callsites are less likely to send all requests to +# # Shuffle the list of peers so that dumb callsites are less likely to send +# # all requests to # # a single peer even if they always pick the first one from the list. # random.shuffle(peers) # return peers diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index f943cb7..9f64aad 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -8,10 +8,10 @@ # MIT license (LICENSE-MIT) # -import - macros, sets, algorithm, async, asyncnet, asyncfutures, net, logging, - hashes, rlp, ranges/[stackarrays, ptr_arith], eth_keys, - ethereum_types, kademlia, discovery, auth, rlpxcrypt, nimcrypto, enode +import macros, sets, algorithm, logging, hashes +import rlp, ranges/[stackarrays, ptr_arith], eth_keys, ethereum_types, + nimcrypto, asyncdispatch2 +import kademlia, discovery, auth, rlpxcrypt, enode type ConnectionState = enum @@ -21,7 +21,7 @@ type Disconnected Peer* = ref object - socket: AsyncSocket + transp: StreamTransport dispatcher: Dispatcher networkId: int secretsState: SecretState @@ -75,6 +75,7 @@ const baseProtocolVersion = 4 clienId = "Nimbus 0.1.0" +# TODO: Usage of this variables causes GCSAFE problems. var gProtocols: seq[ProtocolInfo] gCapabilities: seq[Capability] @@ -115,7 +116,7 @@ proc getDispatcher(otherPeerCapabilities: openarray[Capability]): Dispatcher = var nextUserMsgId = 0x10 - for i in 0 ..