Add bind ip and bind port for eth1 rlpx and discovery protocols (#485)

Includes some small refactor on usage of result and Nim style
according to style guide.
This commit is contained in:
Kim De Mey 2022-03-15 18:08:15 +01:00 committed by GitHub
parent dff9040cc1
commit e62fdfe6f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 54 deletions

View File

@ -29,7 +29,8 @@ proc addCapability*(node: var EthereumNode, p: ProtocolInfo) =
template addCapability*(node: var EthereumNode, Protocol: type) = template addCapability*(node: var EthereumNode, Protocol: type) =
addCapability(node, Protocol.protocolInfo) addCapability(node, Protocol.protocolInfo)
proc newEthereumNode*(keys: KeyPair, proc newEthereumNode*(
keys: KeyPair,
address: Address, address: Address,
networkId: NetworkId, networkId: NetworkId,
chain: AbstractChainDB, chain: AbstractChainDB,
@ -37,6 +38,10 @@ proc newEthereumNode*(keys: KeyPair,
addAllCapabilities = true, addAllCapabilities = true,
useCompression: bool = false, useCompression: bool = false,
minPeers = 10, minPeers = 10,
bootstrapNodes: seq[ENode] = @[],
bindUdpPort: Port,
bindTcpPort: Port,
bindIp = IPv4_any(),
rng = newRng()): EthereumNode = rng = newRng()): EthereumNode =
if rng == nil: # newRng could fail if rng == nil: # newRng could fail
@ -50,6 +55,12 @@ proc newEthereumNode*(keys: KeyPair,
result.capabilities.newSeq 0 result.capabilities.newSeq 0
result.address = address result.address = address
result.connectionState = ConnectionState.None result.connectionState = ConnectionState.None
result.bindIp = bindIp
result.bindPort = bindTcpPort
result.discovery = newDiscoveryProtocol(
keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng)
result.rng = rng result.rng = rng
when useSnappy: when useSnappy:
@ -58,10 +69,10 @@ proc newEthereumNode*(keys: KeyPair,
result.protocolStates.newSeq allProtocols.len result.protocolStates.newSeq allProtocols.len
result.peerPool = newPeerPool(result, networkId, result.peerPool = newPeerPool(
keys, nil, result, networkId, keys, nil, clientId, minPeers = minPeers)
clientId, address.tcpPort,
minPeers = minPeers) result.peerPool.discovery = result.discovery
if addAllCapabilities: if addAllCapabilities:
for p in allProtocols: for p in allProtocols:
@ -81,8 +92,8 @@ proc listeningAddress*(node: EthereumNode): ENode =
node.toENode() node.toENode()
proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} = proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} =
# TODO allow binding to specific IP / IPv6 / etc # TODO: allow binding to both IPv4 & IPv6
let ta = initTAddress(IPv4_any(), node.address.tcpPort) let ta = initTAddress(node.bindIp, node.bindPort)
if node.listeningServer == nil: if node.listeningServer == nil:
node.listeningServer = createStreamServer(ta, processIncoming, node.listeningServer = createStreamServer(ta, processIncoming,
{ReuseAddr}, {ReuseAddr},
@ -90,18 +101,12 @@ proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} =
node.listeningServer.start() node.listeningServer.start()
info "RLPx listener up", self = node.listeningAddress info "RLPx listener up", self = node.listeningAddress
proc connectToNetwork*(node: EthereumNode, proc connectToNetwork*(
bootstrapNodes: seq[ENode], node: EthereumNode, startListening = true,
startListening = true, enableDiscovery = true, waitForPeers = true) {.async.} =
enableDiscovery = true,
waitForPeers = true) {.async.} =
doAssert node.connectionState == ConnectionState.None doAssert node.connectionState == ConnectionState.None
node.connectionState = Connecting node.connectionState = Connecting
node.discovery = newDiscoveryProtocol(node.keys.seckey,
node.address,
bootstrapNodes)
node.peerPool.discovery = node.discovery
if startListening: if startListening:
p2p.startListening(node) p2p.startListening(node)

View File

@ -33,9 +33,11 @@ type
privKey: PrivateKey privKey: PrivateKey
address: Address address: Address
bootstrapNodes*: seq[Node] bootstrapNodes*: seq[Node]
thisNode*: Node localNode*: Node
kademlia*: KademliaProtocol[DiscoveryProtocol] kademlia*: KademliaProtocol[DiscoveryProtocol]
transp: DatagramTransport transp: DatagramTransport
bindIp: IpAddress
bindPort: Port
CommandId = enum CommandId = enum
cmdPing = 1 cmdPing = 1
@ -154,16 +156,26 @@ proc sendNeighbours*(d: DiscoveryProtocol, node: Node, neighbours: seq[Node]) =
if nodes.len != 0: flush() if nodes.len != 0: flush()
proc newDiscoveryProtocol*(privKey: PrivateKey, address: Address, proc newDiscoveryProtocol*(
bootstrapNodes: openArray[ENode], rng = newRng() privKey: PrivateKey, address: Address,
): DiscoveryProtocol = bootstrapNodes: openArray[ENode],
result.new() bindPort: Port, bindIp = IPv4_any(),
result.privKey = privKey rng = newRng()): DiscoveryProtocol =
result.address = address let
result.bootstrapNodes = newSeqOfCap[Node](bootstrapNodes.len) localNode = newNode(privKey.toPublicKey(), address)
for n in bootstrapNodes: result.bootstrapNodes.add(newNode(n)) discovery = DiscoveryProtocol(
result.thisNode = newNode(privKey.toPublicKey(), address) privKey: privKey,
result.kademlia = newKademliaProtocol(result.thisNode, result, rng = rng) address: address,
localNode: localNode,
bindIp: bindIp,
bindPort: bindPort)
kademlia = newKademliaProtocol(localNode, discovery, rng = rng)
discovery.kademlia = kademlia
for n in bootstrapNodes: discovery.bootstrapNodes.add(newNode(n))
discovery
proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256]) proc recvPing(d: DiscoveryProtocol, node: Node, msgHash: MDigest[256])
{.raises: [ValueError, Defect].} = {.raises: [ValueError, Defect].} =
@ -281,8 +293,8 @@ proc processClient(transp: DatagramTransport, raddr: TransportAddress):
debug "Receive failed", exc = e.name, err = e.msg debug "Receive failed", exc = e.name, err = e.msg
proc open*(d: DiscoveryProtocol) {.raises: [Defect, CatchableError].} = proc open*(d: DiscoveryProtocol) {.raises: [Defect, CatchableError].} =
# TODO allow binding to specific IP / IPv6 / etc # TODO: allow binding to both IPv4 and IPv6
let ta = initTAddress(IPv4_any(), d.address.udpPort) let ta = initTAddress(d.bindIp, d.bindPort)
d.transp = newDatagramTransport(processClient, udata = d, local = ta) d.transp = newDatagramTransport(processClient, udata = d, local = ta)
proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] = proc lookupRandom*(d: DiscoveryProtocol): Future[seq[Node]] =
@ -341,10 +353,10 @@ when isMainModule:
let listenPort = Port(30310) let listenPort = Port(30310)
var address = Address(udpPort: listenPort, tcpPort: listenPort) var address = Address(udpPort: listenPort, tcpPort: listenPort)
address.ip.family = IpAddressFamily.IPv4 address.ip.family = IpAddressFamily.IPv4
let discovery = newDiscoveryProtocol(privkey, address, bootnodes) let discovery = newDiscoveryProtocol(privkey, address, bootnodes, listenPort)
echo discovery.thisNode.node.pubkey echo discovery.localNode.node.pubkey
echo "this_node.id: ", discovery.thisNode.id.toHex() echo "this_node.id: ", discovery.localNode.id.toHex()
discovery.open() discovery.open()

View File

@ -20,10 +20,9 @@ const
lookupInterval = 5 lookupInterval = 5
connectLoopSleep = chronos.milliseconds(2000) connectLoopSleep = chronos.milliseconds(2000)
proc newPeerPool*(network: EthereumNode, proc newPeerPool*(
networkId: NetworkId, keyPair: KeyPair, network: EthereumNode, networkId: NetworkId, keyPair: KeyPair,
discovery: DiscoveryProtocol, clientId: string, discovery: DiscoveryProtocol, clientId: string, minPeers = 10): PeerPool =
listenPort = Port(30303), minPeers = 10): PeerPool =
new result new result
result.network = network result.network = network
result.keyPair = keyPair result.keyPair = keyPair
@ -33,7 +32,6 @@ proc newPeerPool*(network: EthereumNode,
result.connectedNodes = initTable[Node, Peer]() result.connectedNodes = initTable[Node, Peer]()
result.connectingNodes = initHashSet[Node]() result.connectingNodes = initHashSet[Node]()
result.observers = initTable[int, PeerObserver]() result.observers = initTable[int, PeerObserver]()
result.listenPort = listenPort
proc nodesToConnect(p: PeerPool): seq[Node] = proc nodesToConnect(p: PeerPool): seq[Node] =
p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes) p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes)
@ -169,6 +167,8 @@ proc run(p: PeerPool) {.async.} =
trace "Running PeerPool..." trace "Running PeerPool..."
p.running = true p.running = true
while p.running: while p.running:
debug "Amount of peers", amount = p.connectedNodes.len()
var dropConnections = false var dropConnections = false
try: try:
await p.maybeConnectToMorePeers() await p.maybeConnectToMorePeers()

View File

@ -25,6 +25,8 @@ type
keys*: KeyPair keys*: KeyPair
address*: Address # The external address that the node will be advertising address*: Address # The external address that the node will be advertising
peerPool*: PeerPool peerPool*: PeerPool
bindIp*: IpAddress
bindPort*: Port
# Private fields: # Private fields:
capabilities*: seq[Capability] capabilities*: seq[Capability]
@ -64,7 +66,6 @@ type
connectedNodes*: Table[Node, Peer] connectedNodes*: Table[Node, Peer]
connectingNodes*: HashSet[Node] connectingNodes*: HashSet[Node]
running*: bool running*: bool
listenPort*: Port
observers*: Table[int, PeerObserver] observers*: Table[int, PeerObserver]
PeerObserver* = object PeerObserver* = object

View File

@ -1,4 +1,5 @@
import import
std/net,
testutils/fuzzing, chronicles, nimcrypto/keccak, testutils/fuzzing, chronicles, nimcrypto/keccak,
../../../eth/p2p/[discovery, enode], ../../../eth/[keys, rlp], ../../../eth/p2p/[discovery, enode], ../../../eth/[keys, rlp],
../../p2p/p2p_test_helper ../../p2p/p2p_test_helper
@ -18,7 +19,8 @@ init:
var var
targetNodeKey = PrivateKey.fromHex("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[] targetNodeKey = PrivateKey.fromHex("a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")[]
targetNodeAddr = localAddress(DefaultListeningPort) targetNodeAddr = localAddress(DefaultListeningPort)
targetNode = newDiscoveryProtocol(targetNodeKey, targetNodeAddr, @[]) targetNode = newDiscoveryProtocol(
targetNodeKey, targetNodeAddr, @[], Port(DefaultListeningPort))
# Create the transport as else replies on the messages send will fail. # Create the transport as else replies on the messages send will fail.
targetNode.open() targetNode.open()

View File

@ -15,11 +15,16 @@ proc setupTestNode*(
capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode {.gcsafe.} = capabilities: varargs[ProtocolInfo, `protocolInfo`]): EthereumNode {.gcsafe.} =
# Don't create new RNG every time in production code! # Don't create new RNG every time in production code!
let keys1 = KeyPair.random(rng[]) let keys1 = KeyPair.random(rng[])
result = newEthereumNode(keys1, localAddress(nextPort), NetworkId(1), nil, var node = newEthereumNode(
addAllCapabilities = false, rng = rng) keys1, localAddress(nextPort), NetworkId(1), nil,
addAllCapabilities = false,
bindUdpPort = Port(nextPort), bindTcpPort = Port(nextPort),
rng = rng)
nextPort.inc nextPort.inc
for capability in capabilities: for capability in capabilities:
result.addCapability capability node.addCapability capability
node
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]

View File

@ -19,9 +19,10 @@ proc localAddress(port: int): Address =
result = Address(udpPort: port, tcpPort: port, result = Address(udpPort: port, tcpPort: port,
ip: parseIpAddress("127.0.0.1")) ip: parseIpAddress("127.0.0.1"))
proc initDiscoveryNode(privKey: PrivateKey, address: Address, proc initDiscoveryNode(
privKey: PrivateKey, address: Address,
bootnodes: seq[ENode]): DiscoveryProtocol = bootnodes: seq[ENode]): DiscoveryProtocol =
let node = newDiscoveryProtocol(privKey, address, bootnodes) let node = newDiscoveryProtocol(privKey, address, bootnodes, address.udpPort)
node.open() node.open()
return node return node
@ -68,7 +69,7 @@ procSuite "Discovery Tests":
for i in nodes: for i in nodes:
for j in nodes: for j in nodes:
if j != i: if j != i:
check(nodeIdInNodes(i.thisNode.id, j.randomNodes(nodes.len - 1))) check(nodeIdInNodes(i.localNode.id, j.randomNodes(nodes.len - 1)))
test "Test Vectors": test "Test Vectors":
# These are the test vectors from EIP-8: # These are the test vectors from EIP-8: