From e62fdfe6f1c8cbef48768a4d12fb2929518649d8 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Tue, 15 Mar 2022 18:08:15 +0100 Subject: [PATCH] Add bind ip and bind port for eth1 rlpx and discovery protocols (#485) Includes some small refactor on usage of result and Nim style according to style guide. --- eth/p2p.nim | 53 +++++++++++++++++--------------- eth/p2p/discovery.nim | 44 ++++++++++++++++---------- eth/p2p/peer_pool.nim | 10 +++--- eth/p2p/private/p2p_types.nim | 3 +- tests/fuzzing/discovery/fuzz.nim | 4 ++- tests/p2p/p2p_test_helper.nim | 11 +++++-- tests/p2p/test_discovery.nim | 9 +++--- 7 files changed, 80 insertions(+), 54 deletions(-) diff --git a/eth/p2p.nim b/eth/p2p.nim index a945451..ea2ea4d 100644 --- a/eth/p2p.nim +++ b/eth/p2p.nim @@ -29,15 +29,20 @@ proc addCapability*(node: var EthereumNode, p: ProtocolInfo) = template addCapability*(node: var EthereumNode, Protocol: type) = addCapability(node, Protocol.protocolInfo) -proc newEthereumNode*(keys: KeyPair, - address: Address, - networkId: NetworkId, - chain: AbstractChainDB, - clientId = "nim-eth-p2p/0.2.0", # TODO: read this value from nimble somehow - addAllCapabilities = true, - useCompression: bool = false, - minPeers = 10, - rng = newRng()): EthereumNode = +proc newEthereumNode*( + keys: KeyPair, + address: Address, + networkId: NetworkId, + chain: AbstractChainDB, + clientId = "nim-eth-p2p/0.2.0", # TODO: read this value from nimble somehow + addAllCapabilities = true, + useCompression: bool = false, + minPeers = 10, + bootstrapNodes: seq[ENode] = @[], + bindUdpPort: Port, + bindTcpPort: Port, + bindIp = IPv4_any(), + rng = newRng()): EthereumNode = if rng == nil: # newRng could fail raise (ref Defect)(msg: "Cannot initialize RNG") @@ -50,6 +55,12 @@ proc newEthereumNode*(keys: KeyPair, result.capabilities.newSeq 0 result.address = address result.connectionState = ConnectionState.None + result.bindIp = bindIp + result.bindPort = bindTcpPort + + result.discovery = newDiscoveryProtocol( + keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng) + result.rng = rng when useSnappy: @@ -58,10 +69,10 @@ proc newEthereumNode*(keys: KeyPair, result.protocolStates.newSeq allProtocols.len - result.peerPool = newPeerPool(result, networkId, - keys, nil, - clientId, address.tcpPort, - minPeers = minPeers) + result.peerPool = newPeerPool( + result, networkId, keys, nil, clientId, minPeers = minPeers) + + result.peerPool.discovery = result.discovery if addAllCapabilities: for p in allProtocols: @@ -81,8 +92,8 @@ proc listeningAddress*(node: EthereumNode): ENode = node.toENode() proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} = - # TODO allow binding to specific IP / IPv6 / etc - let ta = initTAddress(IPv4_any(), node.address.tcpPort) + # TODO: allow binding to both IPv4 & IPv6 + let ta = initTAddress(node.bindIp, node.bindPort) if node.listeningServer == nil: node.listeningServer = createStreamServer(ta, processIncoming, {ReuseAddr}, @@ -90,18 +101,12 @@ proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} = node.listeningServer.start() info "RLPx listener up", self = node.listeningAddress -proc connectToNetwork*(node: EthereumNode, - bootstrapNodes: seq[ENode], - startListening = true, - enableDiscovery = true, - waitForPeers = true) {.async.} = +proc connectToNetwork*( + node: EthereumNode, startListening = true, + enableDiscovery = true, waitForPeers = true) {.async.} = doAssert node.connectionState == ConnectionState.None node.connectionState = Connecting - node.discovery = newDiscoveryProtocol(node.keys.seckey, - node.address, - bootstrapNodes) - node.peerPool.discovery = node.discovery if startListening: p2p.startListening(node) diff --git a/eth/p2p/discovery.nim b/eth/p2p/discovery.nim index ede6323..52f3607 100644 --- a/eth/p2p/discovery.nim +++ b/eth/p2p/discovery.nim @@ -33,9 +33,11 @@ type privKey: PrivateKey address: Address bootstrapNodes*: seq[Node] - thisNode*: Node + localNode*: Node kademlia*: KademliaProtocol[DiscoveryProtocol] transp: DatagramTransport + bindIp: IpAddress + bindPort: Port CommandId = enum cmdPing = 1 @@ -154,16 +156,26 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) = if nodes.len != 0: flush() -proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, - bootstrapNodes: openArray[ENode], rng = newRng() - ): DiscoveryProtocol = - result.new() - result.privKey = privKey - result.address = address - result.bootstrapNodes = newSeqOfCap[Node](bootstrapNodes.len) - for n in bootstrapNodes: result.bootstrapNodes.add(newNode(n)) - result.thisNode = newNode(privKey.toPublicKey(), address) - result.kademlia = newKademliaProtocol(result.thisNode, result, rng = rng) +proc newDiscoveryProtocol*( + privKey: PrivateKey, address: Address, + bootstrapNodes: openArray[ENode], + bindPort: Port, bindIp = IPv4_any(), + rng = newRng()): DiscoveryProtocol = + let + localNode = newNode(privKey.toPublicKey(), address) + discovery = DiscoveryProtocol( + privKey: privKey, + address: address, + localNode: localNode, + bindIp: bindIp, + bindPort: bindPort) + kademlia = newKademliaProtocol(localNode, discovery, rng = rng) + + discovery.kademlia = kademlia + + for n in bootstrapNodes: discovery.bootstrapNodes.add(newNode(n)) + + discovery proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) {.raises: [ValueError, Defect].} = @@ -281,8 +293,8 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress): debug "Receive failed", exc = e.name, err = e.msg proc open*(d: DiscoveryProtocol) {.raises: [Defect, CatchableError].} = - # TODO allow binding to specific IP / IPv6 / etc - let ta = initTAddress(IPv4_any(), d.address.udpPort) + # TODO: allow binding to both IPv4 and IPv6 + let ta = initTAddress(d.bindIp, d.bindPort) d.transp = newDatagramTransport(processClient, udata = d, local = ta) proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] = @@ -341,10 +353,10 @@ when isMainModule: let listenPort = Port(30310) var address = Address(udpPort: listenPort, tcpPort: listenPort) address.ip.family = IpAddressFamily.IPv4 - let discovery = newDiscoveryProtocol(privkey, address, bootnodes) + let discovery = newDiscoveryProtocol(privkey, address, bootnodes, listenPort) - echo discovery.thisNode.node.pubkey - echo "this_node.id: ", discovery.thisNode.id.toHex() + echo discovery.localNode.node.pubkey + echo "this_node.id: ", discovery.localNode.id.toHex() discovery.open() diff --git a/eth/p2p/peer_pool.nim b/eth/p2p/peer_pool.nim index d8853cc..3a66518 100644 --- a/eth/p2p/peer_pool.nim +++ b/eth/p2p/peer_pool.nim @@ -20,10 +20,9 @@ const lookupInterval = 5 connectLoopSleep = chronos.milliseconds(2000) -proc newPeerPool*(network: EthereumNode, - networkId: NetworkId, keyPair: KeyPair, - discovery: DiscoveryProtocol, clientId: string, - listenPort = Port(30303), minPeers = 10): PeerPool = +proc newPeerPool*( + network: EthereumNode, networkId: NetworkId, keyPair: KeyPair, + discovery: DiscoveryProtocol, clientId: string, minPeers = 10): PeerPool = new result result.network = network result.keyPair = keyPair @@ -33,7 +32,6 @@ proc newPeerPool*(network: EthereumNode, result.connectedNodes = initTable[Node, Peer]() result.connectingNodes = initHashSet[Node]() result.observers = initTable[int, PeerObserver]() - result.listenPort = listenPort proc nodesToConnect(p: PeerPool): seq[Node] = p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes) @@ -169,6 +167,8 @@ proc run(p: PeerPool) {.async.} = trace "Running PeerPool..." p.running = true while p.running: + + debug "Amount of peers", amount = p.connectedNodes.len() var dropConnections = false try: await p.maybeConnectToMorePeers() diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index e0bb925..2398501 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -25,6 +25,8 @@ type keys*: KeyPair address*: Address # The external address that the node will be advertising peerPool*: PeerPool + bindIp*: IpAddress + bindPort*: Port # Private fields: capabilities*: seq[Capability] @@ -64,7 +66,6 @@ type connectedNodes*: Table[Node, Peer] connectingNodes*: HashSet[Node] running*: bool - listenPort*: Port observers*: Table[int, PeerObserver] PeerObserver* = object diff --git a/tests/fuzzing/discovery/fuzz.nim b/tests/fuzzing/discovery/fuzz.nim index 8d816ed..a3f4993 100644 --- a/tests/fuzzing/discovery/fuzz.nim +++ b/tests/fuzzing/discovery/fuzz.nim @@ -1,4 +1,5 @@ import + std/net, testutils/fuzzing, chronicles, nimcrypto/keccak, ../../../eth/p2p/[discovery, enode], ../../../eth/[keys, rlp], ../../p2p/p2p_test_helper @@ -18,7 +19,8 @@ init: var targetNodeKey = PrivateKey.fromHex("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[] targetNodeAddr = localAddress(DefaultListeningPort) - targetNode = newDiscoveryProtocol(targetNodeKey, targetNodeAddr, @[]) + targetNode = newDiscoveryProtocol( + targetNodeKey, targetNodeAddr, @[], Port(DefaultListeningPort)) # Create the transport as else replies on the messages send will fail. targetNode.open() diff --git a/tests/p2p/p2p_test_helper.nim b/tests/p2p/p2p_test_helper.nim index b1123c1..ca46fd3 100644 --- a/tests/p2p/p2p_test_helper.nim +++ b/tests/p2p/p2p_test_helper.nim @@ -15,11 +15,16 @@ proc setupTestNode*( capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode {.gcsafe.} = # Don't create new RNG every time in production code! let keys1 = KeyPair.random(rng[]) - result = newEthereumNode(keys1, localAddress(nextPort), NetworkId(1), nil, - addAllCapabilities = false, rng = rng) + var node = newEthereumNode( + keys1, localAddress(nextPort), NetworkId(1), nil, + addAllCapabilities = false, + bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort), + rng = rng) nextPort.inc for capability in capabilities: - result.addCapability capability + node.addCapability capability + + node template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] diff --git a/tests/p2p/test_discovery.nim b/tests/p2p/test_discovery.nim index ff1a1b0..4d05002 100644 --- a/tests/p2p/test_discovery.nim +++ b/tests/p2p/test_discovery.nim @@ -19,9 +19,10 @@ proc localAddress(port: int): Address = result = Address(udpPort: port, tcpPort: port, ip: parseIpAddress("127.0.0.1")) -proc initDiscoveryNode(privKey: PrivateKey, address: Address, - bootnodes: seq[ENode]): DiscoveryProtocol = - let node = newDiscoveryProtocol(privKey, address, bootnodes) +proc initDiscoveryNode( + privKey: PrivateKey, address: Address, + bootnodes: seq[ENode]): DiscoveryProtocol = + let node = newDiscoveryProtocol(privKey, address, bootnodes, address.udpPort) node.open() return node @@ -68,7 +69,7 @@ procSuite "Discovery Tests": for i in nodes: for j in nodes: if j != i: - check(nodeIdInNodes(i.thisNode.id, j.randomNodes(nodes.len - 1))) + check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1))) test "Test Vectors": # These are the test vectors from EIP-8: