From ac58a1f35caade824d33fd2cf5e05f9058299c96 Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 5 Jan 2021 15:03:38 +0100 Subject: [PATCH 1/4] Only do discovery queries to refresh the table --- eth/p2p/discoveryv5/protocol.nim | 53 ++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 2ce2651..0295561 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -98,9 +98,8 @@ const findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages ## that will be processed maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message - lookupInterval = 60.seconds ## Interval of launching a random lookup to - ## populate the routing table. go-ethereum seems to do 3 runs every 30 - ## minutes. Trinity starts one every minute. + refreshInterval = 5.minutes ## Interval of launching a random query to + ## refresh the routing table. revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this ## value in milliseconds handshakeTimeout* = 2.seconds ## timeout for the reply on the @@ -118,8 +117,9 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - queryLoop: Future[void] + refreshLoop: Future[void] revalidateLoop: Future[void] + lastLookup: chronos.Moment bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -717,6 +717,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] if closestNodes.len > BUCKET_SIZE: closestNodes.del(closestNodes.high()) + d.lastLookup = now(chronos.Moment) return closestNodes proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] @@ -765,6 +766,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] if not seen.containsOrIncl(n.id): queryBuffer.add(n) + d.lastLookup = now(chronos.Moment) return queryBuffer proc queryRandom*(d: Protocol): Future[seq[Node]] @@ -777,6 +779,18 @@ proc queryRandom*(d: Protocol): Future[seq[Node]] return await d.query(id) +proc queryRandom*(d: Protocol, enrField: (string, seq[byte])): + Future[seq[Node]] {.async, raises:[Exception, Defect].} = + ## Perform a query for a random target, return all nodes discovered which + ## contain enrField. + let nodes = await d.queryRandom() + var filtered: seq[Node] + for n in nodes: + if n.record.contains(enrField): + filtered.add(n) + + return filtered + proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = ## Resolve a `Node` based on provided `NodeId`. @@ -816,6 +830,8 @@ proc revalidateNode*(d: Protocol, n: Node) discard d.addNode(nodes[][0]) proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = + ## Loop which revalidates the nodes in the routing table by sending the ping + ## message. # TODO: General Exception raised. try: while true: @@ -826,19 +842,24 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} = + ## Loop that refreshes the routing table by starting a random query in case + ## no queries were done since `refreshInterval` or more. # TODO: General Exception raised. try: - # query target self (neighbour nodes) + # start with a query target self (neighbour nodes) let selfQuery = await d.query(d.localNode.id) trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomQuery = await d.queryRandom() - trace "Discovered nodes in random target query", nodes = randomQuery.len - debug "Total nodes in discv5 routing table", total = d.routingTable.len() - await sleepAsync(lookupInterval) + let currentTime = now(chronos.Moment) + if currentTime > (d.lastLookup + refreshInterval): + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len + debug "Total nodes in discv5 routing table", total = d.routingTable.len() + + await sleepAsync(refreshInterval) except CancelledError: - trace "queryLoop canceled" + trace "refreshLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -901,7 +922,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) {.raises: [Exception, Defect].} = - d.queryLoop = queryLoop(d) + d.refreshLoop = refreshLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -910,8 +931,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.queryLoop.isNil: - d.queryLoop.cancel() + if not d.refreshLoop.isNil: + d.refreshLoop.cancel() d.transp.close() @@ -921,7 +942,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.queryLoop.isNil: - await d.queryLoop.cancelAndWait() + if not d.refreshLoop.isNil: + await d.refreshLoop.cancelAndWait() await d.transp.closeWait() From 9cedbc0cc800eec7740998f63186ff7f0ee53ea8 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 7 Jan 2021 10:15:48 +0100 Subject: [PATCH 2/4] Move code into seedTable and populateTable proc --- eth/p2p/discoveryv5/dcli.nim | 13 ++++++++++- eth/p2p/discoveryv5/protocol.nim | 38 +++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/eth/p2p/discoveryv5/dcli.nim b/eth/p2p/discoveryv5/dcli.nim index bd6d083..879ea64 100644 --- a/eth/p2p/discoveryv5/dcli.nim +++ b/eth/p2p/discoveryv5/dcli.nim @@ -22,6 +22,11 @@ type desc: "UDP listening port." name: "udp-port" .}: uint16 + listenAddress* {. + defaultValue: defaultListenAddress(config) + desc: "Listening address for the Discovery v5 traffic" + name: "listen-address" }: ValidIpAddress + bootnodes* {. desc: "ENR URI of node to bootstrap discovery with. Argument may be repeated." name: "bootnode" .}: seq[enr.Record] @@ -42,7 +47,7 @@ type name: "metrics" .}: bool metricsAddress* {. - defaultValue: ValidIpAddress.init("127.0.0.1") + defaultValue: defaultAdminListenAddress(config) desc: "Listening address of the metrics server." name: "metrics-address" .}: ValidIpAddress @@ -78,6 +83,12 @@ type desc: "ENR URI of the node to send a talkreq message" name: "node" .}: Node +func defaultListenAddress*(conf: DiscoveryConf): ValidIpAddress = + (static ValidIpAddress.init("0.0.0.0")) + +func defaultAdminListenAddress*(conf: DiscoveryConf): ValidIpAddress = + (static ValidIpAddress.init("127.0.0.1")) + proc parseCmdArg*(T: type enr.Record, p: TaintedString): T = if not fromURI(result, p): raise newException(ConfigurationError, "Invalid ENR") diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 0295561..580b998 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -102,6 +102,7 @@ const ## refresh the routing table. revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this ## value in milliseconds + initialLookups = 1 ## Amount of lookups done when populating the routing table handshakeTimeout* = 2.seconds ## timeout for the reply on the ## whoareyou message responseTimeout* = 4.seconds ## timeout for the response of a request-response @@ -818,6 +819,32 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] return node +proc seedTable*(d: Protocol) = + ## Seed the table with known nodes. + for record in d.bootstrapRecords: + if d.addNode(record): + debug "Added bootstrap node", uri = toURI(record) + else: + debug "Bootstrap node could not be added", uri = toURI(record) + + # TODO: + # Persistent stored nodes could be added to seed from here + # See: https://github.com/status-im/nim-eth/issues/189 + +proc populateTable*(d: Protocol) {.async, raises: [Exception, Defect].} = + ## Do a set of initial lookups to quickly populate the table. + # start with a self target query (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len + + # `initialLookups` random queries + for i in 0.. (d.lastLookup + refreshInterval): @@ -915,11 +941,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = # object of Exception. In Nim devel this got changed to CatchableError. d.transp = newDatagramTransport(processClient, udata = d, local = ta) - for record in d.bootstrapRecords: - if d.addNode(record): - debug "Added bootstrap node", uri = toURI(record) - else: - debug "Bootstrap node could not be added", uri = toURI(record) + d.seedTable() proc start*(d: Protocol) {.raises: [Exception, Defect].} = d.refreshLoop = refreshLoop(d) From aa7442c9ffe05ab80c1f1abf2dc7dc2d641a59fc Mon Sep 17 00:00:00 2001 From: kdeme Date: Mon, 11 Jan 2021 15:01:02 +0100 Subject: [PATCH 3/4] dcli: actually use the provided listen address --- eth/p2p/discoveryv5/dcli.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/p2p/discoveryv5/dcli.nim b/eth/p2p/discoveryv5/dcli.nim index 879ea64..410a257 100644 --- a/eth/p2p/discoveryv5/dcli.nim +++ b/eth/p2p/discoveryv5/dcli.nim @@ -163,7 +163,7 @@ proc run(config: DiscoveryConf) = let (ip, tcpPort, udpPort) = setupNat(config) d = newProtocol(config.nodeKey, ip, tcpPort, udpPort, - bootstrapRecords = config.bootnodes) + bootstrapRecords = config.bootnodes, bindIp = config.listenAddress) d.open() From 181bbadcbc3a7189b312f69d043cb2c69367674b Mon Sep 17 00:00:00 2001 From: kdeme Date: Wed, 13 Jan 2021 21:44:17 +0100 Subject: [PATCH 4/4] Add more message related metrics --- eth/p2p/discoveryv5/protocol.nim | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 580b998..f8cc3c6 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -85,8 +85,12 @@ export options {.push raises: [Defect].} -declarePublicGauge discovery_message_requests, - "Discovery protocol message requests", labels = ["response"] +declarePublicGauge discovery_message_requests_outgoing, + "Discovery protocol outgoing message requests", labels = ["response"] +declarePublicGauge discovery_message_requests_incoming, + "Discovery protocol incoming message requests", labels = ["response"] +declarePublicGauge discovery_unsolicited_messages, + "Discovery protocol unsolicited or timed-out messages" logScope: topics = "discv5" @@ -311,12 +315,17 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) {.raises:[Exception].} = case message.kind of ping: + discovery_message_requests_incoming.inc() d.handlePing(srcId, fromAddr, message.ping, message.reqId) of findNode: + discovery_message_requests_incoming.inc() d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId) of talkreq: + discovery_message_requests_incoming.inc() d.handleTalkReq(srcId, fromAddr, message.talkreq, message.reqId) of regtopic, topicquery: + discovery_message_requests_incoming.inc() + discovery_message_requests_incoming.inc(labelValues = ["no_response"]) trace "Received unimplemented message kind", kind = message.kind, origin = fromAddr else: @@ -324,6 +333,7 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, if d.awaitedMessages.take((srcId, message.reqId), waiter): waiter.complete(some(message)) # TODO: raises: [Exception] else: + discovery_unsolicited_messages.inc() trace "Timed out or unrequested message", kind = message.kind, origin = fromAddr @@ -584,7 +594,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): d.registerRequest(toNode, message, nonce) trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) d.send(toNode, data) - discovery_message_requests.inc() + discovery_message_requests_outgoing.inc() return reqId proc ping*(d: Protocol, toNode: Node): @@ -601,7 +611,7 @@ proc ping*(d: Protocol, toNode: Node): return ok(resp.get().pong) else: d.replaceNode(toNode) - discovery_message_requests.inc(labelValues = ["timed_out"]) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Pong message not received in time") proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): @@ -619,7 +629,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): return ok(res) else: d.replaceNode(toNode) - discovery_message_requests.inc(labelValues = ["timed_out"]) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) return err(nodes.error) proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): @@ -636,7 +646,7 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): return ok(resp.get().talkresp) else: d.replaceNode(toNode) - discovery_message_requests.inc(labelValues = ["timed_out"]) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Talk response message not received in time") proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =