nim-eth/eth/p2p/discoveryv5/protocol.nim

1137 lines
44 KiB
Nim

# nim-eth - Node Discovery Protocol v5
# Copyright (c) 2020-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## Node Discovery Protocol v5
##
## Node discovery protocol implementation as per specification:
## https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md
##
## This node discovery protocol implementation uses the same underlying
## implementation of routing table as is also used for the discovery v4
## implementation, which is the same or similar as the one described in the
## original Kademlia paper:
## https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
##
## This might not be the most optimal implementation for the node discovery
## protocol v5. Why?
##
## The Kademlia paper describes an implementation that starts off from one
## k-bucket, and keeps splitting the bucket as more nodes are discovered and
## added. The bucket splits only on the part of the binary tree where our own
## node its id belongs too (same prefix). Resulting eventually in a k-bucket per
## logarithmic distance (log base2 distance). Well, not really, as nodes with
## ids in the closer distance ranges will never be found. And because of this an
## optimisation is done where buckets will also split sometimes even if the
## nodes own id does not have the same prefix (this is to avoid creating highly
## unbalanced branches which would require longer lookups).
##
## Now, some implementations take a more simplified approach. They just create
## directly a bucket for each possible logarithmic distance (e.g. here 1->256).
## Some implementations also don't create buckets with logarithmic distance
## lower than a certain value (e.g. only 1/15th of the highest buckets),
## because the closer to the node (the lower the distance), the less chance
## there is to still find nodes.
##
## The discovery protocol v4 its `FindNode` call will request the k closest
## nodes. As does original Kademlia. This effectively puts the work at the node
## that gets the request. This node will have to check its buckets and gather
## the closest. Some implementations go over all the nodes in all the buckets
## for this (e.g. go-ethereum discovery v4). However, in our bucket splitting
## approach, this search is improved.
##
## In the discovery protocol v5 the `FindNode` call is changed and now the
## logarithmic distance is passed as parameter instead of the NodeId. And only
## nodes that match that logarithmic distance are allowed to be returned.
## This change was made to not put the trust at the requested node for selecting
## the closest nodes. To counter a possible (mistaken) difference in
## implementation, but more importantly for security reasons. See also:
## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-rationale.md#115-guard-against-kademlia-implementation-flaws
##
## The result is that in an implementation which just stores buckets per
## logarithmic distance, it simply needs to return the right bucket. In this
## split-bucket implementation, this cannot be done as such and thus the closest
## neighbours search is still done. And to do this, a reverse calculation of an
## id at given logarithmic distance is needed (which is why there is the
## `idAtDistance` proc). Next, nodes with invalid distances need to be filtered
## out to be compliant to the specification. This can most likely get further
## optimised, but if it would turn out to be an issue, it is probably easier to
## switch away from the split-bucket approach. The main benefit that the split
## bucket approach has is improved lookups (less hops due to no unbalanced
## branches), but lookup functionality of Kademlia is not something that is
## typically used in discv5. It is mostly used as a secure mechanism to find &
## select peers.
## This `FindNode` change in discovery v5 could also have an effect on the
## efficiency of the network. Work will be moved from the receiver of
## `FindNodes` to the requester. But this could also mean more network traffic,
## as less nodes may potentially be passed around per `FindNode` call, and thus
## more requests may be needed for a lookup (adding bandwidth and latency).
## For this reason Discovery v5.1 has added the possibility to send a `FindNode`
## request with multiple distances specified. This implementation will
## underneath still use the neighbours search, specifically for the first
## distance provided. This means that if distances with wide gaps are provided,
## it could be that only nodes matching the first distance are returned.
## When distance 0 is provided in the requested list of distances, only the own
## ENR will be returned.
{.push raises: [].}
import
std/[tables, sets, math, sequtils, algorithm],
json_serialization/std/net,
results, chronicles, chronos, stint, metrics,
".."/../[rlp, keys],
"."/[messages_encoding, encoding, node, routing_table, enr, random2, sessions,
ip_vote, nodes_verification]
export
results, node, enr, encoding.maxDiscv5PacketSize
declareCounter discovery_message_requests_outgoing,
"Discovery protocol outgoing message requests", labels = ["response"]
declareCounter discovery_message_requests_incoming,
"Discovery protocol incoming message requests", labels = ["response"]
declareCounter discovery_unsolicited_messages,
"Discovery protocol unsolicited or timed-out messages"
declareCounter discovery_enr_auto_update,
"Amount of discovery IP:port address ENR auto updates"
logScope:
topics = "eth p2p discv5"
const
alpha = 3 ## Kademlia concurrency factor
lookupRequestLimit = 3 ## Amount of distances requested in a single Findnode
## message for a lookup or query
findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages
## that will be processed
maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message
refreshInterval = 5.minutes ## Interval of launching a random query to
## refresh the routing table.
revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this
## value in milliseconds
ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port
## majority and updating this when ENR auto update is set.
initialLookups = 1 ## Amount of lookups done when populating the routing table
defaultHandshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
defaultResponseTimeout* = 4.seconds ## timeout for the response of a request-response
## call
type
OptAddress* = object
ip*: Opt[IpAddress]
port*: Port
DiscoveryConfig* = object
tableIpLimits*: TableIpLimits
bitsPerHop*: int
handshakeTimeout: Duration
responseTimeout: Duration
Protocol* = ref object
transp: DatagramTransport
localNode*: Node
privateKey: PrivateKey
bindAddress: OptAddress ## UDP binding address
pendingRequests: Table[AESGCMNonce, PendingRequest]
routingTable*: RoutingTable
codec*: Codec
awaitedMessages: Table[(NodeId, RequestId), Future[Opt[Message]]]
refreshLoop: Future[void]
revalidateLoop: Future[void]
ipMajorityLoop: Future[void]
lastLookup: chronos.Moment
bootstrapRecords*: seq[Record]
ipVote: IpVote
enrAutoUpdate: bool
talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of
# overkill here, use sequence
handshakeTimeout: Duration
responseTimeout: Duration
rng*: ref HmacDrbgContext
PendingRequest = object
node: Node
message: seq[byte]
TalkProtocolHandler* = proc(
p: TalkProtocol, request: seq[byte],
fromId: NodeId, fromUdpAddress: Address,
node: Opt[Node]): seq[byte]
{.gcsafe, raises: [].}
TalkProtocol* = ref object of RootObj
protocolHandler*: TalkProtocolHandler
DiscResult*[T] = Result[T, cstring]
const
defaultDiscoveryConfig* = DiscoveryConfig(
tableIpLimits: DefaultTableIpLimits,
bitsPerHop: DefaultBitsPerHop,
handshakeTimeout: defaultHandshakeTimeout,
responseTimeout: defaultResponseTimeout
)
chronicles.formatIt(Opt[Port]): $it
chronicles.formatIt(Opt[IpAddress]): $it
proc addNode*(d: Protocol, node: Node): bool =
## Add `Node` to discovery routing table.
##
## Returns true only when `Node` was added as a new entry to a bucket in the
## routing table.
if d.routingTable.addNode(node) == Added:
return true
else:
return false
proc addNode*(d: Protocol, r: Record): bool =
## Add `Node` from a `Record` to discovery routing table.
##
## Returns false only if no valid `Node` can be created from the `Record` or
## on the conditions of `addNode` from a `Node`.
d.addNode(Node.fromRecord(r))
proc addNode*(d: Protocol, uri: string): bool =
## Add `Node` from a ENR URI to discovery routing table.
##
## Returns false if no valid ENR URI, or on the conditions of `addNode` from
## an `Record`.
let r = enr.Record.fromURI(uri).valueOr:
return false
d.addNode(r)
func getNode*(d: Protocol, id: NodeId): Opt[Node] =
## Get the node with id from the routing table.
d.routingTable.getNode(id)
proc randomNodes*(d: Protocol, maxAmount: int): seq[Node] =
## Get a `maxAmount` of random nodes from the local routing table.
d.routingTable.randomNodes(maxAmount)
proc randomNodes*(d: Protocol, maxAmount: int,
pred: proc(x: Node): bool {.raises: [], gcsafe, noSideEffect.}): seq[Node] =
## Get a `maxAmount` of random nodes from the local routing table with the
## `pred` predicate function applied as filter on the nodes selected.
d.routingTable.randomNodes(maxAmount, pred)
proc randomNodes*(d: Protocol, maxAmount: int,
enrField: (string, seq[byte])): seq[Node] =
## Get a `maxAmount` of random nodes from the local routing table. The
## the nodes selected are filtered by provided `enrField`.
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
func neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
seenOnly = false): seq[Node] =
## Return up to k neighbours (closest node ids) of the given node id.
d.routingTable.neighbours(id, k, seenOnly)
func neighboursAtDistances*(d: Protocol, distances: seq[uint16],
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
## Return up to k neighbours (closest node ids) at given distances.
d.routingTable.neighboursAtDistances(distances, k, seenOnly)
func nodesDiscovered*(d: Protocol): int = d.routingTable.len
func privKey*(d: Protocol): lent PrivateKey =
d.privateKey
func getRecord*(d: Protocol): Record =
## Get the ENR of the local node.
d.localNode.record
func updateRecord*(
d: Protocol, enrFields: openArray[(string, seq[byte])]): DiscResult[void] =
## Update the ENR of the local node with provided `enrFields` k:v pairs.
let fields = mapIt(enrFields, toFieldPair(it[0], it[1]))
d.localNode.record.update(d.privateKey, extraFields = fields)
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
# we stored a handshake with in order to get that ENR updated?
proc sendTo(d: Protocol, a: Address, data: seq[byte]): Future[void] {.async: (raises: []).} =
let ta = initTAddress(a.ip, a.port)
try:
await d.transp.sendTo(ta, data)
except CatchableError as e:
# Could be `TransportUseClosedError` in case the transport is already
# closed, or could be `TransportOsError` in case of a socket error.
# In the latter case this would probably mostly occur if the network
# interface underneath gets disconnected or similar.
# It could also be an "Operation not permitted" error, which would
# indicate a firewall restriction kicking in.
# TODO: Should this kind of error be propagated upwards? Probably, but
# it should not stop the process as that would reset the discovery
# progress in case there is even a small window of no connection.
# One case that needs this error available upwards is when revalidating
# nodes. Else the revalidation might end up clearing the routing tabl
# because of ping failures due to own network connection failure.
warn "Discovery send failed", msg = e.msg, address = $ta
proc send*(d: Protocol, a: Address, data: seq[byte]) =
asyncSpawn sendTo(d, a, data)
proc send(d: Protocol, n: Node, data: seq[byte]) =
doAssert(n.address.isSome())
d.send(n.address.get(), data)
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId,
nodes: openArray[Node]) =
proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address,
message: NodesMessage, reqId: RequestId) {.nimcall.} =
let (data, _) = encodeMessagePacket(d.rng[], d.codec, toId, toAddr,
encodeMessage(message, reqId))
trace "Respond message packet", dstId = toId, address = toAddr,
kind = MessageKind.nodes
d.send(toAddr, data)
if nodes.len == 0:
# In case of 0 nodes, a reply is still needed
d.sendNodes(toId, toAddr, NodesMessage(total: 1, enrs: @[]), reqId)
return
var message: NodesMessage
# TODO: Do the total calculation based on the max UDP packet size we want to
# send and the ENR size of all (max 16) nodes.
# Which UDP packet size to take? 1280? 576?
message.total = ceil(nodes.len / maxNodesPerMessage).uint32
for i in 0 ..< nodes.len:
message.enrs.add(nodes[i].record)
if message.enrs.len == maxNodesPerMessage:
d.sendNodes(toId, toAddr, message, reqId)
message.enrs.setLen(0)
if message.enrs.len != 0:
d.sendNodes(toId, toAddr, message, reqId)
proc handlePing(d: Protocol, fromId: NodeId, fromAddr: Address,
ping: PingMessage, reqId: RequestId) =
let pong = PongMessage(enrSeq: d.localNode.record.seqNum, ip: fromAddr.ip,
port: fromAddr.port.uint16)
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
encodeMessage(pong, reqId))
trace "Respond message packet", dstId = fromId, address = fromAddr,
kind = MessageKind.pong
d.send(fromAddr, data)
proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
fn: FindNodeMessage, reqId: RequestId) =
if fn.distances.len == 0:
d.sendNodes(fromId, fromAddr, reqId, [])
elif fn.distances.contains(0):
# A request for our own record.
# It would be a weird request if there are more distances next to 0
# requested, so in this case lets just pass only our own. TODO: OK?
d.sendNodes(fromId, fromAddr, reqId, [d.localNode])
else:
# TODO: Still deduplicate also?
if fn.distances.all(proc (x: uint16): bool = return x <= 256):
d.sendNodes(fromId, fromAddr, reqId,
d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true))
else:
# At least one invalid distance, but the polite node we are, still respond
# with empty nodes.
d.sendNodes(fromId, fromAddr, reqId, [])
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
talkreq: TalkReqMessage, reqId: RequestId, node: Opt[Node]) =
let talkProtocol = d.talkProtocols.getOrDefault(talkreq.protocol)
let talkresp =
if talkProtocol.isNil() or talkProtocol.protocolHandler.isNil():
# Protocol identifier that is not registered and thus not supported. An
# empty response is send as per specification.
TalkRespMessage(response: @[])
else:
TalkRespMessage(response: talkProtocol.protocolHandler(talkProtocol,
talkreq.request, fromId, fromAddr, node))
let (data, _) = encodeMessagePacket(d.rng[], d.codec, fromId, fromAddr,
encodeMessage(talkresp, reqId))
trace "Respond message packet", dstId = fromId, address = fromAddr,
kind = MessageKind.talkresp
d.send(fromAddr, data)
proc handleMessage(
d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message, node: Opt[Node] = Opt.none(Node)) =
case message.kind
of ping:
discovery_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findNode:
discovery_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of talkReq:
discovery_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId, node)
of regTopic, topicQuery:
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr
else:
var waiter: Future[Opt[Message]]
if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(Opt.some(message))
else:
discovery_unsolicited_messages.inc()
trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr
func registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
protocol: TalkProtocol): DiscResult[void] =
# Currently allow only for one handler per talk protocol.
if d.talkProtocols.hasKeyOrPut(protocolId, protocol):
err("Protocol identifier already registered")
else:
ok()
proc sendWhoareyou(d: Protocol, toId: NodeId, a: Address,
requestNonce: AESGCMNonce, node: Opt[Node]) =
let key = HandshakeKey(nodeId: toId, address: a)
if not d.codec.hasHandshake(key):
let
recordSeq =
if node.isSome():
node.get().record.seqNum
else:
0
pubkey = node.map(proc(node: Node): PublicKey = node.pubkey)
let data = encodeWhoareyouPacket(d.rng[], d.codec, toId, a, requestNonce,
recordSeq, pubkey)
sleepAsync(d.handshakeTimeout).addCallback() do(data: pointer):
# TODO: should we still provide cancellation in case handshake completes
# correctly?
d.codec.handshakes.del(key)
trace "Send whoareyou", dstId = toId, address = a
d.send(a, data)
else:
debug "Node with this id already has ongoing handshake, ignoring packet"
proc receive*(d: Protocol, a: Address, packet: openArray[byte]) =
let decoded = d.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
case packet.flag
of OrdinaryMessage:
if packet.messageOpt.isSome():
let message = packet.messageOpt.get()
trace "Received message packet", srcId = packet.srcId, address = a,
kind = message.kind
d.handleMessage(packet.srcId, a, message)
else:
trace "Not decryptable message packet received",
srcId = packet.srcId, address = a
d.sendWhoareyou(packet.srcId, a, packet.requestNonce,
d.getNode(packet.srcId))
of Flag.Whoareyou:
trace "Received whoareyou packet", address = a
var pr: PendingRequest
if d.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
let data = encodeHandshakePacket(d.rng[], d.codec, toNode.id,
address, pr.message, packet.whoareyou, toNode.pubkey)
trace "Send handshake message packet", dstId = toNode.id, address
d.send(toNode, data)
else:
debug "Timed out or unrequested whoareyou packet", address = a
of HandshakeMessage:
trace "Received handshake message packet", srcId = packet.srcIdHs,
address = a, kind = packet.message.kind
d.handleMessage(packet.srcIdHs, a, packet.message, packet.node)
# For a handshake message it is possible that we received an newer ENR.
# In that case we can add/update it to the routing table.
if packet.node.isSome():
let node = packet.node.get()
# Lets not add nodes without correct IP in the ENR to the routing table.
# The ENR could contain bogus IPs and although they would get removed
# on the next revalidation, one could spam these as the handshake
# message occurs on (first) incoming messages.
if node.address.isSome() and a == node.address.get():
if d.addNode(node):
trace "Added new node to routing table after handshake", node
else:
trace "Packet decoding error", error = decoded.error, address = a
proc processClient(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async: (raises: []).} =
let proto = getUserData[Protocol](transp)
# TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage()
except TransportError as e:
# This is likely to be local network connection issues.
warn "Transport getMessage", exception = e.name, msg = e.msg
return
proto.receive(Address(ip: raddr.toIpAddress(), port: raddr.port), buf)
proc replaceNode(d: Protocol, n: Node) =
if n.record notin d.bootstrapRecords:
d.routingTable.replaceNode(n)
else:
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the routing table.
debug "Message request to bootstrap node failed", enr = toURI(n.record)
# TODO: This could be improved to do the clean-up immediately in case a non
# whoareyou response does arrive, but we would need to store the AuthTag
# somewhere
proc registerRequest(d: Protocol, n: Node, message: seq[byte],
nonce: AESGCMNonce) =
let request = PendingRequest(node: n, message: message)
if not d.pendingRequests.hasKeyOrPut(nonce, request):
sleepAsync(d.responseTimeout).addCallback() do(data: pointer):
d.pendingRequests.del(nonce)
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
Future[Opt[Message]] {.async: (raw: true, raises: [CancelledError]).} =
let retFuture = Future[Opt[Message]].Raising([CancelledError]).init("discv5.waitMessage")
let key = (fromNode.id, reqId)
sleepAsync(d.responseTimeout).addCallback() do(data: pointer):
d.awaitedMessages.del(key)
if not retFuture.finished:
retFuture.complete(Opt.none(Message))
d.awaitedMessages[key] = retFuture
retFuture
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
Future[DiscResult[seq[Record]]] {.async: (raises: [CancelledError]).} =
## Wait for one or more nodes replies.
##
## The first reply will hold the total number of replies expected, and based
## on that, more replies will be awaited.
## If one reply is lost here (timed out), others are ignored too.
## Same counts for out of order receival.
var op = await d.waitMessage(fromNode, reqId)
if op.isSome:
if op.get.kind == nodes:
var res = op.get.nodes.enrs
let total = op.get.nodes.total
for i in 1 ..< total:
op = await d.waitMessage(fromNode, reqId)
if op.isSome and op.get.kind == nodes:
res.add(op.get.nodes.enrs)
else:
# No error on this as we received some nodes.
break
return ok(res)
else:
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to find node message")
else:
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Nodes message not received in time")
proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
RequestId =
doAssert(toNode.address.isSome())
let
address = toNode.address.get()
reqId = RequestId.init(d.rng[])
message = encodeMessage(m, reqId)
let (data, nonce) = encodeMessagePacket(d.rng[], d.codec, toNode.id,
address, message)
d.registerRequest(toNode, message, nonce)
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
d.send(toNode, data)
discovery_message_requests_outgoing.inc()
return reqId
proc ping*(d: Protocol, toNode: Node):
Future[DiscResult[PongMessage]] {.async: (raises: [CancelledError]).} =
## Send a discovery ping message.
##
## Returns the received pong message or an error.
let reqId = d.sendMessage(toNode,
PingMessage(enrSeq: d.localNode.record.seqNum))
let resp = await d.waitMessage(toNode, reqId)
if resp.isSome():
if resp.get().kind == pong:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to ping message")
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
Future[DiscResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
## Send a discovery findNode message.
##
## Returns the received nodes or an error.
## Received ENRs are already validated and converted to `Node`.
let reqId = d.sendMessage(toNode, FindNodeMessage(distances: distances))
let nodes = await d.waitNodes(toNode, reqId)
if nodes.isOk:
let res = verifyNodesRecords(nodes.get(), toNode, findNodeResultLimit, distances)
d.routingTable.setJustSeen(toNode)
return ok(res)
else:
d.replaceNode(toNode)
return err(nodes.error)
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
Future[DiscResult[seq[byte]]] {.async: (raises: [CancelledError]).} =
## Send a discovery talkreq message.
##
## Returns the received talkresp message or an error.
let reqId = d.sendMessage(toNode,
TalkReqMessage(protocol: protocol, request: request))
let resp = await d.waitMessage(toNode, reqId)
if resp.isSome():
if resp.get().kind == talkResp:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().talkResp.response)
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to talk request message")
else:
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time")
func lookupDistances*(target, dest: NodeId): seq[uint16] =
let td = logDistance(target, dest)
let tdAsInt = int(td)
result.add(td)
var i = 1
while result.len < lookupRequestLimit:
if tdAsInt + i <= 256:
result.add(td + uint16(i))
if tdAsInt - i > 0:
result.add(td - uint16(i))
inc i
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async: (raises: [CancelledError]).} =
let dists = lookupDistances(target, destNode.id)
# Instead of doing max `lookupRequestLimit` findNode requests, make use
# of the discv5.1 functionality to request nodes for multiple distances.
let r = await d.findNode(destNode, dists)
if r.isOk:
result.add(r[])
# Attempt to add all nodes discovered
for n in result:
discard d.addNode(n)
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
seenOnly = false)
var asked, seen = HashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
seen.incl(d.localNode.id) # No need to discover our own node
for node in closestNodes:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
while true:
var i = 0
# Doing `alpha` amount of requests at once as long as closer non queried
# nodes are discovered.
while i < closestNodes.len and pendingQueries.len < alpha:
let n = closestNodes[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
if pendingQueries.len == 0:
break
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got discv5 lookup query response"
let index = pendingQueries.find(query)
if index != -1:
pendingQueries.del(index)
else:
error "Resulting query should have been in the pending queries"
# future.read is possible here but await is recommended (avoids also FuturePendingError)
let nodes = await query
for n in nodes:
if not seen.containsOrIncl(n.id):
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(n, closestNodes.lowerBound(n,
proc(x: Node, n: Node): int =
cmp(distance(x.id, target), distance(n.id, target))
))
if closestNodes.len > BUCKET_SIZE:
closestNodes.del(closestNodes.high())
d.lastLookup = now(chronos.Moment)
return closestNodes
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async: (raises: [CancelledError]).} =
## Query k nodes for the given target, returns all nodes found, including the
## nodes queried.
##
## This will take k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are less than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
var asked, seen = HashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
seen.incl(d.localNode.id) # No need to discover our own node
for node in queryBuffer:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
while true:
var i = 0
while i < min(queryBuffer.len, k) and pendingQueries.len < alpha:
let n = queryBuffer[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
if pendingQueries.len == 0:
break
let query =
try:
await one(pendingQueries)
except ValueError:
raiseAssert("pendingQueries should not have been empty")
trace "Got discv5 lookup query response"
let index = pendingQueries.find(query)
if index != -1:
pendingQueries.del(index)
else:
error "Resulting query should have been in the pending queries"
# future.read is possible here but await is recommended (avoids also FuturePendingError)
let nodes = await query
for n in nodes:
if not seen.containsOrIncl(n.id):
queryBuffer.add(n)
d.lastLookup = now(chronos.Moment)
return queryBuffer
proc queryRandom*(d: Protocol): Future[seq[Node]] {.async: (raw: true, raises: [CancelledError]).} =
## Perform a query for a random target, return all nodes discovered.
d.query(NodeId.random(d.rng[]))
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
Future[seq[Node]] {.async: (raises: [CancelledError]).} =
## Perform a query for a random target, return all nodes discovered which
## contain enrField.
let nodes = await d.queryRandom()
var filtered: seq[Node]
for n in nodes:
if n.record.contains(enrField):
filtered.add(n)
return filtered
proc resolve*(d: Protocol, id: NodeId): Future[Opt[Node]] {.async: (raises: [CancelledError]).} =
## Resolve a `Node` based on provided `NodeId`.
##
## This will first look in the own routing table. If the node is known, it
## will try to contact if for newer information. If node is not known or it
## does not reply, a lookup is done to see if it can find a (newer) record of
## the node on the network.
if id == d.localNode.id:
return Opt.some(d.localNode)
let node = d.getNode(id)
if node.isSome():
let request = await d.findNode(node.get(), @[0'u16])
# TODO: Handle failures better. E.g. stop on different failures than timeout
if request.isOk() and request[].len > 0:
return Opt.some(request[][0])
let discovered = await d.lookup(id)
for n in discovered:
if n.id == id:
if node.isSome() and node.get().record.seqNum >= n.record.seqNum:
return node
else:
return Opt.some(n)
return node
proc seedTable*(d: Protocol) =
## Seed the table with known nodes.
for record in d.bootstrapRecords:
if d.addNode(record):
debug "Added bootstrap node", uri = toURI(record)
else:
debug "Bootstrap node could not be added", uri = toURI(record)
# TODO:
# Persistent stored nodes could be added to seed from here
# See: https://github.com/status-im/nim-eth/issues/189
proc populateTable*(d: Protocol) {.async: (raises: [CancelledError]).} =
## Do a set of initial lookups to quickly populate the table.
# start with a self target query (neighbour nodes)
let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self target query", nodes = selfQuery.len
# `initialLookups` random queries
for i in 0..<initialLookups:
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in routing table after populate",
total = d.routingTable.len()
proc revalidateNode*(d: Protocol, n: Node) {.async: (raises: [CancelledError]).} =
let pong = await d.ping(n)
if pong.isOk():
let res = pong.get()
if res.enrSeq > n.record.seqNum:
# Request new ENR
let nodes = await d.findNode(n, @[0'u16])
if nodes.isOk() and nodes[].len > 0:
discard d.addNode(nodes[][0])
# Get IP and port from pong message and add it to the ip votes
let a = Address(ip: res.ip, port: Port(res.port))
d.ipVote.insert(n.id, a)
proc revalidateLoop(d: Protocol) {.async: (raises: []).} =
## Loop which revalidates the nodes in the routing table by sending the ping
## message.
try:
while true:
await sleepAsync(milliseconds(d.rng[].rand(revalidateMax)))
let n = d.routingTable.nodeToRevalidate()
if not n.isNil:
asyncSpawn d.revalidateNode(n)
except CancelledError:
trace "revalidateLoop canceled"
proc refreshLoop(d: Protocol) {.async: (raises: []).} =
## Loop that refreshes the routing table by starting a random query in case
## no queries were done since `refreshInterval` or more.
## It also refreshes the majority address voted for via pong responses.
try:
await d.populateTable()
while true:
let currentTime = now(chronos.Moment)
if currentTime > (d.lastLookup + refreshInterval):
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(refreshInterval)
except CancelledError:
trace "refreshLoop canceled"
proc updateExternalIp*(d: Protocol, extIp: IpAddress, udpPort: Port): bool =
var success = false
let
previous = d.localNode.address
res = d.localNode.update(d.privateKey,
ip = Opt.some(extIp), udpPort = Opt.some(udpPort))
if res.isErr:
warn "Failed updating ENR with newly discovered external address",
previous, newExtIp = extIp, newUdpPort = udpPort, error = res.error
else:
success = true
info "Updated ENR with newly discovered external address",
previous, newExtIp = extIp, newUdpPort = udpPort, uri = toURI(d.localNode.record)
return success
proc ipMajorityLoop(d: Protocol) {.async: (raises: []).} =
## When `enrAutoUpdate` is enabled, the IP:port combination returned
## by the majority will be used to update the local ENR.
## This should be safe as long as the routing table is not overwhelmed by
## malicious nodes trying to provide invalid addresses.
## Why is that?
## - Only one vote per NodeId is counted, and they are removed over time.
## - IP:port values are provided through the pong message. The local node
## initiates this by first sending a ping message. Unsolicited pong messages
## are ignored.
## - At interval pings are send to the least recently contacted node (tail of
## bucket) from a random bucket from the routing table.
## - Only messages that our node initiates (ping, findnode, talkreq) and that
## successfully get a response move a node to the head of the bucket.
## Additionally, findNode requests have typically a randomness to it, as they
## usually come from a query for random NodeId.
## - Currently, when a peer fails the respond, it gets replaced. It doesn't
## remain at the tail of the bucket.
## - There are IP limits on the buckets and the whole routing table.
try:
while true:
let majority = d.ipVote.majority()
if majority.isSome():
if d.localNode.address != majority:
let address = majority.get()
let previous = d.localNode.address
if d.enrAutoUpdate:
let success = d.updateExternalIp(address.ip, address.port)
if success:
discovery_enr_auto_update.inc()
else:
warn "Discovered new external address but ENR auto update is off",
majority, previous
else:
debug "Discovered external address matches current address", majority,
current = d.localNode.address
await sleepAsync(ipMajorityInterval)
except CancelledError:
trace "ipMajorityLoop canceled"
func init*(
T: type DiscoveryConfig,
tableIpLimit: uint,
bucketIpLimit: uint,
bitsPerHop: int,
handshakeTimeout: Duration,
responseTimeout: Duration
): T =
DiscoveryConfig(
tableIpLimits: TableIpLimits(
tableIpLimit: tableIpLimit,
bucketIpLimit: bucketIpLimit),
bitsPerHop: bitsPerHop,
handshakeTimeout: handshakeTimeout,
responseTimeout: responseTimeout
)
func init*(
T: type DiscoveryConfig,
tableIpLimit: uint,
bucketIpLimit: uint,
bitsPerHop: int): T =
DiscoveryConfig.init(
tableIpLimit,
bucketIpLimit,
bitsPerHop,
defaultHandshakeTimeout,
defaultResponseTimeout
)
proc newProtocol*(
privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort, enrUdpPort: Opt[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[Record] = [],
previousRecord = Opt.none(enr.Record),
bindPort: Port,
bindIp = IPv4_any(),
enrAutoUpdate = false,
config = defaultDiscoveryConfig,
rng = newRng()):
Protocol =
# TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
# Anyhow, nim-beacon-chain would also require some changes to support port
# remapping through NAT and this API is also subject to change once we
# introduce support for ipv4 + ipv6 binding/listening.
let customEnrFields = mapIt(localEnrFields, toFieldPair(it[0], it[1]))
# TODO:
# - Defect as is now or return a result for enr errors?
# - In case incorrect key, allow for new enr based on new key (new node id)?
var record: Record
if previousRecord.isSome():
record = previousRecord.get()
# TODO: this is faulty in case the intent is to remove a field with
# opt.none
record.update(privKey, enrIp, enrTcpPort, enrUdpPort,
customEnrFields).expect("Record within size limits and correct key")
else:
record = enr.Record.init(1, privKey, enrIp, enrTcpPort, enrUdpPort,
customEnrFields).expect("Record within size limits")
info "Discovery ENR initialized", enrAutoUpdate, seqNum = record.seqNum,
ip = enrIp, tcpPort = enrTcpPort, udpPort = enrUdpPort,
customEnrFields, uri = toURI(record)
if enrIp.isNone():
if enrAutoUpdate:
notice "No external IP provided for the ENR, this node will not be " &
"discoverable until the ENR is updated with the discovered external IP address"
else:
warn "No external IP provided for the ENR, this node will not be discoverable"
let node = Node.fromRecord(record)
# TODO Consider whether this should be a Defect
doAssert rng != nil, "RNG initialization failed"
Protocol(
privateKey: privKey,
localNode: node,
bindAddress: OptAddress(ip: Opt.some(bindIp), port: bindPort),
codec: Codec(localNode: node, privKey: privKey,
sessions: Sessions.init(256)),
bootstrapRecords: @bootstrapRecords,
ipVote: IpVote.init(),
enrAutoUpdate: enrAutoUpdate,
routingTable: RoutingTable.init(
node, config.bitsPerHop, config.tableIpLimits, rng),
handshakeTimeout: config.handshakeTimeout,
responseTimeout: config.responseTimeout,
rng: rng)
proc newProtocol*(
privKey: PrivateKey,
enrIp: Opt[IpAddress],
enrTcpPort, enrUdpPort: Opt[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[Record] = [],
previousRecord = Opt.none(enr.Record),
bindPort: Port,
bindIp: Opt[IpAddress],
enrAutoUpdate = false,
config = defaultDiscoveryConfig,
rng = newRng()): Protocol =
let
customEnrFields = mapIt(localEnrFields, toFieldPair(it[0], it[1]))
record =
if previousRecord.isSome():
var res = previousRecord.get()
# TODO: this is faulty in case the intent is to remove a field with
# opt.none
res.update(privKey, enrIp, enrTcpPort, enrUdpPort,
customEnrFields).expect("Record within size limits and correct key")
res
else:
enr.Record.init(1, privKey, enrIp, enrTcpPort, enrUdpPort,
customEnrFields).expect("Record within size limits")
info "Discovery ENR initialized", enrAutoUpdate, seqNum = record.seqNum,
ip = enrIp, tcpPort = enrTcpPort, udpPort = enrUdpPort,
customEnrFields, uri = toURI(record)
if enrIp.isNone():
if enrAutoUpdate:
notice "No external IP provided for the ENR, this node will not be " &
"discoverable until the ENR is updated with the discovered " &
"external IP address"
else:
warn "No external IP provided for the ENR, this node will not be " &
"discoverable"
let node = Node.fromRecord(record)
doAssert not(isNil(rng)), "RNG initialization failed"
Protocol(
privateKey: privKey,
localNode: node,
bindAddress: OptAddress(ip: bindIp, port: bindPort),
codec: Codec(localNode: node, privKey: privKey,
sessions: Sessions.init(256)),
bootstrapRecords: @bootstrapRecords,
ipVote: IpVote.init(),
enrAutoUpdate: enrAutoUpdate,
routingTable: RoutingTable.init(
node, config.bitsPerHop, config.tableIpLimits, rng),
handshakeTimeout: config.handshakeTimeout,
responseTimeout: config.responseTimeout,
rng: rng)
proc `$`*(a: OptAddress): string =
if a.ip.isNone():
"*:" & $a.port
else:
$a.ip.get() & ":" & $a.port
chronicles.formatIt(OptAddress): $it
template listeningAddress*(p: Protocol): Address =
if p.bindAddress.ip.isNone():
let ta = getAutoAddress(p.bindAddress.port)
Address(ta.toIpAddress(), ta.port)
else:
Address(p.bindAddress.ip.get(), p.bindAddress.port)
proc open*(d: Protocol) {.raises: [TransportOsError].} =
info "Starting discovery node", node = d.localNode,
bindAddress = d.bindAddress
# TODO allow binding to specific IP / IPv6 / etc
d.transp =
newDatagramTransport(processClient, udata = d,
localPort = d.bindAddress.port,
local = d.bindAddress.ip)
d.seedTable()
proc start*(d: Protocol) =
d.refreshLoop = refreshLoop(d)
d.revalidateLoop = revalidateLoop(d)
d.ipMajorityLoop = ipMajorityLoop(d)
proc closeWait*(d: Protocol) {.async: (raises: []).} =
doAssert(not d.transp.closed)
debug "Closing discovery node", node = d.localNode
var futures: seq[Future[void]]
if not d.revalidateLoop.isNil:
futures.add(d.revalidateLoop.cancelAndWait())
if not d.refreshLoop.isNil:
futures.add(d.refreshLoop.cancelAndWait())
if not d.ipMajorityLoop.isNil:
futures.add(d.ipMajorityLoop.cancelAndWait())
await noCancel(allFutures(futures))
await noCancel(d.transp.closeWait())
proc close*(d: Protocol) {.deprecated: "Please use closeWait() instead".} =
asyncSpawn d.closeWait()