mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-26 23:02:30 +00:00
deploy: 43fd11b4dc0e4ea108ae8303c9befdbe70a7d1df
This commit is contained in:
parent
ab25282464
commit
cbfae16dce
@ -78,6 +78,11 @@ type
|
||||
defaultValue: 50
|
||||
name: "max-connections" }: uint16
|
||||
|
||||
peerStoreCapacity* {.
|
||||
desc: "Maximum stored peers in the peerstore."
|
||||
defaultValue: 100
|
||||
name: "peer-store-capacity" }: int
|
||||
|
||||
peerPersistence* {.
|
||||
desc: "Enable peer persistence.",
|
||||
defaultValue: false,
|
||||
|
@ -273,7 +273,8 @@ proc initNode(conf: WakuNodeConf,
|
||||
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||
dns4DomainName,
|
||||
discv5UdpPort,
|
||||
some(conf.agentString)
|
||||
some(conf.agentString),
|
||||
some(conf.peerStoreCapacity),
|
||||
)
|
||||
except:
|
||||
return err("failed to create waku node instance: " & getCurrentExceptionMsg())
|
||||
|
@ -37,7 +37,7 @@ proc setupAndPublish() {.async.} =
|
||||
ip = ValidIpAddress.init("0.0.0.0")
|
||||
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
|
||||
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
|
||||
|
||||
|
||||
# assumes behind a firewall, so not care about being discoverable
|
||||
node.wakuDiscv5 = WakuDiscoveryV5.new(
|
||||
extIp= none(ValidIpAddress),
|
||||
@ -59,7 +59,7 @@ proc setupAndPublish() {.async.} =
|
||||
|
||||
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
|
||||
while true:
|
||||
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
|
||||
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
|
||||
if numConnectedPeers >= 6:
|
||||
notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
|
||||
break
|
||||
@ -85,4 +85,4 @@ proc setupAndPublish() {.async.} =
|
||||
await sleepAsync(5000)
|
||||
|
||||
asyncSpawn setupAndPublish()
|
||||
runForever()
|
||||
runForever()
|
||||
|
@ -55,7 +55,7 @@ proc setupAndSubscribe() {.async.} =
|
||||
|
||||
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
|
||||
while true:
|
||||
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
|
||||
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
|
||||
if numConnectedPeers >= 6:
|
||||
notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
|
||||
break
|
||||
@ -81,4 +81,4 @@ proc setupAndSubscribe() {.async.} =
|
||||
|
||||
asyncSpawn setupAndSubscribe()
|
||||
|
||||
runForever()
|
||||
runForever()
|
||||
|
@ -5,6 +5,7 @@ import
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
json_rpc/rpcserver,
|
||||
json_rpc/rpcclient,
|
||||
eth/keys,
|
||||
@ -24,7 +25,8 @@ import
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||
../test_helpers
|
||||
../test_helpers,
|
||||
./testlib/testutils
|
||||
|
||||
procSuite "Peer Manager":
|
||||
asyncTest "Peer dialing works":
|
||||
@ -34,7 +36,7 @@ procSuite "Peer Manager":
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802))
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
|
||||
|
||||
await allFutures([node1.start(), node2.start()])
|
||||
|
||||
await node1.mountRelay()
|
||||
@ -47,17 +49,17 @@ procSuite "Peer Manager":
|
||||
check:
|
||||
conn.activity
|
||||
conn.peerId == peerInfo2.peerId
|
||||
|
||||
|
||||
# Check that node2 is being managed in node1
|
||||
check:
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
|
||||
# Check connectedness
|
||||
check:
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
|
||||
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop()])
|
||||
|
||||
|
||||
asyncTest "Dialing fails gracefully":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -65,7 +67,7 @@ procSuite "Peer Manager":
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812))
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
|
||||
|
||||
await node1.start()
|
||||
# Purposefully don't start node2
|
||||
|
||||
@ -78,7 +80,7 @@ procSuite "Peer Manager":
|
||||
# Check connection failed gracefully
|
||||
check:
|
||||
connOpt.isNone()
|
||||
|
||||
|
||||
await node1.stop()
|
||||
|
||||
asyncTest "Adding, selecting and filtering peers work":
|
||||
@ -97,7 +99,7 @@ procSuite "Peer Manager":
|
||||
storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
|
||||
storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
|
||||
storePeer = PeerInfo.new(storeKey, @[storeLoc])
|
||||
|
||||
|
||||
await node.start()
|
||||
|
||||
await node.mountFilterClient()
|
||||
@ -105,25 +107,25 @@ procSuite "Peer Manager":
|
||||
node.mountStoreClient()
|
||||
|
||||
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
|
||||
|
||||
|
||||
node.setStorePeer(storePeer.toRemotePeerInfo())
|
||||
node.setFilterPeer(filterPeer.toRemotePeerInfo())
|
||||
|
||||
# Check peers were successfully added to peer manager
|
||||
check:
|
||||
node.peerManager.peers().len == 3
|
||||
node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
||||
it.addrs.contains(filterLoc) and
|
||||
it.protos.contains(WakuFilterCodec))
|
||||
node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
||||
it.addrs.contains(swapLoc) and
|
||||
it.protos.contains(WakuSwapCodec))
|
||||
node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
||||
it.addrs.contains(storeLoc) and
|
||||
it.protos.contains(WakuStoreCodec))
|
||||
|
||||
node.peerManager.peerStore.peers().len == 3
|
||||
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
||||
it.addrs.contains(filterLoc) and
|
||||
it.protos.contains(WakuFilterCodec))
|
||||
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
||||
it.addrs.contains(swapLoc) and
|
||||
it.protos.contains(WakuSwapCodec))
|
||||
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
||||
it.addrs.contains(storeLoc) and
|
||||
it.protos.contains(WakuStoreCodec))
|
||||
|
||||
await node.stop()
|
||||
|
||||
|
||||
|
||||
asyncTest "Peer manager keeps track of connections":
|
||||
let
|
||||
@ -132,7 +134,7 @@ procSuite "Peer Manager":
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832))
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
|
||||
|
||||
await node1.start()
|
||||
|
||||
await node1.mountRelay()
|
||||
@ -142,28 +144,28 @@ procSuite "Peer Manager":
|
||||
node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
|
||||
check:
|
||||
# No information about node2's connectedness
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
# Purposefully don't start node2
|
||||
# Attempt dialing node2 from node1
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Cannot connect to node2
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect
|
||||
|
||||
# Successful connection
|
||||
await node2.start()
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Stop node. Gracefully disconnect from all peers.
|
||||
await node1.stop()
|
||||
check:
|
||||
# Not currently connected to node2, but had recent, successful connection.
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
|
||||
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect
|
||||
|
||||
await node2.stop()
|
||||
|
||||
asyncTest "Peer manager can use persistent storage and survive restarts":
|
||||
@ -173,9 +175,9 @@ procSuite "Peer Manager":
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage)
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
|
||||
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
|
||||
|
||||
await node1.start()
|
||||
await node2.start()
|
||||
|
||||
@ -185,33 +187,34 @@ procSuite "Peer Manager":
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.peers().len == 1
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.peers().len == 1
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage)
|
||||
|
||||
|
||||
await node3.start()
|
||||
check:
|
||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
await node3.mountRelay() # This should trigger a reconnect
|
||||
|
||||
|
||||
check:
|
||||
# Reconnected to node2 after "restart"
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
|
||||
# TODO: nwaku/issues/1377
|
||||
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
|
||||
let
|
||||
database = SqliteDatabase.new(":memory:")[]
|
||||
storage = WakuPeerStorage.new(database)[]
|
||||
@ -222,7 +225,7 @@ procSuite "Peer Manager":
|
||||
peerInfo2 = node2.switch.peerInfo
|
||||
betaCodec = "/vac/waku/relay/2.0.0-beta2"
|
||||
stableCodec = "/vac/waku/relay/2.0.0"
|
||||
|
||||
|
||||
await node1.start()
|
||||
await node2.start()
|
||||
|
||||
@ -234,16 +237,16 @@ procSuite "Peer Manager":
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.peers().len == 1
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.peers().len == 1
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage)
|
||||
|
||||
|
||||
await node3.mountRelay()
|
||||
node3.wakuRelay.codec = stableCodec
|
||||
check:
|
||||
@ -251,19 +254,63 @@ procSuite "Peer Manager":
|
||||
node2.wakuRelay.codec == betaCodec
|
||||
node3.wakuRelay.codec == stableCodec
|
||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
await node3.start() # This should trigger a reconnect
|
||||
|
||||
check:
|
||||
# Reconnected to node2 after "restart"
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
asyncTest "Peer manager connects to all peers supporting a given protocol":
|
||||
# Create 4 nodes
|
||||
var nodes: seq[WakuNode]
|
||||
for i in 0..<4:
|
||||
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
|
||||
nodes &= node
|
||||
|
||||
# Start them
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
|
||||
# Get all peer infos
|
||||
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
# Add all peers (but self) to node 0
|
||||
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
|
||||
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
|
||||
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
|
||||
|
||||
# Attempt to connect to all known peers supporting a given protocol
|
||||
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
|
||||
|
||||
check:
|
||||
# Peerstore track all three peers
|
||||
nodes[0].peerManager.peerStore.peers().len == 3
|
||||
|
||||
# All peer ids are correct
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
|
||||
|
||||
# All peers support the relay protocol
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
|
||||
# All peers are connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
|
||||
|
||||
await allFutures(nodes.mapIt(it.stop()))
|
||||
|
256
tests/v2/test_peer_store_extended.nim
Normal file
256
tests/v2/test_peer_store_extended.nim
Normal file
@ -0,0 +1,256 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options,sequtils],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/peerstore,
|
||||
libp2p/multiaddress,
|
||||
testutils/unittests
|
||||
import
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/node/peer_manager/waku_peer_store,
|
||||
../../waku/v2/node/waku_node,
|
||||
../test_helpers,
|
||||
./testlib/testutils
|
||||
|
||||
|
||||
suite "Extended nim-libp2p Peer Store":
|
||||
# Valid peerId missing the last digit. Useful for creating new peerIds
|
||||
# basePeerId & "1"
|
||||
# basePeerId & "2"
|
||||
let basePeerId = "QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW"
|
||||
|
||||
setup:
|
||||
# Setup a nim-libp2p peerstore with some peers
|
||||
let peerStore = PeerStore.new(capacity = 50)
|
||||
var p1, p2, p3, p4, p5, p6: PeerId
|
||||
|
||||
# create five peers basePeerId + [1-5]
|
||||
require p1.init(basePeerId & "1")
|
||||
require p2.init(basePeerId & "2")
|
||||
require p3.init(basePeerId & "3")
|
||||
require p4.init(basePeerId & "4")
|
||||
require p5.init(basePeerId & "5")
|
||||
|
||||
# peer6 is not part of the peerstore
|
||||
require p6.init(basePeerId & "6")
|
||||
|
||||
# Peer1: Connected
|
||||
peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p1] = "nwaku"
|
||||
peerStore[ProtoVersionBook][p1] = "protoVersion1"
|
||||
peerStore[ConnectionBook][p1] = Connected
|
||||
peerStore[DisconnectBook][p1] = 0
|
||||
peerStore[SourceBook][p1] = Discv5
|
||||
|
||||
# Peer2: Connected
|
||||
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
|
||||
peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
||||
peerStore[KeyBook][p2] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p2] = "nwaku"
|
||||
peerStore[ProtoVersionBook][p2] = "protoVersion2"
|
||||
peerStore[ConnectionBook][p2] = Connected
|
||||
peerStore[DisconnectBook][p2] = 0
|
||||
peerStore[SourceBook][p2] = Discv5
|
||||
|
||||
# Peer3: Connected
|
||||
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
||||
peerStore[ProtoBook][p3] = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
peerStore[KeyBook][p3] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p3] = "gowaku"
|
||||
peerStore[ProtoVersionBook][p3] = "protoVersion3"
|
||||
peerStore[ConnectionBook][p3] = Connected
|
||||
peerStore[DisconnectBook][p3] = 0
|
||||
peerStore[SourceBook][p3] = Discv5
|
||||
|
||||
# Peer4: Added but never connected
|
||||
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
|
||||
# unknown: peerStore[ProtoBook][p4]
|
||||
peerStore[KeyBook][p4] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
# unknown: peerStore[AgentBook][p4]
|
||||
# unknown: peerStore[ProtoVersionBook][p4]
|
||||
peerStore[ConnectionBook][p4] = NotConnected
|
||||
peerStore[DisconnectBook][p4] = 0
|
||||
peerStore[SourceBook][p4] = Discv5
|
||||
|
||||
# Peer5: Connecteed in the past
|
||||
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
|
||||
peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
peerStore[KeyBook][p5] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p5] = "gowaku"
|
||||
peerStore[ProtoVersionBook][p5] = "protoVersion5"
|
||||
peerStore[ConnectionBook][p5] = CanConnect
|
||||
peerStore[DisconnectBook][p5] = 1000
|
||||
peerStore[SourceBook][p5] = Discv5
|
||||
|
||||
test "get() returns the correct StoredInfo for a given PeerId":
|
||||
# When
|
||||
let storedInfoPeer1 = peerStore.get(p1)
|
||||
let storedInfoPeer6 = peerStore.get(p6)
|
||||
|
||||
# Then
|
||||
check:
|
||||
# regression on nim-libp2p fields
|
||||
storedInfoPeer1.peerId == p1
|
||||
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
storedInfoPeer1.agent == "nwaku"
|
||||
storedInfoPeer1.protoVersion == "protoVersion1"
|
||||
|
||||
# our extended fields
|
||||
storedInfoPeer1.connectedness == Connected
|
||||
storedInfoPeer1.disconnectTime == 0
|
||||
storedInfoPeer1.origin == Discv5
|
||||
|
||||
check:
|
||||
# fields are empty
|
||||
storedInfoPeer6.peerId == p6
|
||||
storedInfoPeer6.addrs.len == 0
|
||||
storedInfoPeer6.protos.len == 0
|
||||
storedInfoPeer6.agent == ""
|
||||
storedInfoPeer6.protoVersion == ""
|
||||
storedInfoPeer6.connectedness == NotConnected
|
||||
storedInfoPeer6.disconnectTime == 0
|
||||
storedInfoPeer6.origin == Unknown
|
||||
|
||||
test "peers() returns all StoredInfo of the PeerStore":
|
||||
# When
|
||||
let allPeers = peerStore.peers()
|
||||
|
||||
# Then
|
||||
check:
|
||||
allPeers.len == 5
|
||||
allPeers.anyIt(it.peerId == p1)
|
||||
allPeers.anyIt(it.peerId == p2)
|
||||
allPeers.anyIt(it.peerId == p3)
|
||||
allPeers.anyIt(it.peerId == p4)
|
||||
allPeers.anyIt(it.peerId == p5)
|
||||
|
||||
let p3 = allPeers.filterIt(it.peerId == p3)[0]
|
||||
|
||||
check:
|
||||
# regression on nim-libp2p fields
|
||||
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
||||
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
p3.agent == "gowaku"
|
||||
p3.protoVersion == "protoVersion3"
|
||||
|
||||
# our extended fields
|
||||
p3.connectedness == Connected
|
||||
p3.disconnectTime == 0
|
||||
p3.origin == Discv5
|
||||
|
||||
test "peers() returns all StoredInfo matching a specific protocol":
|
||||
# When
|
||||
let storePeers = peerStore.peers("/vac/waku/store/2.0.0")
|
||||
let lpPeers = peerStore.peers("/vac/waku/lightpush/2.0.0")
|
||||
|
||||
# Then
|
||||
check:
|
||||
# Only p1 and p2 support that protocol
|
||||
storePeers.len == 2
|
||||
storePeers.anyIt(it.peerId == p1)
|
||||
storePeers.anyIt(it.peerId == p2)
|
||||
|
||||
check:
|
||||
# Only p3 supports that protocol
|
||||
lpPeers.len == 1
|
||||
lpPeers.anyIt(it.peerId == p3)
|
||||
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
|
||||
test "peers() returns all StoredInfo matching a given protocolMatcher":
|
||||
# When
|
||||
let pMatcherStorePeers = peerStore.peers(protocolMatcher("/vac/waku/store/2.0.0"))
|
||||
let pMatcherSwapPeers = peerStore.peers(protocolMatcher("/vac/waku/swap/2.0.0"))
|
||||
|
||||
# Then
|
||||
check:
|
||||
# peers: 1,2,3,5 match /vac/waku/store/2.0.0/xxx
|
||||
pMatcherStorePeers.len == 4
|
||||
pMatcherStorePeers.anyIt(it.peerId == p1)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p2)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p3)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p5)
|
||||
|
||||
check:
|
||||
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
|
||||
check:
|
||||
pMatcherSwapPeers.len == 1
|
||||
pMatcherSwapPeers.anyIt(it.peerId == p5)
|
||||
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
|
||||
test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
|
||||
# Given
|
||||
let storedInfoPeer1 = peerStore.get(p1)
|
||||
|
||||
# When
|
||||
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
|
||||
|
||||
# Then
|
||||
check:
|
||||
remotePeerInfo1.peerId == p1
|
||||
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
|
||||
test "connectedness() returns the connection status of a given PeerId":
|
||||
check:
|
||||
# peers tracked in the peerstore
|
||||
peerStore.connectedness(p1) == Connected
|
||||
peerStore.connectedness(p2) == Connected
|
||||
peerStore.connectedness(p3) == Connected
|
||||
peerStore.connectedness(p4) == NotConnected
|
||||
peerStore.connectedness(p5) == CanConnect
|
||||
|
||||
# peer not tracked in the peerstore
|
||||
peerStore.connectedness(p6) == NotConnected
|
||||
|
||||
test "hasPeer() returns true if the peer supports a given protocol":
|
||||
check:
|
||||
peerStore.hasPeer(p1, "/vac/waku/relay/2.0.0-beta1")
|
||||
peerStore.hasPeer(p1, "/vac/waku/store/2.0.0")
|
||||
not peerStore.hasPeer(p1, "it-does-not-contain-this-protocol")
|
||||
|
||||
peerStore.hasPeer(p2, "/vac/waku/relay/2.0.0")
|
||||
peerStore.hasPeer(p2, "/vac/waku/store/2.0.0")
|
||||
|
||||
peerStore.hasPeer(p3, "/vac/waku/lightpush/2.0.0")
|
||||
peerStore.hasPeer(p3, "/vac/waku/store/2.0.0-beta1")
|
||||
|
||||
# we have no knowledge of p4 supported protocols
|
||||
not peerStore.hasPeer(p4, "/vac/waku/lightpush/2.0.0")
|
||||
|
||||
peerStore.hasPeer(p5, "/vac/waku/swap/2.0.0")
|
||||
peerStore.hasPeer(p5, "/vac/waku/store/2.0.0-beta2")
|
||||
not peerStore.hasPeer(p5, "another-protocol-not-contained")
|
||||
|
||||
# peer 6 is not in the PeerStore
|
||||
not peerStore.hasPeer(p6, "/vac/waku/lightpush/2.0.0")
|
||||
|
||||
test "hasPeers() returns true if any peer in the PeerStore supports a given protocol":
|
||||
# Match specific protocols
|
||||
check:
|
||||
peerStore.hasPeers("/vac/waku/relay/2.0.0-beta1")
|
||||
peerStore.hasPeers("/vac/waku/store/2.0.0")
|
||||
peerStore.hasPeers("/vac/waku/lightpush/2.0.0")
|
||||
not peerStore.hasPeers("/vac/waku/does-not-exist/2.0.0")
|
||||
|
||||
# Match protocolMatcher protocols
|
||||
check:
|
||||
peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
|
||||
not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))
|
||||
|
||||
test "selectPeer() returns if a peer supports a given protocol":
|
||||
# When
|
||||
let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")
|
||||
|
||||
# Then
|
||||
check:
|
||||
swapPeer.isSome()
|
||||
swapPeer.get().peerId == p5
|
||||
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
@ -88,11 +88,11 @@ procSuite "Waku DNS Discovery":
|
||||
|
||||
check:
|
||||
# We have successfully connected to all discovered nodes
|
||||
node4.peerManager.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node1.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node2.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node3.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()])
|
||||
|
@ -101,7 +101,7 @@ proc main(): Future[int] {.async.} =
|
||||
|
||||
# ensure input protocols are valid
|
||||
for p in conf.protocols:
|
||||
if p notin ProtocolsTable:
|
||||
if p notin ProtocolsTable:
|
||||
error "invalid protocol", protocol=p, valid=ProtocolsTable
|
||||
raise newException(ConfigurationError, "Invalid cli flag values" & p)
|
||||
|
||||
@ -128,7 +128,7 @@ proc main(): Future[int] {.async.} =
|
||||
error "Timedout after", timeout=conf.timeout
|
||||
|
||||
let lp2pPeerStore = node.switch.peerStore
|
||||
let conStatus = node.peerManager.peerStore.connectionBook[peer.peerId]
|
||||
let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId]
|
||||
|
||||
if conStatus in [Connected, CanConnect]:
|
||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az318-892:
|
||||
# Libtool was configured on host fv-az39-360:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -39,7 +39,7 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
|
||||
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
|
||||
## Admin API version 1 definitions
|
||||
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool:
|
||||
## Connect to a list of peers
|
||||
debug "post_waku_v2_admin_v1_peers"
|
||||
@ -64,34 +64,38 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
|
||||
if not node.wakuRelay.isNil:
|
||||
# Map managed peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuRelayCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuRelayCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuRelayCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
|
||||
if not node.wakuFilter.isNil:
|
||||
# Map WakuFilter peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuFilterCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuFilterCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuFilterCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
|
||||
if not node.wakuSwap.isNil:
|
||||
# Map WakuSwap peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuSwapCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuSwapCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuSwapCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
if not node.wakuStore.isNil:
|
||||
# Map WakuStore peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuStoreCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuStoreCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuStoreCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
# @TODO filter output on protocol/connected-status
|
||||
|
@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
## Returns history for a list of content topics with optional paging
|
||||
debug "get_waku_v2_store_v1_messages"
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
raise newException(ValueError, "no suitable remote store peers")
|
||||
|
||||
|
@ -6,8 +6,8 @@ else:
|
||||
|
||||
import
|
||||
std/[options, sets, sequtils, times],
|
||||
chronos,
|
||||
chronicles,
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
libp2p/multistream
|
||||
import
|
||||
@ -29,7 +29,7 @@ logScope:
|
||||
type
|
||||
PeerManager* = ref object of RootObj
|
||||
switch*: Switch
|
||||
peerStore*: WakuPeerStore
|
||||
peerStore*: PeerStore
|
||||
storage: PeerStorage
|
||||
|
||||
let
|
||||
@ -39,11 +39,6 @@ let
|
||||
# Helper functions #
|
||||
####################
|
||||
|
||||
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
|
||||
RemotePeerInfo.init(peerId = storedInfo.peerId,
|
||||
addrs = toSeq(storedInfo.addrs),
|
||||
protocols = toSeq(storedInfo.protos))
|
||||
|
||||
proc insertOrReplace(ps: PeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo,
|
||||
@ -55,7 +50,7 @@ proc insertOrReplace(ps: PeerStorage,
|
||||
warn "failed to store peers", err = res.error
|
||||
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
||||
|
||||
proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
addrs: seq[MultiAddress], proto: string,
|
||||
dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
|
||||
info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
|
||||
@ -73,20 +68,20 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
debug "Dialing remote peer timed out"
|
||||
waku_peers_dials.inc(labelValues = ["timeout"])
|
||||
|
||||
pm.peerStore.connectionBook[peerId] = CannotConnect
|
||||
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
|
||||
|
||||
|
||||
return none(Connection)
|
||||
except CatchableError as e:
|
||||
# TODO: any redial attempts?
|
||||
debug "Dialing remote peer failed", msg = e.msg
|
||||
waku_peers_dials.inc(labelValues = ["failed"])
|
||||
|
||||
pm.peerStore.connectionBook[peerId] = CannotConnect
|
||||
|
||||
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
|
||||
|
||||
|
||||
return none(Connection)
|
||||
|
||||
proc loadFromStorage(pm: PeerManager) =
|
||||
@ -99,49 +94,55 @@ proc loadFromStorage(pm: PeerManager) =
|
||||
# Do not manage self
|
||||
return
|
||||
|
||||
pm.peerStore.addressBook[peerId] = storedInfo.addrs
|
||||
pm.peerStore.protoBook[peerId] = storedInfo.protos
|
||||
pm.peerStore.keyBook[peerId] = storedInfo.publicKey
|
||||
pm.peerStore.connectionBook[peerId] = NotConnected # Reset connectedness state
|
||||
pm.peerStore.disconnectBook[peerId] = disconnectTime
|
||||
|
||||
# nim-libp2p books
|
||||
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
|
||||
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
|
||||
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
|
||||
pm.peerStore[AgentBook][peerId] = storedInfo.agent
|
||||
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
|
||||
|
||||
# custom books
|
||||
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
|
||||
pm.peerStore[DisconnectBook][peerId] = disconnectTime
|
||||
pm.peerStore[SourceBook][peerId] = storedInfo.origin
|
||||
|
||||
let res = pm.storage.getAll(onData)
|
||||
if res.isErr:
|
||||
warn "failed to load peers from storage", err = res.error
|
||||
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
|
||||
else:
|
||||
debug "successfully queried peer storage"
|
||||
|
||||
|
||||
##################
|
||||
# Initialisation #
|
||||
##################
|
||||
##################
|
||||
|
||||
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||
if not pm.peerStore.addressBook.contains(peerId):
|
||||
if not pm.peerStore[AddressBook].contains(peerId):
|
||||
## We only consider connection events if we
|
||||
## already track some addresses for this peer
|
||||
return
|
||||
|
||||
case event.kind
|
||||
of ConnEventKind.Connected:
|
||||
pm.peerStore.connectionBook[peerId] = Connected
|
||||
pm.peerStore[ConnectionBook][peerId] = Connected
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
||||
return
|
||||
of ConnEventKind.Disconnected:
|
||||
pm.peerStore.connectionBook[peerId] = CanConnect
|
||||
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||
return
|
||||
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
|
||||
let pm = PeerManager(switch: switch,
|
||||
peerStore: WakuPeerStore.new(),
|
||||
peerStore: switch.peerStore,
|
||||
storage: storage)
|
||||
|
||||
|
||||
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||
onConnEvent(pm, peerId, event)
|
||||
|
||||
|
||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
|
||||
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
|
||||
|
||||
@ -150,122 +151,68 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
|
||||
pm.loadFromStorage() # Load previously managed peers.
|
||||
else:
|
||||
debug "no peer storage found"
|
||||
|
||||
|
||||
return pm
|
||||
|
||||
#####################
|
||||
# Manager interface #
|
||||
#####################
|
||||
|
||||
proc peers*(pm: PeerManager): seq[StoredInfo] =
|
||||
# Return the known info for all peers
|
||||
pm.peerStore.peers()
|
||||
|
||||
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
|
||||
# Return the known info for all peers registered on the specified protocol
|
||||
pm.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
|
||||
# Return the known info for all peers matching the provided protocolMatcher
|
||||
pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
|
||||
# Return the connection state of the given, managed peer
|
||||
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
|
||||
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
|
||||
|
||||
let storedInfo = pm.peerStore.get(peerId)
|
||||
|
||||
if (storedInfo == StoredInfo()):
|
||||
# Peer is not managed, therefore not connected
|
||||
return NotConnected
|
||||
else:
|
||||
pm.peerStore.connectionBook[peerId]
|
||||
|
||||
proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
|
||||
# Returns `true` if peer is included in manager for the specified protocol
|
||||
|
||||
pm.peerStore.get(peerId).protos.contains(proto)
|
||||
|
||||
proc hasPeers*(pm: PeerManager, proto: string): bool =
|
||||
# Returns `true` if manager has any peers for the specified protocol
|
||||
pm.peers.anyIt(it.protos.contains(proto))
|
||||
|
||||
proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
|
||||
# Returns `true` if manager has any peers matching the protocolMatcher
|
||||
pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
# Do not attempt to manage our unmanageable self
|
||||
return
|
||||
|
||||
|
||||
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
||||
|
||||
|
||||
# ...known addresses
|
||||
for multiaddr in remotePeerInfo.addrs:
|
||||
pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)
|
||||
|
||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
|
||||
|
||||
# ...public key
|
||||
var publicKey: PublicKey
|
||||
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
|
||||
|
||||
pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||
|
||||
# ...associated protocols
|
||||
pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)
|
||||
# nim-libp2p identify overrides this
|
||||
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
|
||||
|
||||
# Add peer to storage. Entry will subsequently be updated with connectedness information
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
|
||||
|
||||
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
|
||||
# Selects the best peer for a given protocol
|
||||
let peers = pm.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
if peers.len >= 1:
|
||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||
let peerStored = peers[0]
|
||||
|
||||
return some(peerStored.toRemotePeerInfo())
|
||||
else:
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
proc reconnectPeers*(pm: PeerManager,
|
||||
proto: string,
|
||||
protocolMatcher: Matcher,
|
||||
backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
|
||||
## Reconnect to peers registered for this protocol. This will update connectedness.
|
||||
## Especially useful to resume connections from persistent storage after a restart.
|
||||
|
||||
|
||||
debug "Reconnecting peers", proto=proto
|
||||
|
||||
for storedInfo in pm.peers(protocolMatcher):
|
||||
# Check if peer is reachable.
|
||||
if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect:
|
||||
debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
|
||||
|
||||
for storedInfo in pm.peerStore.peers(protocolMatcher):
|
||||
# Check that the peer can be connected
|
||||
if storedInfo.connectedness == CannotConnect:
|
||||
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
|
||||
continue
|
||||
|
||||
|
||||
# Respect optional backoff period where applicable.
|
||||
let
|
||||
disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second) # Convert
|
||||
# TODO: Add method to peerStore (eg isBackoffExpired())
|
||||
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
|
||||
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
|
||||
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
|
||||
|
||||
|
||||
trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
|
||||
|
||||
|
||||
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
||||
if backoffTime > ZeroDuration:
|
||||
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
|
||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||
await sleepAsync(backoffTime)
|
||||
|
||||
# Add to protos for peer, if it has not been added yet
|
||||
if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
|
||||
let remotePeerInfo = storedInfo.toRemotePeerInfo()
|
||||
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(remotePeerInfo, proto)
|
||||
|
||||
trace "Reconnecting to peer", peerId=storedInfo.peerId
|
||||
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
||||
@ -277,12 +224,12 @@ proc reconnectPeers*(pm: PeerManager,
|
||||
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
|
||||
# Dial a given peer and add it to the list of known peers
|
||||
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
|
||||
|
||||
|
||||
# First add dialed peer info to peer store, if it does not exist yet...
|
||||
if not pm.hasPeer(remotePeerInfo.peerId, proto):
|
||||
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
|
||||
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(remotePeerInfo, proto)
|
||||
|
||||
|
||||
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
# Do not attempt to dial self
|
||||
return none(Connection)
|
||||
@ -292,7 +239,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
|
||||
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
|
||||
# Dial an existing peer by looking up it's existing addrs in the switch's peerStore
|
||||
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
|
||||
|
||||
|
||||
if peerId == pm.switch.peerInfo.peerId:
|
||||
# Do not attempt to dial self
|
||||
return none(Connection)
|
||||
@ -304,10 +251,10 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def
|
||||
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
|
||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||
info "Connecting to node", remotePeer = remotePeer, source = source
|
||||
|
||||
|
||||
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||
let connOpt = await pm.dialPeer(remotePeer, proto)
|
||||
|
||||
|
||||
if connOpt.isSome():
|
||||
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||
waku_node_conns_initiated.inc(labelValues = [source])
|
||||
@ -318,7 +265,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s
|
||||
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
|
||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||
info "connectToNodes", len = nodes.len
|
||||
|
||||
|
||||
for nodeId in nodes:
|
||||
await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source)
|
||||
|
||||
@ -333,7 +280,7 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source
|
||||
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
|
||||
## `source` indicates source of node addrs (static config, api call, discovery, etc)
|
||||
info "connectToNodes", len = nodes.len
|
||||
|
||||
|
||||
for remotePeerInfo in nodes:
|
||||
await connectToNode(pm, remotePeerInfo, proto, source)
|
||||
|
||||
|
@ -4,12 +4,17 @@ else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, sets],
|
||||
std/[tables, sequtils, sets, options],
|
||||
libp2p/builders,
|
||||
libp2p/peerstore
|
||||
|
||||
import
|
||||
../../utils/peers
|
||||
|
||||
export peerstore, builders
|
||||
|
||||
# TODO rename to peer_store_extended to emphasize its a nimlibp2 extension
|
||||
|
||||
type
|
||||
Connectedness* = enum
|
||||
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
|
||||
@ -20,74 +25,107 @@ type
|
||||
CanConnect,
|
||||
# Connected: actively connected to peer.
|
||||
Connected
|
||||
|
||||
|
||||
PeerOrigin* = enum
|
||||
Unknown,
|
||||
Discv5,
|
||||
Static,
|
||||
Dns
|
||||
|
||||
# Keeps track of the Connectedness state of a peer
|
||||
ConnectionBook* = ref object of PeerBook[Connectedness]
|
||||
|
||||
DisconnectBook* = ref object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
|
||||
# Keeps track of when peers were disconnected in Unix timestamps
|
||||
DisconnectBook* = ref object of PeerBook[int64]
|
||||
|
||||
# Keeps track of the origin of a peer
|
||||
SourceBook* = ref object of PeerBook[PeerOrigin]
|
||||
|
||||
WakuPeerStore* = ref object
|
||||
addressBook*: AddressBook
|
||||
protoBook*: ProtoBook
|
||||
keyBook*: KeyBook
|
||||
connectionBook*: ConnectionBook
|
||||
disconnectBook*: DisconnectBook
|
||||
|
||||
StoredInfo* = object
|
||||
# Collates stored info about a peer
|
||||
peerId*: PeerID
|
||||
# Taken from nim-libp2
|
||||
peerId*: PeerId
|
||||
addrs*: seq[MultiAddress]
|
||||
protos*: seq[string]
|
||||
publicKey*: PublicKey
|
||||
agent*: string
|
||||
protoVersion*: string
|
||||
|
||||
proc new*(T: type WakuPeerStore): WakuPeerStore =
|
||||
let
|
||||
addressBook = AddressBook(book: initTable[PeerID, seq[MultiAddress]]())
|
||||
protoBook = ProtoBook(book: initTable[PeerID, seq[string]]())
|
||||
keyBook = KeyBook(book: initTable[PeerID, PublicKey]())
|
||||
connectionBook = ConnectionBook(book: initTable[PeerID, Connectedness]())
|
||||
disconnectBook = DisconnectBook(book: initTable[PeerID, int64]())
|
||||
|
||||
T(addressBook: addressBook,
|
||||
protoBook: protoBook,
|
||||
keyBook: keyBook,
|
||||
connectionBook: connectionBook,
|
||||
disconnectBook: disconnectBook)
|
||||
# Extended custom fields
|
||||
connectedness*: Connectedness
|
||||
disconnectTime*: int64
|
||||
origin*: PeerOrigin
|
||||
|
||||
#####################
|
||||
# Utility functions #
|
||||
#####################
|
||||
|
||||
proc add*[T](peerBook: SeqPeerBook[T],
|
||||
peerId: PeerId,
|
||||
entry: T) =
|
||||
## Add entry to a given peer. If the peer is not known,
|
||||
## it will be set with the provided entry.
|
||||
|
||||
peerBook.book.mgetOrPut(peerId,
|
||||
newSeq[T]()).add(entry)
|
||||
|
||||
# TODO: Notify clients?
|
||||
|
||||
##################
|
||||
##################
|
||||
# Peer Store API #
|
||||
##################
|
||||
|
||||
proc get*(peerStore: WakuPeerStore,
|
||||
proc get*(peerStore: PeerStore,
|
||||
peerId: PeerID): StoredInfo =
|
||||
## Get the stored information of a given peer.
|
||||
|
||||
StoredInfo(
|
||||
# Taken from nim-libp2
|
||||
peerId: peerId,
|
||||
addrs: peerStore.addressBook[peerId],
|
||||
protos: peerStore.protoBook[peerId],
|
||||
publicKey: peerStore.keyBook[peerId]
|
||||
addrs: peerStore[AddressBook][peerId],
|
||||
protos: peerStore[ProtoBook][peerId],
|
||||
publicKey: peerStore[KeyBook][peerId],
|
||||
agent: peerStore[AgentBook][peerId],
|
||||
protoVersion: peerStore[ProtoVersionBook][peerId],
|
||||
|
||||
# Extended custom fields
|
||||
connectedness: peerStore[ConnectionBook][peerId],
|
||||
disconnectTime: peerStore[DisconnectBook][peerId],
|
||||
origin: peerStore[SourceBook][peerId],
|
||||
)
|
||||
|
||||
proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] =
|
||||
proc peers*(peerStore: PeerStore): seq[StoredInfo] =
|
||||
## Get all the stored information of every peer.
|
||||
|
||||
let allKeys = concat(toSeq(keys(peerStore.addressBook.book)),
|
||||
toSeq(keys(peerStore.protoBook.book)),
|
||||
toSeq(keys(peerStore.keyBook.book))).toHashSet()
|
||||
let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
|
||||
toSeq(peerStore[ProtoBook].book.keys()),
|
||||
toSeq(peerStore[KeyBook].book.keys())).toHashSet()
|
||||
|
||||
return allKeys.mapIt(peerStore.get(it))
|
||||
|
||||
proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
|
||||
# Return the known info for all peers registered on the specified protocol
|
||||
peerStore.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] =
|
||||
# Return the known info for all peers matching the provided protocolMatcher
|
||||
peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
|
||||
RemotePeerInfo.init(peerId = storedInfo.peerId,
|
||||
addrs = toSeq(storedInfo.addrs),
|
||||
protocols = toSeq(storedInfo.protos))
|
||||
|
||||
|
||||
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
|
||||
# Return the connection state of the given, managed peer
|
||||
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
|
||||
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
|
||||
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
|
||||
|
||||
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
|
||||
# Returns `true` if peer is included in manager for the specified protocol
|
||||
# TODO: What if peer does not exist in the peerStore?
|
||||
peerStore.get(peerId).protos.contains(proto)
|
||||
|
||||
proc hasPeers*(peerStore: PeerStore, proto: string): bool =
|
||||
# Returns `true` if the peerstore has any peer for the specified protocol
|
||||
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto))
|
||||
|
||||
proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
|
||||
# Returns `true` if the peerstore has any peer matching the protocolMatcher
|
||||
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
|
||||
# Selects the best peer for a given protocol
|
||||
let peers = peerStore.peers().filterIt(it.protos.contains(proto))
|
||||
|
||||
if peers.len >= 1:
|
||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||
let peerStored = peers[0]
|
||||
|
||||
return some(peerStored.toRemotePeerInfo())
|
||||
else:
|
||||
return none(RemotePeerInfo)
|
||||
|
@ -102,7 +102,7 @@ type
|
||||
started*: bool # Indicates that node has started listening
|
||||
|
||||
|
||||
proc protocolMatcher(codec: string): Matcher =
|
||||
proc protocolMatcher*(codec: string): Matcher =
|
||||
## Returns a protocol matcher function for the provided codec
|
||||
proc match(proto: string): bool {.gcsafe.} =
|
||||
## Matches a proto with any postfix to the provided codec.
|
||||
@ -146,7 +146,8 @@ proc new*(T: type WakuNode,
|
||||
sendSignedPeerRecord = false,
|
||||
dns4DomainName = none(string),
|
||||
discv5UdpPort = none(Port),
|
||||
agentString = none(string), # defaults to nim-libp2p version
|
||||
agentString = none(string), # defaults to nim-libp2p version
|
||||
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
|
||||
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||
## Creates a Waku Node instance.
|
||||
|
||||
@ -217,7 +218,8 @@ proc new*(T: type WakuNode,
|
||||
secureCertPath = secureCert,
|
||||
nameResolver = nameResolver,
|
||||
sendSignedPeerRecord = sendSignedPeerRecord,
|
||||
agentString = agentString
|
||||
agentString = agentString,
|
||||
peerStoreCapacity = peerStoreCapacity,
|
||||
)
|
||||
|
||||
let wakuNode = WakuNode(
|
||||
@ -357,7 +359,7 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
|
||||
# Resume previous relay connections
|
||||
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
info "Found previous WakuRelay peers. Reconnecting."
|
||||
|
||||
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||
@ -502,7 +504,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
|
||||
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
return
|
||||
@ -517,7 +519,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
|
||||
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
return
|
||||
@ -675,7 +677,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
|
||||
if node.wakuStoreClient.isNil():
|
||||
return err("waku store client is nil")
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
error "no suitable remote peers"
|
||||
return err("peer_not_found_failure")
|
||||
@ -764,7 +766,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
|
||||
error "failed to publish message", error="waku lightpush client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
error "failed to publish message", error="no suitable remote peers"
|
||||
return
|
||||
@ -824,14 +826,15 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
|
||||
node.switch.mount(node.libp2pPing)
|
||||
|
||||
# TODO: Move this logic to PeerManager
|
||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||
while node.started:
|
||||
# Keep all connected peers alive while running
|
||||
trace "Running keepalive"
|
||||
|
||||
# First get a list of connected peer infos
|
||||
let peers = node.peerManager.peers()
|
||||
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
|
||||
let peers = node.peerManager.peerStore.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt(it.toRemotePeerInfo())
|
||||
|
||||
# Attempt to retrieve and ping the active outgoing connection for each peer
|
||||
@ -855,6 +858,9 @@ proc startKeepalive*(node: WakuNode) =
|
||||
|
||||
asyncSpawn node.keepaliveLoop(defaultKeepalive)
|
||||
|
||||
# TODO: Decouple discovery logic from connection logic
|
||||
# A discovered peer goes to the PeerStore
|
||||
# The PeerManager uses to PeerStore to dial peers
|
||||
proc runDiscv5Loop(node: WakuNode) {.async.} =
|
||||
## Continuously add newly discovered nodes
|
||||
## using Node Discovery v5
|
||||
|
@ -43,7 +43,7 @@ proc withWssTransport*(b: SwitchBuilder,
|
||||
secureKeyPath: string,
|
||||
secureCertPath: string): SwitchBuilder
|
||||
{.raises: [Defect, IOError].} =
|
||||
|
||||
|
||||
let key : TLSPrivateKey = getSecureKey(secureKeyPath)
|
||||
let cert : TLSCertificate = getSecureCert(secureCertPath)
|
||||
b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr,
|
||||
@ -71,7 +71,8 @@ proc newWakuSwitch*(
|
||||
wssEnabled: bool = false,
|
||||
secureKeyPath: string = "",
|
||||
secureCertPath: string = "",
|
||||
agentString = none(string), # defaults to nim-libp2p version
|
||||
agentString = none(string), # defaults to nim-libp2p version,
|
||||
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
|
||||
): Switch
|
||||
{.raises: [Defect, IOError, LPError].} =
|
||||
|
||||
@ -88,6 +89,8 @@ proc newWakuSwitch*(
|
||||
.withNameResolver(nameResolver)
|
||||
.withSignedPeerRecord(sendSignedPeerRecord)
|
||||
|
||||
if peerStoreCapacity.isSome():
|
||||
b = b.withPeerStore(peerStoreCapacity.get())
|
||||
if agentString.isSome():
|
||||
b = b.withAgentVersion(agentString.get())
|
||||
if privKey.isSome():
|
||||
@ -103,4 +106,4 @@ proc newWakuSwitch*(
|
||||
else :
|
||||
b = b.withAddress(address)
|
||||
|
||||
b.build()
|
||||
b.build()
|
||||
|
@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut
|
||||
return ok()
|
||||
|
||||
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
return err(peerNotFoundFailure)
|
||||
@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
|
||||
return ok()
|
||||
|
||||
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
return err(peerNotFoundFailure)
|
||||
|
@ -210,7 +210,7 @@ when defined(waku_exp_store_resume):
|
||||
else:
|
||||
debug "no candidate list is provided, selecting a random peer"
|
||||
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
warn "no suitable remote peers"
|
||||
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
|
Loading…
x
Reference in New Issue
Block a user