mirror of https://github.com/status-im/nim-eth.git
Merge pull request #348 from status-im/p2p-chronos-strict
Adjust for chronosStrictException usage in rest of eth/p2p
This commit is contained in:
commit
2d75e73bab
13
eth.nimble
13
eth.nimble
|
@ -44,22 +44,11 @@ task test_discv5, "Run discovery v5 tests":
|
||||||
runTest("tests/p2p/all_discv5_tests")
|
runTest("tests/p2p/all_discv5_tests")
|
||||||
|
|
||||||
task test_discv4, "Run discovery v4 tests":
|
task test_discv4, "Run discovery v4 tests":
|
||||||
runTest("tests/p2p/test_discovery", chronosStrict = false)
|
runTest("tests/p2p/test_discovery")
|
||||||
|
|
||||||
task test_p2p, "Run p2p tests":
|
task test_p2p, "Run p2p tests":
|
||||||
runTest("tests/p2p/all_tests")
|
runTest("tests/p2p/all_tests")
|
||||||
|
|
||||||
# Code that still requires chronosStrict = false
|
|
||||||
for filename in [
|
|
||||||
"les/test_flow_control",
|
|
||||||
"test_rlpx_thunk",
|
|
||||||
"test_shh",
|
|
||||||
"test_shh_config",
|
|
||||||
"test_shh_connect",
|
|
||||||
"test_protocol_handlers"
|
|
||||||
]:
|
|
||||||
runTest("tests/p2p/" & filename, chronosStrict = false)
|
|
||||||
|
|
||||||
task test_rlp, "Run rlp tests":
|
task test_rlp, "Run rlp tests":
|
||||||
# workaround for github action CI
|
# workaround for github action CI
|
||||||
# mysterious crash on windows-2019 64bit mode
|
# mysterious crash on windows-2019 64bit mode
|
||||||
|
|
|
@ -368,10 +368,12 @@ template deref*(b: Blob): auto = b
|
||||||
template deref*(o: Option): auto = o.get
|
template deref*(o: Option): auto = o.get
|
||||||
template deref*(r: EthResourceRefs): auto = r[]
|
template deref*(r: EthResourceRefs): auto = r[]
|
||||||
|
|
||||||
method genesisHash*(db: AbstractChainDB): KeccakHash {.base, gcsafe.} =
|
method genesisHash*(db: AbstractChainDB): KeccakHash
|
||||||
|
{.base, gcsafe, raises: [Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getBlockHeader*(db: AbstractChainDB, b: HashOrNum, output: var BlockHeader): bool {.base, gcsafe.} =
|
method getBlockHeader*(db: AbstractChainDB, b: HashOrNum,
|
||||||
|
output: var BlockHeader): bool {.base, gcsafe, raises: [CatchableError, Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
proc getBlockHeader*(db: AbstractChainDB, hash: KeccakHash): BlockHeaderRef {.gcsafe.} =
|
proc getBlockHeader*(db: AbstractChainDB, hash: KeccakHash): BlockHeaderRef {.gcsafe.} =
|
||||||
|
@ -384,22 +386,29 @@ proc getBlockHeader*(db: AbstractChainDB, b: BlockNumber): BlockHeaderRef {.gcsa
|
||||||
if not db.getBlockHeader(HashOrNum(isHash: false, number: b), result[]):
|
if not db.getBlockHeader(HashOrNum(isHash: false, number: b), result[]):
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
method getBestBlockHeader*(self: AbstractChainDB): BlockHeader {.base, gcsafe.} =
|
method getBestBlockHeader*(self: AbstractChainDB): BlockHeader
|
||||||
|
{.base, gcsafe, raises: [CatchableError, Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getSuccessorHeader*(db: AbstractChainDB, h: BlockHeader, output: var BlockHeader, skip = 0'u): bool {.base, gcsafe.} =
|
method getSuccessorHeader*(db: AbstractChainDB, h: BlockHeader,
|
||||||
|
output: var BlockHeader, skip = 0'u): bool
|
||||||
|
{.base, gcsafe, raises: [CatchableError, Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getAncestorHeader*(db: AbstractChainDB, h: BlockHeader, output: var BlockHeader, skip = 0'u): bool {.base, gcsafe.} =
|
method getAncestorHeader*(db: AbstractChainDB, h: BlockHeader,
|
||||||
|
output: var BlockHeader, skip = 0'u): bool
|
||||||
|
{.base, gcsafe, raises: [CatchableError, Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getBlockBody*(db: AbstractChainDB, blockHash: KeccakHash): BlockBodyRef {.base, gcsafe.} =
|
method getBlockBody*(db: AbstractChainDB, blockHash: KeccakHash): BlockBodyRef
|
||||||
|
{.base, gcsafe, raises: [Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getReceipt*(db: AbstractChainDB, hash: KeccakHash): ReceiptRef {.base, gcsafe.} =
|
method getReceipt*(db: AbstractChainDB, hash: KeccakHash): ReceiptRef {.base, gcsafe.} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getTrieDB*(db: AbstractChainDB): TrieDatabaseRef {.base, gcsafe.} =
|
method getTrieDB*(db: AbstractChainDB): TrieDatabaseRef
|
||||||
|
{.base, gcsafe, raises: [Defect].} =
|
||||||
notImplemented()
|
notImplemented()
|
||||||
|
|
||||||
method getCodeByHash*(db: AbstractChainDB, hash: KeccakHash): Blob {.base, gcsafe.} =
|
method getCodeByHash*(db: AbstractChainDB, hash: KeccakHash): Blob {.base, gcsafe.} =
|
||||||
|
|
|
@ -18,6 +18,7 @@ proc getContractCode*(chain: AbstractChainDB, req: ContractCodeRequest): Blob {.
|
||||||
let acc = getAccount(chain.getTrieDB, b.stateRoot, req.key)
|
let acc = getAccount(chain.getTrieDB, b.stateRoot, req.key)
|
||||||
result = chain.getCodeByHash(acc.codeHash)
|
result = chain.getCodeByHash(acc.codeHash)
|
||||||
|
|
||||||
proc getStorageNode*(chain: AbstractChainDB, hash: KeccakHash): Blob =
|
proc getStorageNode*(chain: AbstractChainDB, hash: KeccakHash): Blob
|
||||||
|
{.raises: [CatchableError, Defect].} =
|
||||||
let db = chain.getTrieDB
|
let db = chain.getTrieDB
|
||||||
return db.get(hash.data)
|
return db.get(hash.data)
|
||||||
|
|
22
eth/p2p.nim
22
eth/p2p.nim
|
@ -1,12 +1,9 @@
|
||||||
#
|
# nim-eth
|
||||||
# Ethereum P2P
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
# (c) Copyright 2018
|
# Licensed and distributed under either of
|
||||||
# Status Research & Development GmbH
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
#
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
# Licensed under either of
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
||||||
# MIT license (LICENSE-MIT)
|
|
||||||
#
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, algorithm, random],
|
std/[tables, algorithm, random],
|
||||||
|
@ -17,7 +14,8 @@ import
|
||||||
export
|
export
|
||||||
p2p_types, rlpx, enode, kademlia
|
p2p_types, rlpx, enode, kademlia
|
||||||
|
|
||||||
proc addCapability*(node: var EthereumNode, p: ProtocolInfo) =
|
proc addCapability*(node: var EthereumNode, p: ProtocolInfo)
|
||||||
|
{.raises: [Defect].} =
|
||||||
doAssert node.connectionState == ConnectionState.None
|
doAssert node.connectionState == ConnectionState.None
|
||||||
|
|
||||||
let pos = lowerBound(node.protocols, p, rlpx.cmp)
|
let pos = lowerBound(node.protocols, p, rlpx.cmp)
|
||||||
|
@ -38,10 +36,10 @@ proc newEthereumNode*(keys: KeyPair,
|
||||||
addAllCapabilities = true,
|
addAllCapabilities = true,
|
||||||
useCompression: bool = false,
|
useCompression: bool = false,
|
||||||
minPeers = 10,
|
minPeers = 10,
|
||||||
rng = newRng()): EthereumNode =
|
rng = newRng()): EthereumNode {.raises: [Defect].} =
|
||||||
|
|
||||||
if rng == nil: # newRng could fail
|
if rng == nil: # newRng could fail
|
||||||
raise (ref CatchableError)(msg: "Cannot initialize RNG")
|
raise (ref Defect)(msg: "Cannot initialize RNG")
|
||||||
|
|
||||||
new result
|
new result
|
||||||
result.keys = keys
|
result.keys = keys
|
||||||
|
|
|
@ -1,3 +1,10 @@
|
||||||
|
# nim-eth
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sets, options, random, hashes],
|
std/[sets, options, random, hashes],
|
||||||
chronos, chronicles,
|
chronos, chronicles,
|
||||||
|
@ -36,7 +43,7 @@ type
|
||||||
trustedPeers: HashSet[Peer]
|
trustedPeers: HashSet[Peer]
|
||||||
hasOutOfOrderBlocks: bool
|
hasOutOfOrderBlocks: bool
|
||||||
|
|
||||||
proc hash*(p: Peer): Hash {.inline.} = hash(cast[pointer](p))
|
proc hash*(p: Peer): Hash = hash(cast[pointer](p))
|
||||||
|
|
||||||
proc endIndex(b: WantedBlocks): BlockNumber =
|
proc endIndex(b: WantedBlocks): BlockNumber =
|
||||||
result = b.startIndex
|
result = b.startIndex
|
||||||
|
|
|
@ -1,10 +1,17 @@
|
||||||
|
# nim-eth
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
eth/common/[eth_types, state_accessors]
|
eth/common/[eth_types, state_accessors]
|
||||||
|
|
||||||
# TODO: Perhaps we can move this to eth-common
|
# TODO: Perhaps we can move this to eth-common
|
||||||
|
|
||||||
proc getBlockHeaders*(db: AbstractChainDB,
|
proc getBlockHeaders*(db: AbstractChainDB, req: BlocksRequest): seq[BlockHeader]
|
||||||
req: BlocksRequest): seq[BlockHeader] {.gcsafe.} =
|
{.gcsafe, raises: [CatchableError, Defect].} =
|
||||||
result = newSeqOfCap[BlockHeader](req.maxResults)
|
result = newSeqOfCap[BlockHeader](req.maxResults)
|
||||||
|
|
||||||
var foundBlock: BlockHeader
|
var foundBlock: BlockHeader
|
||||||
|
|
|
@ -379,7 +379,7 @@ proc decodeMessage*(body: openarray[byte]): DecodeResult[Message] =
|
||||||
return err("Invalid request-id")
|
return err("Invalid request-id")
|
||||||
|
|
||||||
proc decode[T](rlp: var Rlp, v: var T)
|
proc decode[T](rlp: var Rlp, v: var T)
|
||||||
{.inline, nimcall, raises:[RlpError, ValueError, Defect].} =
|
{.nimcall, raises:[RlpError, ValueError, Defect].} =
|
||||||
for k, v in v.fieldPairs:
|
for k, v in v.fieldPairs:
|
||||||
v = rlp.read(typeof(v))
|
v = rlp.read(typeof(v))
|
||||||
|
|
||||||
|
|
|
@ -468,7 +468,7 @@ proc `$`*(r: Record): string =
|
||||||
proc `==`*(a, b: Record): bool = a.raw == b.raw
|
proc `==`*(a, b: Record): bool = a.raw == b.raw
|
||||||
|
|
||||||
proc read*(rlp: var Rlp, T: typedesc[Record]):
|
proc read*(rlp: var Rlp, T: typedesc[Record]):
|
||||||
T {.inline, raises:[RlpError, ValueError, Defect].} =
|
T {.raises: [RlpError, ValueError, Defect].} =
|
||||||
if not rlp.hasData() or not result.fromBytes(rlp.rawData):
|
if not rlp.hasData() or not result.fromBytes(rlp.rawData):
|
||||||
# TODO: This could also just be an invalid signature, would be cleaner to
|
# TODO: This could also just be an invalid signature, would be cleaner to
|
||||||
# split of RLP deserialisation errors from this.
|
# split of RLP deserialisation errors from this.
|
||||||
|
|
|
@ -193,7 +193,7 @@ proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
|
||||||
## Return up to k neighbours (closest node ids) of the given node id.
|
## Return up to k neighbours (closest node ids) of the given node id.
|
||||||
d.routingTable.neighbours(id, k)
|
d.routingTable.neighbours(id, k)
|
||||||
|
|
||||||
proc nodesDiscovered*(d: Protocol): int {.inline.} = d.routingTable.len
|
proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
|
||||||
|
|
||||||
func privKey*(d: Protocol): lent PrivateKey =
|
func privKey*(d: Protocol): lent PrivateKey =
|
||||||
d.privateKey
|
d.privateKey
|
||||||
|
|
|
@ -132,8 +132,8 @@ proc distanceTo(k: KBucket, id: NodeId): UInt256 = k.midpoint xor id
|
||||||
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
|
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
|
||||||
sortedByIt(k.nodes, it.distanceTo(id))
|
sortedByIt(k.nodes, it.distanceTo(id))
|
||||||
|
|
||||||
proc len(k: KBucket): int {.inline.} = k.nodes.len
|
proc len(k: KBucket): int = k.nodes.len
|
||||||
proc tail(k: KBucket): Node {.inline.} = k.nodes[high(k.nodes)]
|
proc tail(k: KBucket): Node = k.nodes[high(k.nodes)]
|
||||||
|
|
||||||
proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool =
|
proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool =
|
||||||
## Check if the ip limits of the routing table and the bucket are reached for
|
## Check if the ip limits of the routing table and the bucket are reached for
|
||||||
|
@ -194,7 +194,7 @@ proc split(k: KBucket): tuple[lower, upper: KBucket] =
|
||||||
doAssert(bucket.ipLimits.inc(node.address.get().ip),
|
doAssert(bucket.ipLimits.inc(node.address.get().ip),
|
||||||
"IpLimit increment should work as all buckets have the same limits")
|
"IpLimit increment should work as all buckets have the same limits")
|
||||||
|
|
||||||
proc inRange(k: KBucket, n: Node): bool {.inline.} =
|
proc inRange(k: KBucket, n: Node): bool =
|
||||||
k.istart <= n.id and n.id <= k.iend
|
k.istart <= n.id and n.id <= k.iend
|
||||||
|
|
||||||
proc contains(k: KBucket, n: Node): bool = n in k.nodes
|
proc contains(k: KBucket, n: Node): bool = n in k.nodes
|
||||||
|
@ -234,7 +234,7 @@ proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
|
||||||
doAssert(false, "Unable to calculate number of shared prefix bits")
|
doAssert(false, "Unable to calculate number of shared prefix bits")
|
||||||
|
|
||||||
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = DefaultBitsPerHop,
|
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = DefaultBitsPerHop,
|
||||||
ipLimits = DefaultTableIpLimits, rng: ref BrHmacDrbgContext) {.inline.} =
|
ipLimits = DefaultTableIpLimits, rng: ref BrHmacDrbgContext) =
|
||||||
## Initialize the routing table for provided `Node` and bitsPerHop value.
|
## Initialize the routing table for provided `Node` and bitsPerHop value.
|
||||||
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
|
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
|
||||||
r.thisNode = thisNode
|
r.thisNode = thisNode
|
||||||
|
|
|
@ -28,9 +28,12 @@ template networkState*(connection: Peer, Protocol: type): untyped =
|
||||||
## particular connection.
|
## particular connection.
|
||||||
protocolState(connection.network, Protocol)
|
protocolState(connection.network, Protocol)
|
||||||
|
|
||||||
proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard
|
proc initProtocolState*[T](state: T, x: Peer|EthereumNode)
|
||||||
|
{.gcsafe, raises: [Defect].} =
|
||||||
|
discard
|
||||||
|
|
||||||
proc initProtocolStates(peer: Peer, protocols: openarray[ProtocolInfo]) =
|
proc initProtocolStates(peer: Peer, protocols: openarray[ProtocolInfo])
|
||||||
|
{.raises: [Defect].} =
|
||||||
# Initialize all the active protocol states
|
# Initialize all the active protocol states
|
||||||
newSeq(peer.protocolStates, allProtocols.len)
|
newSeq(peer.protocolStates, allProtocols.len)
|
||||||
for protocol in protocols:
|
for protocol in protocols:
|
||||||
|
|
|
@ -1,3 +1,10 @@
|
||||||
|
# nim-eth
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
# PeerPool attempts to keep connections to at least min_peers
|
# PeerPool attempts to keep connections to at least min_peers
|
||||||
# on the given network.
|
# on the given network.
|
||||||
|
|
||||||
|
@ -26,9 +33,7 @@ proc newPeerPool*(network: EthereumNode,
|
||||||
result.observers = initTable[int, PeerObserver]()
|
result.observers = initTable[int, PeerObserver]()
|
||||||
result.listenPort = listenPort
|
result.listenPort = listenPort
|
||||||
|
|
||||||
template ensureFuture(f: untyped) = asyncCheck f
|
proc nodesToConnect(p: PeerPool): seq[Node] =
|
||||||
|
|
||||||
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
|
|
||||||
p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes)
|
p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes)
|
||||||
|
|
||||||
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
|
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
|
||||||
|
@ -42,10 +47,10 @@ proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
|
||||||
proc delObserver(p: PeerPool, observerId: int) =
|
proc delObserver(p: PeerPool, observerId: int) =
|
||||||
p.observers.del(observerId)
|
p.observers.del(observerId)
|
||||||
|
|
||||||
proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) {.inline.} =
|
proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) =
|
||||||
p.addObserver(cast[int](observerId), observer)
|
p.addObserver(cast[int](observerId), observer)
|
||||||
|
|
||||||
proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
|
proc delObserver*(p: PeerPool, observerId: ref) =
|
||||||
p.delObserver(cast[int](observerId))
|
p.delObserver(cast[int](observerId))
|
||||||
|
|
||||||
template setProtocol*(observer: PeerObserver, Protocol: type) =
|
template setProtocol*(observer: PeerObserver, Protocol: type) =
|
||||||
|
@ -95,20 +100,15 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
|
||||||
# self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
|
# self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
|
||||||
# return None
|
# return None
|
||||||
|
|
||||||
proc lookupRandomNode(p: PeerPool) {.async.} =
|
proc lookupRandomNode(p: PeerPool) {.async, raises: [Defect].} =
|
||||||
# This method runs in the background, so we must catch OperationCancelled
|
discard await p.discovery.lookupRandom()
|
||||||
# ere otherwise asyncio will warn that its exception was never retrieved.
|
|
||||||
try:
|
|
||||||
discard await p.discovery.lookupRandom()
|
|
||||||
except: # OperationCancelled
|
|
||||||
discard
|
|
||||||
p.lastLookupTime = epochTime()
|
p.lastLookupTime = epochTime()
|
||||||
|
|
||||||
proc getRandomBootnode(p: PeerPool): Option[Node] =
|
proc getRandomBootnode(p: PeerPool): Option[Node] =
|
||||||
if p.discovery.bootstrapNodes.len != 0:
|
if p.discovery.bootstrapNodes.len != 0:
|
||||||
result = option(p.discovery.bootstrapNodes.sample())
|
result = option(p.discovery.bootstrapNodes.sample())
|
||||||
|
|
||||||
proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} =
|
proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe, raises: [Defect].} =
|
||||||
doAssert(peer.remote notin pool.connectedNodes)
|
doAssert(peer.remote notin pool.connectedNodes)
|
||||||
pool.connectedNodes[peer.remote] = peer
|
pool.connectedNodes[peer.remote] = peer
|
||||||
connected_peers.inc()
|
connected_peers.inc()
|
||||||
|
@ -149,7 +149,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
|
||||||
return
|
return
|
||||||
|
|
||||||
if p.lastLookupTime + lookupInterval < epochTime():
|
if p.lastLookupTime + lookupInterval < epochTime():
|
||||||
ensureFuture p.lookupRandomNode()
|
asyncSpawn p.lookupRandomNode()
|
||||||
|
|
||||||
let debugEnode = getEnv("ETH_DEBUG_ENODE")
|
let debugEnode = getEnv("ETH_DEBUG_ENODE")
|
||||||
if debugEnode.len != 0:
|
if debugEnode.len != 0:
|
||||||
|
@ -163,7 +163,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
|
||||||
if p.connectedNodes.len == 0 and (let n = p.getRandomBootnode(); n.isSome):
|
if p.connectedNodes.len == 0 and (let n = p.getRandomBootnode(); n.isSome):
|
||||||
await p.connectToNode(n.get())
|
await p.connectToNode(n.get())
|
||||||
|
|
||||||
proc run(p: PeerPool) {.async.} =
|
proc run(p: PeerPool) {.async, raises: [Defect].} =
|
||||||
trace "Running PeerPool..."
|
trace "Running PeerPool..."
|
||||||
p.running = true
|
p.running = true
|
||||||
while p.running:
|
while p.running:
|
||||||
|
@ -184,7 +184,7 @@ proc run(p: PeerPool) {.async.} =
|
||||||
|
|
||||||
proc start*(p: PeerPool) =
|
proc start*(p: PeerPool) =
|
||||||
if not p.running:
|
if not p.running:
|
||||||
asyncCheck p.run()
|
asyncSpawn p.run()
|
||||||
|
|
||||||
proc len*(p: PeerPool): int = p.connectedNodes.len
|
proc len*(p: PeerPool): int = p.connectedNodes.len
|
||||||
# @property
|
# @property
|
||||||
|
|
|
@ -1,3 +1,10 @@
|
||||||
|
# nim-eth
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[deques, tables],
|
std/[deques, tables],
|
||||||
bearssl, chronos,
|
bearssl, chronos,
|
||||||
|
@ -61,8 +68,8 @@ type
|
||||||
observers*: Table[int, PeerObserver]
|
observers*: Table[int, PeerObserver]
|
||||||
|
|
||||||
PeerObserver* = object
|
PeerObserver* = object
|
||||||
onPeerConnected*: proc(p: Peer) {.gcsafe.}
|
onPeerConnected*: proc(p: Peer) {.gcsafe, raises: [Defect].}
|
||||||
onPeerDisconnected*: proc(p: Peer) {.gcsafe.}
|
onPeerDisconnected*: proc(p: Peer) {.gcsafe, raises: [Defect].}
|
||||||
protocol*: ProtocolInfo
|
protocol*: ProtocolInfo
|
||||||
|
|
||||||
Capability* = object
|
Capability* = object
|
||||||
|
@ -138,16 +145,20 @@ type
|
||||||
|
|
||||||
# Private types:
|
# Private types:
|
||||||
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
|
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode
|
||||||
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.}
|
ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
|
||||||
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
|
{.gcsafe, raises: [RlpError, Defect].}
|
||||||
|
MessageContentPrinter* = proc(msg: pointer): string
|
||||||
|
{.gcsafe, raises: [Defect].}
|
||||||
RequestResolver* = proc(msg: pointer, future: FutureBase)
|
RequestResolver* = proc(msg: pointer, future: FutureBase)
|
||||||
{.gcsafe, raises:[Defect].}
|
{.gcsafe, raises: [Defect].}
|
||||||
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) {.gcsafe.}
|
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase)
|
||||||
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
|
{.gcsafe, raises: [RlpError, Defect].}
|
||||||
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
|
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [Defect].}
|
||||||
HandshakeStep* = proc(peer: Peer): Future[void] {.gcsafe.}
|
NetworkStateInitializer* = proc(network: EthereumNode): RootRef
|
||||||
|
{.gcsafe, raises: [Defect].}
|
||||||
|
HandshakeStep* = proc(peer: Peer): Future[void] {.gcsafe, raises: [Defect].}
|
||||||
DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
|
DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
|
||||||
Future[void] {.gcsafe.}
|
Future[void] {.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
ConnectionState* = enum
|
ConnectionState* = enum
|
||||||
None,
|
None,
|
||||||
|
|
|
@ -1,3 +1,10 @@
|
||||||
|
# nim-eth
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, algorithm, deques, hashes, options, typetraits],
|
std/[tables, algorithm, deques, hashes, options, typetraits],
|
||||||
stew/shims/macros, chronicles, nimcrypto, chronos,
|
stew/shims/macros, chronicles, nimcrypto, chronos,
|
||||||
|
@ -108,7 +115,7 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
|
||||||
# result = $(cast[ptr MsgType](msg)[])
|
# result = $(cast[ptr MsgType](msg)[])
|
||||||
|
|
||||||
proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
||||||
notifyOtherPeer = false) {.gcsafe, async.}
|
notifyOtherPeer = false) {.gcsafe, raises: [Defect], async.}
|
||||||
|
|
||||||
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
|
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
|
||||||
var e = newException(PeerDisconnected, msg)
|
var e = newException(PeerDisconnected, msg)
|
||||||
|
@ -248,17 +255,17 @@ func nameStr*(p: ProtocolInfo): string =
|
||||||
result = newStringOfCap(3)
|
result = newStringOfCap(3)
|
||||||
for c in p.name: result.add(c)
|
for c in p.name: result.add(c)
|
||||||
|
|
||||||
# XXX: this used to be inline, but inline procs
|
|
||||||
# cannot be passed to closure params
|
|
||||||
proc cmp*(lhs, rhs: ProtocolInfo): int =
|
proc cmp*(lhs, rhs: ProtocolInfo): int =
|
||||||
for i in 0..2:
|
for i in 0..2:
|
||||||
if lhs.name[i] != rhs.name[i]:
|
if lhs.name[i] != rhs.name[i]:
|
||||||
return int16(lhs.name[i]) - int16(rhs.name[i])
|
return int16(lhs.name[i]) - int16(rhs.name[i])
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) {.gcsafe.} =
|
proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase)
|
||||||
|
{.gcsafe, raises: [RlpError, Defect].} =
|
||||||
var reader = msgData
|
var reader = msgData
|
||||||
Future[MsgType](future).complete reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1)
|
Future[MsgType](future).complete reader.readRecordType(MsgType,
|
||||||
|
MsgType.rlpFieldsCount > 1)
|
||||||
|
|
||||||
proc registerMsg(protocol: ProtocolInfo,
|
proc registerMsg(protocol: ProtocolInfo,
|
||||||
id: int, name: string,
|
id: int, name: string,
|
||||||
|
@ -288,7 +295,7 @@ proc registerProtocol(protocol: ProtocolInfo) =
|
||||||
# Message composition and encryption
|
# Message composition and encryption
|
||||||
#
|
#
|
||||||
|
|
||||||
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline.} =
|
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int =
|
||||||
result = msgId
|
result = msgId
|
||||||
if not peer.dispatcher.isNil:
|
if not peer.dispatcher.isNil:
|
||||||
result += peer.dispatcher.protocolOffsets[proto.index]
|
result += peer.dispatcher.protocolOffsets[proto.index]
|
||||||
|
@ -297,17 +304,18 @@ template getPeer(peer: Peer): auto = peer
|
||||||
template getPeer(responder: ResponderWithId): auto = responder.peer
|
template getPeer(responder: ResponderWithId): auto = responder.peer
|
||||||
template getPeer(responder: ResponderWithoutId): auto = Peer(responder)
|
template getPeer(responder: ResponderWithoutId): auto = Peer(responder)
|
||||||
|
|
||||||
proc supports*(peer: Peer, proto: ProtocolInfo): bool {.inline.} =
|
proc supports*(peer: Peer, proto: ProtocolInfo): bool =
|
||||||
peer.dispatcher.protocolOffsets[proto.index] != -1
|
peer.dispatcher.protocolOffsets[proto.index] != -1
|
||||||
|
|
||||||
proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
|
proc supports*(peer: Peer, Protocol: type): bool =
|
||||||
## Checks whether a Peer supports a particular protocol
|
## Checks whether a Peer supports a particular protocol
|
||||||
peer.supports(Protocol.protocolInfo)
|
peer.supports(Protocol.protocolInfo)
|
||||||
|
|
||||||
template perPeerMsgId(peer: Peer, MsgType: type): int =
|
template perPeerMsgId(peer: Peer, MsgType: type): int =
|
||||||
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
|
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
|
||||||
|
|
||||||
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] =
|
proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void]
|
||||||
|
{.raises: [UnsupportedMessageError, RlpError, Defect].} =
|
||||||
template invalidIdError: untyped =
|
template invalidIdError: untyped =
|
||||||
raise newException(UnsupportedMessageError,
|
raise newException(UnsupportedMessageError,
|
||||||
"RLPx message with an invalid id " & $msgId &
|
"RLPx message with an invalid id " & $msgId &
|
||||||
|
@ -367,7 +375,7 @@ proc registerRequest(peer: Peer,
|
||||||
proc timeoutExpired(udata: pointer) {.gcsafe, raises:[Defect].} =
|
proc timeoutExpired(udata: pointer) {.gcsafe, raises:[Defect].} =
|
||||||
requestResolver(nil, responseFuture)
|
requestResolver(nil, responseFuture)
|
||||||
|
|
||||||
addTimer(timeoutAt, timeoutExpired, nil)
|
discard setTimer(timeoutAt, timeoutExpired, nil)
|
||||||
|
|
||||||
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
proc resolveResponseFuture(peer: Peer, msgId: int, msg: pointer, reqId: int) =
|
||||||
logScope:
|
logScope:
|
||||||
|
@ -504,7 +512,7 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} =
|
||||||
await peer.disconnectAndRaise(BreachOfProtocol,
|
await peer.disconnectAndRaise(BreachOfProtocol,
|
||||||
"Cannot read RLPx message id")
|
"Cannot read RLPx message id")
|
||||||
|
|
||||||
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto {.inline.} =
|
proc checkedRlpRead(peer: Peer, r: var Rlp, MsgType: type): auto =
|
||||||
when defined(release):
|
when defined(release):
|
||||||
return r.read(MsgType)
|
return r.read(MsgType)
|
||||||
else:
|
else:
|
||||||
|
@ -766,7 +774,10 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
|
||||||
thunkName = ident(msgName & "Thunk")
|
thunkName = ident(msgName & "Thunk")
|
||||||
|
|
||||||
msg.defineThunk quote do:
|
msg.defineThunk quote do:
|
||||||
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp) {.async, gcsafe.} =
|
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp)
|
||||||
|
# Fun error if you just use `RlpError` instead of `rlp.RlpError`:
|
||||||
|
# "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'"
|
||||||
|
{.async, gcsafe, raises: [rlp.RlpError, Defect].} =
|
||||||
var `receivedRlp` = data
|
var `receivedRlp` = data
|
||||||
var `receivedMsg` {.noinit.}: `msgRecName`
|
var `receivedMsg` {.noinit.}: `msgRecName`
|
||||||
`readParamsPrelude`
|
`readParamsPrelude`
|
||||||
|
@ -866,12 +877,13 @@ p2pProtocol DevP2P(version = 5, rlpxName = "p2p"):
|
||||||
proc pong(peer: Peer, emptyList: EmptyList) =
|
proc pong(peer: Peer, emptyList: EmptyList) =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc removePeer(network: EthereumNode, peer: Peer) =
|
proc removePeer(network: EthereumNode, peer: Peer) {.raises: [Defect].} =
|
||||||
# It is necessary to check if peer.remote still exists. The connection might
|
# It is necessary to check if peer.remote still exists. The connection might
|
||||||
# have been dropped already from the peers side.
|
# have been dropped already from the peers side.
|
||||||
# E.g. when receiving a p2p.disconnect message from a peer, a race will happen
|
# E.g. when receiving a p2p.disconnect message from a peer, a race will happen
|
||||||
# between which side disconnects first.
|
# between which side disconnects first.
|
||||||
if network.peerPool != nil and not peer.remote.isNil and peer.remote in network.peerPool.connectedNodes:
|
if network.peerPool != nil and not peer.remote.isNil and
|
||||||
|
peer.remote in network.peerPool.connectedNodes:
|
||||||
network.peerPool.connectedNodes.del(peer.remote)
|
network.peerPool.connectedNodes.del(peer.remote)
|
||||||
connected_peers.dec()
|
connected_peers.dec()
|
||||||
|
|
||||||
|
@ -883,16 +895,23 @@ proc removePeer(network: EthereumNode, peer: Peer) =
|
||||||
if observer.protocol.isNil or peer.supports(observer.protocol):
|
if observer.protocol.isNil or peer.supports(observer.protocol):
|
||||||
observer.onPeerDisconnected(peer)
|
observer.onPeerDisconnected(peer)
|
||||||
|
|
||||||
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] =
|
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
|
||||||
|
Future[void] {.async.} =
|
||||||
var futures = newSeqOfCap[Future[void]](allProtocols.len)
|
var futures = newSeqOfCap[Future[void]](allProtocols.len)
|
||||||
|
|
||||||
for protocol in peer.dispatcher.activeProtocols:
|
for protocol in peer.dispatcher.activeProtocols:
|
||||||
if protocol.disconnectHandler != nil:
|
if protocol.disconnectHandler != nil:
|
||||||
futures.add((protocol.disconnectHandler)(peer, reason))
|
futures.add((protocol.disconnectHandler)(peer, reason))
|
||||||
|
|
||||||
return all(futures)
|
await allFutures(futures)
|
||||||
|
|
||||||
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
|
for f in futures:
|
||||||
|
doAssert(f.finished())
|
||||||
|
if f.failed():
|
||||||
|
trace "Disconnection handler ended with an error", err = f.error.msg
|
||||||
|
|
||||||
|
proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
||||||
|
notifyOtherPeer = false) {.async, raises: [Defect].} =
|
||||||
if peer.connectionState notin {Disconnecting, Disconnected}:
|
if peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
peer.connectionState = Disconnecting
|
peer.connectionState = Disconnecting
|
||||||
# Do this first so sub-protocols have time to clean up and stop sending
|
# Do this first so sub-protocols have time to clean up and stop sending
|
||||||
|
@ -901,7 +920,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = fals
|
||||||
# In case of `CatchableError` in any of the handlers, this will be logged.
|
# In case of `CatchableError` in any of the handlers, this will be logged.
|
||||||
# Other handlers will still execute.
|
# Other handlers will still execute.
|
||||||
# In case of `Defect` in any of the handlers, program will quit.
|
# In case of `Defect` in any of the handlers, program will quit.
|
||||||
traceAwaitErrors callDisconnectHandlers(peer, reason)
|
await callDisconnectHandlers(peer, reason)
|
||||||
|
|
||||||
if notifyOtherPeer and not peer.transport.closed:
|
if notifyOtherPeer and not peer.transport.closed:
|
||||||
var fut = peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
|
var fut = peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
|
||||||
|
@ -926,12 +945,13 @@ proc validatePubKeyInHello(msg: DevP2P.hello, pubKey: PublicKey): bool =
|
||||||
let pk = PublicKey.fromRaw(msg.nodeId)
|
let pk = PublicKey.fromRaw(msg.nodeId)
|
||||||
pk.isOk and pk[] == pubKey
|
pk.isOk and pk[] == pubKey
|
||||||
|
|
||||||
proc checkUselessPeer(peer: Peer) {.inline.} =
|
proc checkUselessPeer(peer: Peer) =
|
||||||
if peer.dispatcher.numProtocols == 0:
|
if peer.dispatcher.numProtocols == 0:
|
||||||
# XXX: Send disconnect + UselessPeer
|
# XXX: Send disconnect + UselessPeer
|
||||||
raise newException(UselessPeerError, "Useless peer")
|
raise newException(UselessPeerError, "Useless peer")
|
||||||
|
|
||||||
proc initPeerState*(peer: Peer, capabilities: openarray[Capability]) =
|
proc initPeerState*(peer: Peer, capabilities: openarray[Capability])
|
||||||
|
{.raises: [UselessPeerError, Defect].} =
|
||||||
peer.dispatcher = getDispatcher(peer.network, capabilities)
|
peer.dispatcher = getDispatcher(peer.network, capabilities)
|
||||||
checkUselessPeer(peer)
|
checkUselessPeer(peer)
|
||||||
|
|
||||||
|
@ -979,7 +999,12 @@ proc postHelloSteps(peer: Peer, h: DevP2P.hello) {.async.} =
|
||||||
# The handshake may involve multiple async steps, so we wait
|
# The handshake may involve multiple async steps, so we wait
|
||||||
# here for all of them to finish.
|
# here for all of them to finish.
|
||||||
#
|
#
|
||||||
await all(subProtocolsHandshakes)
|
await allFutures(subProtocolsHandshakes)
|
||||||
|
|
||||||
|
for handshake in subProtocolsHandshakes:
|
||||||
|
doAssert(handshake.finished())
|
||||||
|
if handshake.failed():
|
||||||
|
raise handshake.error
|
||||||
|
|
||||||
# This is needed as a peer might have already disconnected. In this case
|
# This is needed as a peer might have already disconnected. In this case
|
||||||
# we need to raise so that rlpxConnect/rlpxAccept fails.
|
# we need to raise so that rlpxConnect/rlpxAccept fails.
|
||||||
|
|
|
@ -316,7 +316,7 @@ proc acceptRequest*(network: LesNetwork, peer: LesPeer,
|
||||||
network.updateFlowControl t
|
network.updateFlowControl t
|
||||||
|
|
||||||
while not network.canServeRequest:
|
while not network.canServeRequest:
|
||||||
await sleepAsync(10)
|
await sleepAsync(10.milliseconds)
|
||||||
|
|
||||||
if peer notin network.peers:
|
if peer notin network.peers:
|
||||||
# The peer was disconnected or the network
|
# The peer was disconnected or the network
|
||||||
|
|
|
@ -1,19 +1,19 @@
|
||||||
#
|
# nim-eth - Whisper
|
||||||
# Whisper
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
# (c) Copyright 2018-2019
|
# Licensed and distributed under either of
|
||||||
# Status Research & Development GmbH
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
#
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
# Licensed under either of
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
||||||
# MIT license (LICENSE-MIT)
|
|
||||||
#
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[algorithm, bitops, math, options, tables, times, strutils, hashes],
|
std/[algorithm, bitops, math, options, tables, times, hashes],
|
||||||
chronicles, stew/[byteutils, endians2], metrics, bearssl,
|
chronicles, stew/[byteutils, endians2], metrics, bearssl,
|
||||||
nimcrypto/[bcmode, hash, keccak, rijndael],
|
nimcrypto/[bcmode, hash, keccak, rijndael],
|
||||||
".."/../../[keys, rlp, p2p], ../../ecies
|
".."/../../[keys, rlp, p2p], ../../ecies
|
||||||
|
|
||||||
|
when chronicles.enabledLogLevel == LogLevel.TRACE:
|
||||||
|
import std/strutils
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "whisper_types"
|
topics = "whisper_types"
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,9 @@
|
||||||
#
|
# nim-eth - Whisper
|
||||||
# Whisper
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
# (c) Copyright 2018-2019
|
# Licensed and distributed under either of
|
||||||
# Status Research & Development GmbH
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
#
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
# Licensed under either of
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
||||||
# MIT license (LICENSE-MIT)
|
|
||||||
#
|
|
||||||
|
|
||||||
## Whisper
|
## Whisper
|
||||||
## *******
|
## *******
|
||||||
|
@ -96,8 +93,9 @@ proc allowed*(msg: Message, config: WhisperConfig): bool =
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc run(peer: Peer) {.gcsafe, async.}
|
proc run(peer: Peer) {.gcsafe, async, raises: [Defect].}
|
||||||
proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.}
|
proc run(node: EthereumNode, network: WhisperNetwork)
|
||||||
|
{.gcsafe, async, raises: [Defect].}
|
||||||
|
|
||||||
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
|
proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
|
||||||
new(network.queue)
|
new(network.queue)
|
||||||
|
@ -107,7 +105,7 @@ proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.}
|
||||||
network.config.powRequirement = defaultMinPow
|
network.config.powRequirement = defaultMinPow
|
||||||
network.config.isLightNode = false
|
network.config.isLightNode = false
|
||||||
network.config.maxMsgSize = defaultMaxMsgSize
|
network.config.maxMsgSize = defaultMaxMsgSize
|
||||||
asyncCheck node.run(network)
|
asyncSpawn node.run(network)
|
||||||
|
|
||||||
p2pProtocol Whisper(version = whisperVersion,
|
p2pProtocol Whisper(version = whisperVersion,
|
||||||
rlpxName = "shh",
|
rlpxName = "shh",
|
||||||
|
@ -152,7 +150,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
whisperPeer.initialized = true
|
whisperPeer.initialized = true
|
||||||
|
|
||||||
if not whisperNet.config.isLightNode:
|
if not whisperNet.config.isLightNode:
|
||||||
traceAsyncErrors peer.run()
|
asyncSpawn peer.run()
|
||||||
|
|
||||||
debug "Whisper peer initialized", peer
|
debug "Whisper peer initialized", peer
|
||||||
|
|
||||||
|
@ -249,7 +247,7 @@ p2pProtocol Whisper(version = whisperVersion,
|
||||||
|
|
||||||
# 'Runner' calls ---------------------------------------------------------------
|
# 'Runner' calls ---------------------------------------------------------------
|
||||||
|
|
||||||
proc processQueue(peer: Peer) =
|
proc processQueue(peer: Peer) {.raises: [Defect].} =
|
||||||
# Send to peer all valid and previously not send envelopes in the queue.
|
# Send to peer all valid and previously not send envelopes in the queue.
|
||||||
var
|
var
|
||||||
envelopes: seq[Envelope] = @[]
|
envelopes: seq[Envelope] = @[]
|
||||||
|
@ -280,7 +278,7 @@ proc processQueue(peer: Peer) =
|
||||||
# gets dropped
|
# gets dropped
|
||||||
traceAsyncErrors peer.messages(envelopes)
|
traceAsyncErrors peer.messages(envelopes)
|
||||||
|
|
||||||
proc run(peer: Peer) {.async.} =
|
proc run(peer: Peer) {.async, raises: [Defect].} =
|
||||||
while peer.connectionState notin {Disconnecting, Disconnected}:
|
while peer.connectionState notin {Disconnecting, Disconnected}:
|
||||||
peer.processQueue()
|
peer.processQueue()
|
||||||
await sleepAsync(messageInterval)
|
await sleepAsync(messageInterval)
|
||||||
|
@ -298,7 +296,7 @@ proc pruneReceived(node: EthereumNode) {.raises: [].} =
|
||||||
# the received sets.
|
# the received sets.
|
||||||
peer.received = intersection(peer.received, whisperNet.queue.itemHashes)
|
peer.received = intersection(peer.received, whisperNet.queue.itemHashes)
|
||||||
|
|
||||||
proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
|
proc run(node: EthereumNode, network: WhisperNetwork) {.async, raises: [Defect].} =
|
||||||
while true:
|
while true:
|
||||||
# prune message queue every second
|
# prune message queue every second
|
||||||
# TTL unit is in seconds, so this should be sufficient?
|
# TTL unit is in seconds, so this should be sufficient?
|
||||||
|
@ -313,7 +311,13 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
|
||||||
proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
|
proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
|
||||||
for peer in node.peers(Whisper):
|
for peer in node.peers(Whisper):
|
||||||
if peer.remote.id == peerId:
|
if peer.remote.id == peerId:
|
||||||
asyncCheck peer.p2pMessage(env)
|
let f = peer.p2pMessage(env)
|
||||||
|
# Can't make p2pMessage not raise so this is the "best" option I can think
|
||||||
|
# of instead of using asyncSpawn and still keeping the call not async.
|
||||||
|
f.callback = proc(data: pointer) {.gcsafe, raises: [Defect].} =
|
||||||
|
if f.failed:
|
||||||
|
warn "P2PMessage send failed", msg = f.readError.msg
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc queueMessage(node: EthereumNode, msg: Message): bool =
|
proc queueMessage(node: EthereumNode, msg: Message): bool =
|
||||||
|
|
|
@ -4,4 +4,11 @@ import
|
||||||
./test_crypt,
|
./test_crypt,
|
||||||
./test_discovery,
|
./test_discovery,
|
||||||
./test_ecies,
|
./test_ecies,
|
||||||
./test_enode
|
./test_enode,
|
||||||
|
./test_rlpx_thunk,
|
||||||
|
./test_shh,
|
||||||
|
./test_shh_config,
|
||||||
|
./test_shh_connect,
|
||||||
|
./test_protocol_handlers,
|
||||||
|
./les/test_flow_control
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
../../../eth/p2p/rlpx_protocols/les/flow_control
|
../../../eth/p2p/rlpx_protocols/les/flow_control
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import
|
import
|
||||||
std/[unittest, strutils],
|
std/strutils,
|
||||||
chronos, nimcrypto, bearssl,
|
chronos, bearssl,
|
||||||
../../eth/[keys, p2p], ../../eth/p2p/[discovery, enode]
|
../../eth/[keys, p2p], ../../eth/p2p/[discovery, enode]
|
||||||
|
|
||||||
var nextPort = 30303
|
var nextPort = 30303
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/tables,
|
std/tables,
|
||||||
chronos, testutils/unittests,
|
chronos, testutils/unittests,
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[json, os, unittest],
|
std/[json, os, unittest],
|
||||||
chronos, stew/byteutils,
|
chronos, stew/byteutils,
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, options, unittest, tables],
|
std/[sequtils, options, unittest, tables],
|
||||||
nimcrypto/hash,
|
nimcrypto/hash,
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, options, unittest, times],
|
std/[sequtils, options, unittest, times],
|
||||||
../../eth/p2p/rlpx_protocols/whisper_protocol as whisper
|
../../eth/p2p/rlpx_protocols/whisper_protocol as whisper
|
||||||
|
|
|
@ -7,6 +7,8 @@
|
||||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
# MIT license (LICENSE-MIT)
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[sequtils, options, tables],
|
std/[sequtils, options, tables],
|
||||||
chronos, testutils/unittests, bearssl,
|
chronos, testutils/unittests, bearssl,
|
||||||
|
|
Loading…
Reference in New Issue