mirror of https://github.com/status-im/nim-eth.git
Add top level push raises Defect to p2p code (#374)
parent 2557fd35c6
commit a8d11dd30b

eth/p2p.nim | 11
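For context: Nim's {.push raises: [Defect].} pragma applies the given raises restriction to every routine declared after it in the module. Individual {.raises: [Defect].} annotations therefore become redundant and are dropped below, while procs that can leak other exceptions must now declare them explicitly. A minimal sketch of the mechanism, separate from this diff:

import std/strutils

{.push raises: [Defect].}

# Fine under the push: multiplication can only raise OverflowDefect,
# which is a Defect.
proc double(x: int): int =
  x * 2

# parseInt can raise ValueError, so under the push the error must be
# handled locally (or listed in an explicit raises pragma).
proc parseOrZero(s: string): int =
  try:
    parseInt(s)
  except ValueError:
    0

{.pop.}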
--- a/eth/p2p.nim
+++ b/eth/p2p.nim
@@ -5,6 +5,8 @@
 # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
 # at your option. This file may not be copied, modified, or distributed except according to those terms.
 
+{.push raises: [Defect].}
+
 import
   std/[tables, algorithm, random],
   bearssl, chronos, chronos/timer, chronicles,
@@ -14,8 +16,7 @@ import
 export
   p2p_types, rlpx, enode, kademlia
 
-proc addCapability*(node: var EthereumNode, p: ProtocolInfo)
-    {.raises: [Defect].} =
+proc addCapability*(node: var EthereumNode, p: ProtocolInfo) =
   doAssert node.connectionState == ConnectionState.None
 
   let pos = lowerBound(node.protocols, p, rlpx.cmp)
@@ -36,7 +37,7 @@ proc newEthereumNode*(keys: KeyPair,
                       addAllCapabilities = true,
                       useCompression: bool = false,
                       minPeers = 10,
-                      rng = newRng()): EthereumNode {.raises: [Defect].} =
+                      rng = newRng()): EthereumNode =
 
   if rng == nil: # newRng could fail
     raise (ref Defect)(msg: "Cannot initialize RNG")
@@ -79,7 +80,7 @@ proc processIncoming(server: StreamServer,
 proc listeningAddress*(node: EthereumNode): ENode =
   node.toENode()
 
-proc startListening*(node: EthereumNode) =
+proc startListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} =
   # TODO allow binding to specific IP / IPv6 / etc
   let ta = initTAddress(IPv4_any(), node.address.tcpPort)
   if node.listeningServer == nil:
@@ -115,7 +116,7 @@ proc connectToNetwork*(node: EthereumNode,
     trace "Waiting for more peers", peers = node.peerPool.connectedNodes.len
     await sleepAsync(500.milliseconds)
 
-proc stopListening*(node: EthereumNode) =
+proc stopListening*(node: EthereumNode) {.raises: [CatchableError, Defect].} =
   node.listeningServer.stop()
 
 iterator peers*(node: EthereumNode): Peer =
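startListening and stopListening gain a broad {.raises: [CatchableError, Defect].} annotation rather than a precise list: the chronos and networking calls they make are not annotated with narrow raises lists, so CatchableError is the tightest effect the compiler will accept. A generic sketch of the situation, with hypothetical proc names:

# A dependency without a narrow raises annotation; as far as the
# effect system knows, any CatchableError may escape it.
proc libraryCall() {.raises: [CatchableError].} =
  raise newException(IOError, "bind failed")

{.push raises: [Defect].}

# Under the push, a caller that lets the error propagate must widen
# its own raises list explicitly:
proc startService() {.raises: [CatchableError, Defect].} =
  libraryCall()

{.pop.}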
--- a/eth/p2p/p2p_protocol_dsl.nim
+++ b/eth/p2p/p2p_protocol_dsl.nim
@@ -1,3 +1,5 @@
+{.push raises: [Defect].}
+
 import
   std/[options, sequtils],
   stew/shims/macros, chronos, faststreams/outputs
@@ -1004,7 +1006,11 @@ macro emitForSingleBackend(
     peerState.getType, networkState.getType)
 
   result = p.genCode()
-  result.storeMacroResult true
+  try:
+    result.storeMacroResult true
+  except IOError:
+    # IO error so the generated nim code might not be stored, don't sweat it.
+    discard
 
 macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped =
   let name = $(options[0])
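storeMacroResult writes the generated protocol code to disk (as the diff's own comment notes), and IOError is a CatchableError, so under the new push it has to be handled locally for the surrounding code to keep a Defect-only effect. The diff chooses to swallow it, since losing the debug dump is harmless. The same best-effort pattern in isolation, with a hypothetical proc name:

{.push raises: [Defect].}

proc dumpDebugArtifact(path, data: string) =
  # Best-effort write: the artifact is only a debugging aid, so an
  # IOError is discarded instead of widening the raises list.
  try:
    writeFile(path, data)
  except IOError:
    discard

{.pop.}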
--- a/eth/p2p/peer_pool.nim
+++ b/eth/p2p/peer_pool.nim
@@ -8,6 +8,8 @@
 # PeerPool attempts to keep connections to at least min_peers
 # on the given network.
 
+{.push raises: [Defect].}
+
 import
   std/[os, tables, times, random, sequtils, options],
   chronos, chronicles,
@@ -100,7 +102,7 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
   #     self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
   #     return None
 
-proc lookupRandomNode(p: PeerPool) {.async, raises: [Defect].} =
+proc lookupRandomNode(p: PeerPool) {.async.} =
   discard await p.discovery.lookupRandom()
   p.lastLookupTime = epochTime()
 
@@ -108,7 +110,7 @@ proc getRandomBootnode(p: PeerPool): Option[Node] =
   if p.discovery.bootstrapNodes.len != 0:
     result = option(p.discovery.bootstrapNodes.sample())
 
-proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe, raises: [Defect].} =
+proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} =
   doAssert(peer.remote notin pool.connectedNodes)
   pool.connectedNodes[peer.remote] = peer
   connected_peers.inc()
@@ -163,7 +165,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
   if p.connectedNodes.len == 0 and (let n = p.getRandomBootnode(); n.isSome):
     await p.connectToNode(n.get())
 
-proc run(p: PeerPool) {.async, raises: [Defect].} =
+proc run(p: PeerPool) {.async.} =
   trace "Running PeerPool..."
   p.running = true
   while p.running:
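Note the direction of the change for the async procs above: raises: [Defect] is removed outright instead of becoming implicit. With chronos, an {.async.} proc returns a Future immediately and any exception is stored in that Future, so a raises list on the declaration says little about where failures actually surface. A small sketch of how a caller observes them, illustrative only:

import chronos

proc fetch(): Future[int] {.async.} =
  raise newException(ValueError, "lookup failed")

proc main() {.async.} =
  let fut = fetch()
  try:
    # The failure surfaces at the await, not at the call site above.
    discard await fut
  except ValueError:
    echo "handled where the future is awaited"

waitFor main()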
--- a/eth/p2p/rlpx.nim
+++ b/eth/p2p/rlpx.nim
@@ -70,8 +70,7 @@ chronicles.formatIt(Peer): $(it.remote)
 
 include p2p_backends_helpers
 
-proc requestResolver[MsgType](msg: pointer, future: FutureBase)
-    {.gcsafe, raises:[Defect].} =
+proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
   var f = Future[Option[MsgType]](future)
   if not f.finished:
     if msg != nil:
@@ -117,7 +116,7 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
   # result = $(cast[ptr MsgType](msg)[])
 
 proc disconnect*(peer: Peer, reason: DisconnectionReason,
-  notifyOtherPeer = false) {.gcsafe, raises: [Defect], async.}
+  notifyOtherPeer = false) {.gcsafe, async.}
 
 template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
   var e = newException(PeerDisconnected, msg)
@@ -374,7 +373,7 @@ proc registerRequest(peer: Peer,
 
   doAssert(not peer.dispatcher.isNil)
   let requestResolver = peer.dispatcher.messages[responseMsgId].requestResolver
-  proc timeoutExpired(udata: pointer) {.gcsafe, raises:[Defect].} =
+  proc timeoutExpired(udata: pointer) {.gcsafe.} =
     requestResolver(nil, responseFuture)
 
   discard setTimer(timeoutAt, timeoutExpired, nil)
@@ -880,7 +879,7 @@ p2pProtocol DevP2P(version = 5, rlpxName = "p2p"):
     proc pong(peer: Peer, emptyList: EmptyList) =
       discard
 
-proc removePeer(network: EthereumNode, peer: Peer) {.raises: [Defect].} =
+proc removePeer(network: EthereumNode, peer: Peer) =
   # It is necessary to check if peer.remote still exists. The connection might
   # have been dropped already from the peers side.
   # E.g. when receiving a p2p.disconnect message from a peer, a race will happen
@@ -914,7 +913,7 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
       trace "Disconnection handler ended with an error", err = f.error.msg
 
 proc disconnect*(peer: Peer, reason: DisconnectionReason,
-                 notifyOtherPeer = false) {.async, raises: [Defect].} =
+                 notifyOtherPeer = false) {.async.} =
   if peer.connectionState notin {Disconnecting, Disconnected}:
     peer.connectionState = Disconnecting
     # Do this first so sub-protocols have time to clean up and stop sending
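disconnect is touched in two hunks because it is forward-declared: the body-less declaration after messagePrinter and the definition after callDisconnectHandlers must agree on their effect lists, so both drop raises: [Defect] together. In miniature, under assumed push context:

{.push raises: [Defect].}

proc greet(name: string) {.gcsafe.}   # forward declaration, no body

proc greet(name: string) {.gcsafe.} = # definition; effects must agree
  debugEcho "hello, ", name

{.pop.}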
--- a/eth/p2p/rlpx_protocols/whisper/whisper_types.nim
+++ b/eth/p2p/rlpx_protocols/whisper/whisper_types.nim
@@ -5,6 +5,8 @@
 # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
 # at your option. This file may not be copied, modified, or distributed except according to those terms.
 
+{.push raises: [Defect].}
+
 import
   std/[algorithm, bitops, math, options, tables, times, hashes],
   chronicles, stew/[byteutils, endians2], metrics, bearssl,
@@ -519,7 +521,7 @@ proc initQueue*(capacity: int): Queue =
   result.capacity = capacity
   result.itemHashes.init()
 
-proc prune*(self: var Queue) {.raises: [].} =
+proc prune*(self: var Queue) =
   ## Remove items that are past their expiry time
   let now = epochTime().uint32
 
@@ -654,7 +656,8 @@ proc notify*(filters: var Filters, msg: Message) {.gcsafe.} =
     else:
       filter.handler(receivedMsg)
 
-proc getFilterMessages*(filters: var Filters, filterId: string): seq[ReceivedMessage] =
+proc getFilterMessages*(filters: var Filters, filterId: string):
+    seq[ReceivedMessage] {.raises: [KeyError, Defect].} =
   result = @[]
   if filters.contains(filterId):
     if filters[filterId].handler.isNil():
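The widened signature on getFilterMessages comes from Table's bracket operator: filters[filterId] raises KeyError for a missing key, and under the push that effect must either be handled or declared; the wrapper in the protocol module below then carries the same annotation. A minimal illustration:

import std/tables

{.push raises: [Defect].}

proc lookup(t: Table[string, int], key: string): int
    {.raises: [KeyError, Defect].} =
  # `[]` raises KeyError when the key is absent, so it is declared.
  t[key]

{.pop.}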
--- a/eth/p2p/rlpx_protocols/whisper_protocol.nim
+++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim
@@ -29,6 +29,8 @@
 ## However, they only make real sense after ``connectToNetwork`` was started. As
 ## else there will be no peers to send and receive messages from.
 
+{.push raises: [Defect].}
+
 import
   std/[options, tables, times],
   chronos, chronicles, metrics,
@@ -247,7 +249,7 @@ p2pProtocol Whisper(version = whisperVersion,
 
 # 'Runner' calls ---------------------------------------------------------------
 
-proc processQueue(peer: Peer) {.raises: [Defect].} =
+proc processQueue(peer: Peer) =
   # Send to peer all valid and previously not send envelopes in the queue.
   var
     envelopes: seq[Envelope] = @[]
@@ -278,12 +280,12 @@ proc processQueue(peer: Peer) {.raises: [Defect].} =
   # gets dropped
   traceAsyncErrors peer.messages(envelopes)
 
-proc run(peer: Peer) {.async, raises: [Defect].} =
+proc run(peer: Peer) {.async.} =
   while peer.connectionState notin {Disconnecting, Disconnected}:
     peer.processQueue()
     await sleepAsync(messageInterval)
 
-proc pruneReceived(node: EthereumNode) {.raises: [Defect].} =
+proc pruneReceived(node: EthereumNode) =
   if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
     var whisperNet = node.protocolState(Whisper)
@@ -296,7 +298,7 @@ proc pruneReceived(node: EthereumNode) {.raises: [Defect].} =
       # the received sets.
       peer.received = intersection(peer.received, whisperNet.queue.itemHashes)
 
-proc run(node: EthereumNode, network: WhisperNetwork) {.async, raises: [Defect].} =
+proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
   while true:
     # prune message queue every second
     # TTL unit is in seconds, so this should be sufficient?
@@ -401,7 +403,8 @@ proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
   var filter: Filter
   return node.protocolState(Whisper).filters.take(filterId, filter)
 
-proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
+proc getFilterMessages*(node: EthereumNode, filterId: string):
+    seq[ReceivedMessage] {.raises: [KeyError, Defect].} =
   ## Get all the messages currently in the filter queue. This will reset the
   ## filter message queue.
   return node.protocolState(Whisper).filters.getFilterMessages(filterId)