mirror of https://github.com/status-im/nim-eth.git
Add async raises annotations for discv5 (#690)
This commit is contained in:
parent
c9b545b6c4
commit
50e71b2daa
|
@ -257,7 +257,7 @@ func updateRecord*(
|
||||||
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
# TODO: Would it make sense to actively ping ("broadcast") to all the peers
|
||||||
# we stored a handshake with in order to get that ENR updated?
|
# we stored a handshake with in order to get that ENR updated?
|
||||||
|
|
||||||
proc sendTo(d: Protocol, a: Address, data: seq[byte]): Future[void] {.async.} =
|
proc sendTo(d: Protocol, a: Address, data: seq[byte]): Future[void] {.async: (raises: []).} =
|
||||||
let ta = initTAddress(a.ip, a.port)
|
let ta = initTAddress(a.ip, a.port)
|
||||||
try:
|
try:
|
||||||
await d.transp.sendTo(ta, data)
|
await d.transp.sendTo(ta, data)
|
||||||
|
@ -504,7 +504,7 @@ proc registerRequest(d: Protocol, n: Node, message: seq[byte],
|
||||||
d.pendingRequests.del(nonce)
|
d.pendingRequests.del(nonce)
|
||||||
|
|
||||||
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
|
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
|
||||||
Future[Option[Message]] =
|
Future[Option[Message]] {.async: (raw: true, raises: []).} =
|
||||||
result = newFuture[Option[Message]]("waitMessage")
|
result = newFuture[Option[Message]]("waitMessage")
|
||||||
let res = result
|
let res = result
|
||||||
let key = (fromNode.id, reqId)
|
let key = (fromNode.id, reqId)
|
||||||
|
@ -515,7 +515,7 @@ proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
|
||||||
d.awaitedMessages[key] = result
|
d.awaitedMessages[key] = result
|
||||||
|
|
||||||
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
|
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
|
||||||
Future[DiscResult[seq[Record]]] {.async.} =
|
Future[DiscResult[seq[Record]]] {.async: (raises: [CancelledError]).} =
|
||||||
## Wait for one or more nodes replies.
|
## Wait for one or more nodes replies.
|
||||||
##
|
##
|
||||||
## The first reply will hold the total number of replies expected, and based
|
## The first reply will hold the total number of replies expected, and based
|
||||||
|
@ -560,7 +560,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
|
||||||
return reqId
|
return reqId
|
||||||
|
|
||||||
proc ping*(d: Protocol, toNode: Node):
|
proc ping*(d: Protocol, toNode: Node):
|
||||||
Future[DiscResult[PongMessage]] {.async.} =
|
Future[DiscResult[PongMessage]] {.async: (raises: [CancelledError]).} =
|
||||||
## Send a discovery ping message.
|
## Send a discovery ping message.
|
||||||
##
|
##
|
||||||
## Returns the received pong message or an error.
|
## Returns the received pong message or an error.
|
||||||
|
@ -582,7 +582,7 @@ proc ping*(d: Protocol, toNode: Node):
|
||||||
return err("Pong message not received in time")
|
return err("Pong message not received in time")
|
||||||
|
|
||||||
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
||||||
Future[DiscResult[seq[Node]]] {.async.} =
|
Future[DiscResult[seq[Node]]] {.async: (raises: [CancelledError]).} =
|
||||||
## Send a discovery findNode message.
|
## Send a discovery findNode message.
|
||||||
##
|
##
|
||||||
## Returns the received nodes or an error.
|
## Returns the received nodes or an error.
|
||||||
|
@ -599,7 +599,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
||||||
return err(nodes.error)
|
return err(nodes.error)
|
||||||
|
|
||||||
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
|
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
|
||||||
Future[DiscResult[seq[byte]]] {.async.} =
|
Future[DiscResult[seq[byte]]] {.async: (raises: [CancelledError]).} =
|
||||||
## Send a discovery talkreq message.
|
## Send a discovery talkreq message.
|
||||||
##
|
##
|
||||||
## Returns the received talkresp message or an error.
|
## Returns the received talkresp message or an error.
|
||||||
|
@ -633,7 +633,7 @@ func lookupDistances*(target, dest: NodeId): seq[uint16] =
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
|
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
|
||||||
Future[seq[Node]] {.async.} =
|
Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
||||||
let dists = lookupDistances(target, destNode.id)
|
let dists = lookupDistances(target, destNode.id)
|
||||||
|
|
||||||
# Instead of doing max `lookupRequestLimit` findNode requests, make use
|
# Instead of doing max `lookupRequestLimit` findNode requests, make use
|
||||||
|
@ -646,7 +646,7 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
|
||||||
for n in result:
|
for n in result:
|
||||||
discard d.addNode(n)
|
discard d.addNode(n)
|
||||||
|
|
||||||
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
||||||
## Perform a lookup for the given target, return the closest n nodes to the
|
## Perform a lookup for the given target, return the closest n nodes to the
|
||||||
## target. Maximum value for n is `BUCKET_SIZE`.
|
## target. Maximum value for n is `BUCKET_SIZE`.
|
||||||
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
||||||
|
@ -660,7 +660,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||||
for node in closestNodes:
|
for node in closestNodes:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
|
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
var i = 0
|
var i = 0
|
||||||
|
@ -677,7 +677,12 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||||
if pendingQueries.len == 0:
|
if pendingQueries.len == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
let query = await one(pendingQueries)
|
let query =
|
||||||
|
try:
|
||||||
|
await one(pendingQueries)
|
||||||
|
except ValueError:
|
||||||
|
raiseAssert("pendingQueries should not have been empty")
|
||||||
|
|
||||||
trace "Got discv5 lookup query response"
|
trace "Got discv5 lookup query response"
|
||||||
|
|
||||||
let index = pendingQueries.find(query)
|
let index = pendingQueries.find(query)
|
||||||
|
@ -686,8 +691,8 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||||
else:
|
else:
|
||||||
error "Resulting query should have been in the pending queries"
|
error "Resulting query should have been in the pending queries"
|
||||||
|
|
||||||
let nodes = query.read
|
# future.read is possible here but await is recommended (avoids also FuturePendingError)
|
||||||
# TODO: Remove node on timed-out query?
|
let nodes = await query
|
||||||
for n in nodes:
|
for n in nodes:
|
||||||
if not seen.containsOrIncl(n.id):
|
if not seen.containsOrIncl(n.id):
|
||||||
# If it wasn't seen before, insert node while remaining sorted
|
# If it wasn't seen before, insert node while remaining sorted
|
||||||
|
@ -703,7 +708,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||||
return closestNodes
|
return closestNodes
|
||||||
|
|
||||||
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
{.async.} =
|
{.async: (raises: [CancelledError]).} =
|
||||||
## Query k nodes for the given target, returns all nodes found, including the
|
## Query k nodes for the given target, returns all nodes found, including the
|
||||||
## nodes queried.
|
## nodes queried.
|
||||||
##
|
##
|
||||||
|
@ -718,7 +723,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
for node in queryBuffer:
|
for node in queryBuffer:
|
||||||
seen.incl(node.id)
|
seen.incl(node.id)
|
||||||
|
|
||||||
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
|
var pendingQueries = newSeqOfCap[Future[seq[Node]].Raising([CancelledError])](alpha)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
var i = 0
|
var i = 0
|
||||||
|
@ -733,7 +738,12 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
if pendingQueries.len == 0:
|
if pendingQueries.len == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
let query = await one(pendingQueries)
|
let query =
|
||||||
|
try:
|
||||||
|
await one(pendingQueries)
|
||||||
|
except ValueError:
|
||||||
|
raiseAssert("pendingQueries should not have been empty")
|
||||||
|
|
||||||
trace "Got discv5 lookup query response"
|
trace "Got discv5 lookup query response"
|
||||||
|
|
||||||
let index = pendingQueries.find(query)
|
let index = pendingQueries.find(query)
|
||||||
|
@ -742,8 +752,8 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
else:
|
else:
|
||||||
error "Resulting query should have been in the pending queries"
|
error "Resulting query should have been in the pending queries"
|
||||||
|
|
||||||
let nodes = query.read
|
# future.read is possible here but await is recommended (avoids also FuturePendingError)
|
||||||
# TODO: Remove node on timed-out query?
|
let nodes = await query
|
||||||
for n in nodes:
|
for n in nodes:
|
||||||
if not seen.containsOrIncl(n.id):
|
if not seen.containsOrIncl(n.id):
|
||||||
queryBuffer.add(n)
|
queryBuffer.add(n)
|
||||||
|
@ -751,12 +761,12 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||||
d.lastLookup = now(chronos.Moment)
|
d.lastLookup = now(chronos.Moment)
|
||||||
return queryBuffer
|
return queryBuffer
|
||||||
|
|
||||||
proc queryRandom*(d: Protocol): Future[seq[Node]] =
|
proc queryRandom*(d: Protocol): Future[seq[Node]] {.async: (raw: true, raises: [CancelledError]).} =
|
||||||
## Perform a query for a random target, return all nodes discovered.
|
## Perform a query for a random target, return all nodes discovered.
|
||||||
d.query(NodeId.random(d.rng[]))
|
d.query(NodeId.random(d.rng[]))
|
||||||
|
|
||||||
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
|
proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
|
||||||
Future[seq[Node]] {.async.} =
|
Future[seq[Node]] {.async: (raises: [CancelledError]).} =
|
||||||
## Perform a query for a random target, return all nodes discovered which
|
## Perform a query for a random target, return all nodes discovered which
|
||||||
## contain enrField.
|
## contain enrField.
|
||||||
let nodes = await d.queryRandom()
|
let nodes = await d.queryRandom()
|
||||||
|
@ -767,7 +777,7 @@ proc queryRandom*(d: Protocol, enrField: (string, seq[byte])):
|
||||||
|
|
||||||
return filtered
|
return filtered
|
||||||
|
|
||||||
proc resolve*(d: Protocol, id: NodeId): Future[Opt[Node]] {.async.} =
|
proc resolve*(d: Protocol, id: NodeId): Future[Opt[Node]] {.async: (raises: [CancelledError]).} =
|
||||||
## Resolve a `Node` based on provided `NodeId`.
|
## Resolve a `Node` based on provided `NodeId`.
|
||||||
##
|
##
|
||||||
## This will first look in the own routing table. If the node is known, it
|
## This will first look in the own routing table. If the node is known, it
|
||||||
|
@ -807,7 +817,7 @@ proc seedTable*(d: Protocol) =
|
||||||
# Persistent stored nodes could be added to seed from here
|
# Persistent stored nodes could be added to seed from here
|
||||||
# See: https://github.com/status-im/nim-eth/issues/189
|
# See: https://github.com/status-im/nim-eth/issues/189
|
||||||
|
|
||||||
proc populateTable*(d: Protocol) {.async.} =
|
proc populateTable*(d: Protocol) {.async: (raises: [CancelledError]).} =
|
||||||
## Do a set of initial lookups to quickly populate the table.
|
## Do a set of initial lookups to quickly populate the table.
|
||||||
# start with a self target query (neighbour nodes)
|
# start with a self target query (neighbour nodes)
|
||||||
let selfQuery = await d.query(d.localNode.id)
|
let selfQuery = await d.query(d.localNode.id)
|
||||||
|
@ -821,7 +831,7 @@ proc populateTable*(d: Protocol) {.async.} =
|
||||||
debug "Total nodes in routing table after populate",
|
debug "Total nodes in routing table after populate",
|
||||||
total = d.routingTable.len()
|
total = d.routingTable.len()
|
||||||
|
|
||||||
proc revalidateNode*(d: Protocol, n: Node) {.async.} =
|
proc revalidateNode*(d: Protocol, n: Node) {.async: (raises: [CancelledError]).} =
|
||||||
let pong = await d.ping(n)
|
let pong = await d.ping(n)
|
||||||
|
|
||||||
if pong.isOk():
|
if pong.isOk():
|
||||||
|
@ -836,7 +846,7 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} =
|
||||||
let a = Address(ip: res.ip, port: Port(res.port))
|
let a = Address(ip: res.ip, port: Port(res.port))
|
||||||
d.ipVote.insert(n.id, a)
|
d.ipVote.insert(n.id, a)
|
||||||
|
|
||||||
proc revalidateLoop(d: Protocol) {.async.} =
|
proc revalidateLoop(d: Protocol) {.async: (raises: []).} =
|
||||||
## Loop which revalidates the nodes in the routing table by sending the ping
|
## Loop which revalidates the nodes in the routing table by sending the ping
|
||||||
## message.
|
## message.
|
||||||
try:
|
try:
|
||||||
|
@ -848,7 +858,7 @@ proc revalidateLoop(d: Protocol) {.async.} =
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "revalidateLoop canceled"
|
trace "revalidateLoop canceled"
|
||||||
|
|
||||||
proc refreshLoop(d: Protocol) {.async.} =
|
proc refreshLoop(d: Protocol) {.async: (raises: []).} =
|
||||||
## Loop that refreshes the routing table by starting a random query in case
|
## Loop that refreshes the routing table by starting a random query in case
|
||||||
## no queries were done since `refreshInterval` or more.
|
## no queries were done since `refreshInterval` or more.
|
||||||
## It also refreshes the majority address voted for via pong responses.
|
## It also refreshes the majority address voted for via pong responses.
|
||||||
|
@ -882,7 +892,7 @@ proc updateExternalIp*(d: Protocol, extIp: IpAddress, udpPort: Port): bool =
|
||||||
previous, newExtIp = extIp, newUdpPort = udpPort, uri = toURI(d.localNode.record)
|
previous, newExtIp = extIp, newUdpPort = udpPort, uri = toURI(d.localNode.record)
|
||||||
return success
|
return success
|
||||||
|
|
||||||
proc ipMajorityLoop(d: Protocol) {.async.} =
|
proc ipMajorityLoop(d: Protocol) {.async: (raises: []).} =
|
||||||
## When `enrAutoUpdate` is enabled, the IP:port combination returned
|
## When `enrAutoUpdate` is enabled, the IP:port combination returned
|
||||||
## by the majority will be used to update the local ENR.
|
## by the majority will be used to update the local ENR.
|
||||||
## This should be safe as long as the routing table is not overwhelmed by
|
## This should be safe as long as the routing table is not overwhelmed by
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright (c) 2020-2023 Status Research & Development GmbH
|
# Copyright (c) 2020-2024 Status Research & Development GmbH
|
||||||
# Licensed under either of
|
# Licensed under either of
|
||||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
@ -142,21 +142,30 @@ proc completeCmdArg*(T: type Node, val: string): seq[string] =
|
||||||
|
|
||||||
proc parseCmdArg*(T: type PrivateKey, p: string): T {.raises: [ValueError].} =
|
proc parseCmdArg*(T: type PrivateKey, p: string): T {.raises: [ValueError].} =
|
||||||
try:
|
try:
|
||||||
result = PrivateKey.fromHex(string(p)).tryGet()
|
result = PrivateKey.fromHex(p).tryGet()
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(ValueError, "Invalid private key")
|
raise newException(ValueError, "Invalid private key")
|
||||||
|
|
||||||
proc completeCmdArg*(T: type PrivateKey, val: string): seq[string] =
|
proc completeCmdArg*(T: type PrivateKey, val: string): seq[string] =
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
proc discover(d: discv5_protocol.Protocol, psFile: string) {.async.} =
|
proc discover(d: discv5_protocol.Protocol, psFile: string) {.async: (raises: [CancelledError]).} =
|
||||||
info "Starting peer-discovery in Ethereum - persisting peers at: ", psFile
|
info "Starting peer-discovery in Ethereum - persisting peers at: ", psFile
|
||||||
|
|
||||||
var ethNodes: HashSet[seq[byte]]
|
var ethNodes: HashSet[seq[byte]]
|
||||||
|
|
||||||
let ps = open(psFile, fmWrite)
|
let ps =
|
||||||
|
try:
|
||||||
|
open(psFile, fmWrite)
|
||||||
|
except IOError as e:
|
||||||
|
fatal "Failed to open file for writing", file = psFile, error = e.msg
|
||||||
|
quit QuitFailure
|
||||||
defer: ps.close()
|
defer: ps.close()
|
||||||
ps.write("pubkey,node_id,fork_digest,ip:port,attnets,attnets_number\n")
|
try:
|
||||||
|
ps.writeLine("pubkey,node_id,fork_digest,ip:port,attnets,attnets_number")
|
||||||
|
except IOError as e:
|
||||||
|
fatal "Failed to write to file", file = psFile, error = e.msg
|
||||||
|
quit QuitFailure
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
let iTime = now(chronos.Moment)
|
let iTime = now(chronos.Moment)
|
||||||
|
@ -179,10 +188,17 @@ proc discover(d: discv5_protocol.Protocol, psFile: string) {.async.} =
|
||||||
for byt in attnets.get():
|
for byt in attnets.get():
|
||||||
bits.inc(countOnes(byt.uint))
|
bits.inc(countOnes(byt.uint))
|
||||||
|
|
||||||
let str = "$#,$#,$#,$#,$#,$#\n"
|
let str = "$#,$#,$#,$#,$#,$#"
|
||||||
let newLine = str % [pubkey.get().toHex, dNode.id.toHex, forkDigest[0..3].toHex, $dNode.address.get(), attnets.get().toHex, $bits]
|
let newLine =
|
||||||
|
try:
|
||||||
ps.write(newLine)
|
str % [pubkey.get().toHex, dNode.id.toHex, forkDigest[0..3].toHex, $dNode.address.get(), attnets.get().toHex, $bits]
|
||||||
|
except ValueError as e:
|
||||||
|
raiseAssert e.msg
|
||||||
|
try:
|
||||||
|
ps.writeLine(newLine)
|
||||||
|
except IOError as e:
|
||||||
|
fatal "Failed to write to file", file = psFile, error = e.msg
|
||||||
|
quit QuitFailure
|
||||||
await sleepAsync(1.seconds) # 1 sec of delay
|
await sleepAsync(1.seconds) # 1 sec of delay
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue