deploy: 236b577bf9eaf35d7960fff1216abec3ad132030

alrevuelta 2022-11-24 13:54:47 +00:00
parent 21ce287b04
commit 6950dcb0bc
16 changed files with 571 additions and 264 deletions

View File

@@ -78,6 +78,11 @@ type
        defaultValue: 50
        name: "max-connections" }: uint16

+     peerStoreCapacity* {.
+       desc: "Maximum stored peers in the peerstore."
+       defaultValue: 100
+       name: "peer-store-capacity" }: int
+
      peerPersistence* {.
        desc: "Enable peer persistence.",
        defaultValue: false,

View File

@@ -273,7 +273,8 @@ proc initNode(conf: WakuNodeConf,
      conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
      dns4DomainName,
      discv5UdpPort,
-     some(conf.agentString)
+     some(conf.agentString),
+     some(conf.peerStoreCapacity),
    )
  except:
    return err("failed to create waku node instance: " & getCurrentExceptionMsg())
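For illustration, a minimal sketch (not part of this commit; the standalone wiring is assumed) of the capacity-bounded nim-libp2p peer store that the new option ultimately feeds, mirroring the PeerStore.new(capacity = ...) call used in the new tests further down:

import libp2p/peerstore

# 100 mirrors the default of the new --peer-store-capacity option;
# peers beyond this bound become eligible for pruning.
let peerStore = PeerStore.new(capacity = 100)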

View File

@@ -37,7 +37,7 @@ proc setupAndPublish() {.async.} =
    ip = ValidIpAddress.init("0.0.0.0")
    node = WakuNode.new(nodeKey, ip, Port(wakuPort))
    flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)

  # assumes behind a firewall, so not care about being discoverable
  node.wakuDiscv5 = WakuDiscoveryV5.new(
    extIp= none(ValidIpAddress),
@@ -59,7 +59,7 @@ proc setupAndPublish() {.async.} =
  # wait for a minimum of peers to be connected, otherwise messages wont be gossiped
  while true:
-   let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
+   let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
    if numConnectedPeers >= 6:
      notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
      break
@@ -85,4 +85,4 @@ proc setupAndPublish() {.async.} =
    await sleepAsync(5000)

asyncSpawn setupAndPublish()
runForever()

View File

@@ -55,7 +55,7 @@ proc setupAndSubscribe() {.async.} =
  # wait for a minimum of peers to be connected, otherwise messages wont be gossiped
  while true:
-   let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
+   let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
    if numConnectedPeers >= 6:
      notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
      break
@@ -81,4 +81,4 @@ proc setupAndSubscribe() {.async.} =

asyncSpawn setupAndSubscribe()
runForever()
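The peerStore[ConnectionBook] syntax used in both examples indexes the nim-libp2p peer store by book type. The same read pattern in isolation, as a sketch (the store and its contents are illustrative; the waku_peer_store import path depends on where the snippet lives):

import std/sequtils
import libp2p/peerstore
import waku/v2/node/peer_manager/waku_peer_store  # ConnectionBook, Connected

let store = PeerStore.new(capacity = 50)
# Value books return a default for unknown peers, so this is safe on an empty store.
let connectedCount = store[ConnectionBook].book.values().countIt(it == Connected)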

View File

@@ -5,6 +5,7 @@ import
  stew/shims/net as stewNet,
  testutils/unittests,
  chronicles,
+ chronos,
  json_rpc/rpcserver,
  json_rpc/rpcclient,
  eth/keys,
@@ -24,7 +25,8 @@ import
  ../../waku/v2/protocol/waku_store,
  ../../waku/v2/protocol/waku_filter,
  ../../waku/v2/protocol/waku_swap/waku_swap,
- ../test_helpers
+ ../test_helpers,
+ ./testlib/testutils

procSuite "Peer Manager":
  asyncTest "Peer dialing works":
@@ -34,7 +36,7 @@ procSuite "Peer Manager":
      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802))
      peerInfo2 = node2.switch.peerInfo

    await allFutures([node1.start(), node2.start()])
    await node1.mountRelay()
@@ -47,17 +49,17 @@ procSuite "Peer Manager":
    check:
      conn.activity
      conn.peerId == peerInfo2.peerId

    # Check that node2 is being managed in node1
    check:
-     node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)

    # Check connectedness
    check:
-     node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected

    await allFutures([node1.stop(), node2.stop()])

  asyncTest "Dialing fails gracefully":
    let
      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
@@ -65,7 +67,7 @@ procSuite "Peer Manager":
      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812))
      peerInfo2 = node2.switch.peerInfo

    await node1.start()
    # Purposefully don't start node2
@@ -78,7 +80,7 @@ procSuite "Peer Manager":
    # Check connection failed gracefully
    check:
      connOpt.isNone()

    await node1.stop()

  asyncTest "Adding, selecting and filtering peers work":
@@ -97,7 +99,7 @@ procSuite "Peer Manager":
      storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
      storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
      storePeer = PeerInfo.new(storeKey, @[storeLoc])

    await node.start()
    await node.mountFilterClient()
@@ -105,25 +107,25 @@ procSuite "Peer Manager":
    node.mountStoreClient()

    node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
    node.setStorePeer(storePeer.toRemotePeerInfo())
    node.setFilterPeer(filterPeer.toRemotePeerInfo())

    # Check peers were successfully added to peer manager
    check:
-     node.peerManager.peers().len == 3
-     node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
+     node.peerManager.peerStore.peers().len == 3
+     node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
                                                    it.addrs.contains(filterLoc) and
                                                    it.protos.contains(WakuFilterCodec))
-     node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
+     node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
                                                  it.addrs.contains(swapLoc) and
                                                  it.protos.contains(WakuSwapCodec))
-     node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
+     node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
                                                   it.addrs.contains(storeLoc) and
                                                   it.protos.contains(WakuStoreCodec))

    await node.stop()

  asyncTest "Peer manager keeps track of connections":
    let
@@ -132,7 +134,7 @@ procSuite "Peer Manager":
      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832))
      peerInfo2 = node2.switch.peerInfo

    await node1.start()
    await node1.mountRelay()
@@ -142,28 +144,28 @@ procSuite "Peer Manager":
    node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
    check:
      # No information about node2's connectedness
-     node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

    # Purposefully don't start node2
    # Attempt dialing node2 from node1
    discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      # Cannot connect to node2
-     node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect

    # Successful connection
    await node2.start()
    discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      # Currently connected to node2
-     node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

    # Stop node. Gracefully disconnect from all peers.
    await node1.stop()
    check:
      # Not currently connected to node2, but had recent, successful connection.
-     node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect

    await node2.stop()

  asyncTest "Peer manager can use persistent storage and survive restarts":
@@ -173,9 +175,9 @@ procSuite "Peer Manager":
      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage)
      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
      peerInfo2 = node2.switch.peerInfo

    await node1.start()
    await node2.start()
@@ -185,33 +187,34 @@ procSuite "Peer Manager":
    discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
    check:
      # Currently connected to node2
-     node1.peerManager.peers().len == 1
-     node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+     node1.peerManager.peerStore.peers().len == 1
+     node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

    # Simulate restart by initialising a new node using the same storage
    let
      nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage)

    await node3.start()
    check:
      # Node2 has been loaded after "restart", but we have not yet reconnected
-     node3.peerManager.peers().len == 1
-     node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+     node3.peerManager.peerStore.peers().len == 1
+     node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

    await node3.mountRelay() # This should trigger a reconnect
    check:
      # Reconnected to node2 after "restart"
-     node3.peerManager.peers().len == 1
-     node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node3.peerManager.connectedness(peerInfo2.peerId) == Connected
+     node3.peerManager.peerStore.peers().len == 1
+     node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

    await allFutures([node1.stop(), node2.stop(), node3.stop()])

- asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
+ # TODO: nwaku/issues/1377
+ xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
    let
      database = SqliteDatabase.new(":memory:")[]
      storage = WakuPeerStorage.new(database)[]
@@ -222,7 +225,7 @@ procSuite "Peer Manager":
      peerInfo2 = node2.switch.peerInfo
      betaCodec = "/vac/waku/relay/2.0.0-beta2"
      stableCodec = "/vac/waku/relay/2.0.0"

    await node1.start()
    await node2.start()
@@ -234,16 +237,16 @@ procSuite "Peer Manager":
    discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
    check:
      # Currently connected to node2
-     node1.peerManager.peers().len == 1
-     node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
-     node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+     node1.peerManager.peerStore.peers().len == 1
+     node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
+     node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

    # Simulate restart by initialising a new node using the same storage
    let
      nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage)

    await node3.mountRelay()
    node3.wakuRelay.codec = stableCodec
    check:
@@ -251,19 +254,63 @@ procSuite "Peer Manager":
      node2.wakuRelay.codec == betaCodec
      node3.wakuRelay.codec == stableCodec
      # Node2 has been loaded after "restart", but we have not yet reconnected
-     node3.peerManager.peers().len == 1
-     node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
-     node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+     node3.peerManager.peerStore.peers().len == 1
+     node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
+     node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

    await node3.start() # This should trigger a reconnect
    check:
      # Reconnected to node2 after "restart"
-     node3.peerManager.peers().len == 1
-     node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
-     node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
-     node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
-     node3.peerManager.connectedness(peerInfo2.peerId) == Connected
+     node3.peerManager.peerStore.peers().len == 1
+     node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
+     node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
+     node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
+     node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

    await allFutures([node1.stop(), node2.stop(), node3.stop()])

+ asyncTest "Peer manager connects to all peers supporting a given protocol":
+   # Create 4 nodes
+   var nodes: seq[WakuNode]
+   for i in 0..<4:
+     let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
+     let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
+     nodes &= node
+
+   # Start them
+   await allFutures(nodes.mapIt(it.start()))
+   await allFutures(nodes.mapIt(it.mountRelay()))
+
+   # Get all peer infos
+   let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
+
+   # Add all peers (but self) to node 0
+   nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
+   nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
+   nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
+
+   # Attempt to connect to all known peers supporting a given protocol
+   await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
+
+   check:
+     # Peerstore tracks all three peers
+     nodes[0].peerManager.peerStore.peers().len == 3
+     # All peer ids are correct
+     nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
+     nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
+     nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
+     # All peers support the relay protocol
+     nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
+     nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
+     nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
+     # All peers are connected
+     nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
+     nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
+     nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
+
+   await allFutures(nodes.mapIt(it.stop()))
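The new test relies on protocolMatcher to redial peers advertising any version of a codec. A sketch of the matcher semantics it assumes, with protocol strings taken from the peer store tests below (the import path is an assumption):

import ../../waku/v2/utils/peers  # protocolMatcher

let matcher = protocolMatcher("/vac/waku/store/2.0.0")
doAssert matcher("/vac/waku/store/2.0.0")        # exact version
doAssert matcher("/vac/waku/store/2.0.0-beta2")  # same codec, different version
doAssert not matcher("/vac/waku/swap/2.0.0")     # different codec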

View File

@@ -0,0 +1,256 @@
{.used.}

import
  std/[options,sequtils],
  libp2p/crypto/crypto,
  libp2p/peerstore,
  libp2p/multiaddress,
  testutils/unittests
import
  ../../waku/v2/node/peer_manager/peer_manager,
  ../../waku/v2/node/peer_manager/waku_peer_store,
  ../../waku/v2/node/waku_node,
  ../test_helpers,
  ./testlib/testutils

suite "Extended nim-libp2p Peer Store":
  # Valid peerId missing the last digit. Useful for creating new peerIds
  # basePeerId & "1"
  # basePeerId & "2"
  let basePeerId = "QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW"

  setup:
    # Setup a nim-libp2p peerstore with some peers
    let peerStore = PeerStore.new(capacity = 50)
    var p1, p2, p3, p4, p5, p6: PeerId

    # create five peers basePeerId + [1-5]
    require p1.init(basePeerId & "1")
    require p2.init(basePeerId & "2")
    require p3.init(basePeerId & "3")
    require p4.init(basePeerId & "4")
    require p5.init(basePeerId & "5")

    # peer6 is not part of the peerstore
    require p6.init(basePeerId & "6")

    # Peer1: Connected
    peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
    peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
    peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
    peerStore[AgentBook][p1] = "nwaku"
    peerStore[ProtoVersionBook][p1] = "protoVersion1"
    peerStore[ConnectionBook][p1] = Connected
    peerStore[DisconnectBook][p1] = 0
    peerStore[SourceBook][p1] = Discv5

    # Peer2: Connected
    peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
    peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
    peerStore[KeyBook][p2] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
    peerStore[AgentBook][p2] = "nwaku"
    peerStore[ProtoVersionBook][p2] = "protoVersion2"
    peerStore[ConnectionBook][p2] = Connected
    peerStore[DisconnectBook][p2] = 0
    peerStore[SourceBook][p2] = Discv5

    # Peer3: Connected
    peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
    peerStore[ProtoBook][p3] = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
    peerStore[KeyBook][p3] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
    peerStore[AgentBook][p3] = "gowaku"
    peerStore[ProtoVersionBook][p3] = "protoVersion3"
    peerStore[ConnectionBook][p3] = Connected
    peerStore[DisconnectBook][p3] = 0
    peerStore[SourceBook][p3] = Discv5

    # Peer4: Added but never connected
    peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
    # unknown: peerStore[ProtoBook][p4]
    peerStore[KeyBook][p4] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
    # unknown: peerStore[AgentBook][p4]
    # unknown: peerStore[ProtoVersionBook][p4]
    peerStore[ConnectionBook][p4] = NotConnected
    peerStore[DisconnectBook][p4] = 0
    peerStore[SourceBook][p4] = Discv5

    # Peer5: Connected in the past
    peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
    peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
    peerStore[KeyBook][p5] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
    peerStore[AgentBook][p5] = "gowaku"
    peerStore[ProtoVersionBook][p5] = "protoVersion5"
    peerStore[ConnectionBook][p5] = CanConnect
    peerStore[DisconnectBook][p5] = 1000
    peerStore[SourceBook][p5] = Discv5

  test "get() returns the correct StoredInfo for a given PeerId":
    # When
    let storedInfoPeer1 = peerStore.get(p1)
    let storedInfoPeer6 = peerStore.get(p6)

    # Then
    check:
      # regression on nim-libp2p fields
      storedInfoPeer1.peerId == p1
      storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
      storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
      storedInfoPeer1.agent == "nwaku"
      storedInfoPeer1.protoVersion == "protoVersion1"

      # our extended fields
      storedInfoPeer1.connectedness == Connected
      storedInfoPeer1.disconnectTime == 0
      storedInfoPeer1.origin == Discv5

    check:
      # fields are empty
      storedInfoPeer6.peerId == p6
      storedInfoPeer6.addrs.len == 0
      storedInfoPeer6.protos.len == 0
      storedInfoPeer6.agent == ""
      storedInfoPeer6.protoVersion == ""
      storedInfoPeer6.connectedness == NotConnected
      storedInfoPeer6.disconnectTime == 0
      storedInfoPeer6.origin == Unknown

  test "peers() returns all StoredInfo of the PeerStore":
    # When
    let allPeers = peerStore.peers()

    # Then
    check:
      allPeers.len == 5
      allPeers.anyIt(it.peerId == p1)
      allPeers.anyIt(it.peerId == p2)
      allPeers.anyIt(it.peerId == p3)
      allPeers.anyIt(it.peerId == p4)
      allPeers.anyIt(it.peerId == p5)

    let p3 = allPeers.filterIt(it.peerId == p3)[0]
    check:
      # regression on nim-libp2p fields
      p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
      p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
      p3.agent == "gowaku"
      p3.protoVersion == "protoVersion3"

      # our extended fields
      p3.connectedness == Connected
      p3.disconnectTime == 0
      p3.origin == Discv5

  test "peers() returns all StoredInfo matching a specific protocol":
    # When
    let storePeers = peerStore.peers("/vac/waku/store/2.0.0")
    let lpPeers = peerStore.peers("/vac/waku/lightpush/2.0.0")

    # Then
    check:
      # Only p1 and p2 support that protocol
      storePeers.len == 2
      storePeers.anyIt(it.peerId == p1)
      storePeers.anyIt(it.peerId == p2)

    check:
      # Only p3 supports that protocol
      lpPeers.len == 1
      lpPeers.anyIt(it.peerId == p3)
      lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]

  test "peers() returns all StoredInfo matching a given protocolMatcher":
    # When
    let pMatcherStorePeers = peerStore.peers(protocolMatcher("/vac/waku/store/2.0.0"))
    let pMatcherSwapPeers = peerStore.peers(protocolMatcher("/vac/waku/swap/2.0.0"))

    # Then
    check:
      # peers: 1,2,3,5 match /vac/waku/store/2.0.0/xxx
      pMatcherStorePeers.len == 4
      pMatcherStorePeers.anyIt(it.peerId == p1)
      pMatcherStorePeers.anyIt(it.peerId == p2)
      pMatcherStorePeers.anyIt(it.peerId == p3)
      pMatcherStorePeers.anyIt(it.peerId == p5)

    check:
      pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
      pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
      pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
      pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

    check:
      pMatcherSwapPeers.len == 1
      pMatcherSwapPeers.anyIt(it.peerId == p5)
      pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

  test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
    # Given
    let storedInfoPeer1 = peerStore.get(p1)

    # When
    let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()

    # Then
    check:
      remotePeerInfo1.peerId == p1
      remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
      remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]

  test "connectedness() returns the connection status of a given PeerId":
    check:
      # peers tracked in the peerstore
      peerStore.connectedness(p1) == Connected
      peerStore.connectedness(p2) == Connected
      peerStore.connectedness(p3) == Connected
      peerStore.connectedness(p4) == NotConnected
      peerStore.connectedness(p5) == CanConnect
      # peer not tracked in the peerstore
      peerStore.connectedness(p6) == NotConnected

  test "hasPeer() returns true if the peer supports a given protocol":
    check:
      peerStore.hasPeer(p1, "/vac/waku/relay/2.0.0-beta1")
      peerStore.hasPeer(p1, "/vac/waku/store/2.0.0")
      not peerStore.hasPeer(p1, "it-does-not-contain-this-protocol")
      peerStore.hasPeer(p2, "/vac/waku/relay/2.0.0")
      peerStore.hasPeer(p2, "/vac/waku/store/2.0.0")
      peerStore.hasPeer(p3, "/vac/waku/lightpush/2.0.0")
      peerStore.hasPeer(p3, "/vac/waku/store/2.0.0-beta1")
      # we have no knowledge of p4 supported protocols
      not peerStore.hasPeer(p4, "/vac/waku/lightpush/2.0.0")
      peerStore.hasPeer(p5, "/vac/waku/swap/2.0.0")
      peerStore.hasPeer(p5, "/vac/waku/store/2.0.0-beta2")
      not peerStore.hasPeer(p5, "another-protocol-not-contained")
      # peer 6 is not in the PeerStore
      not peerStore.hasPeer(p6, "/vac/waku/lightpush/2.0.0")

  test "hasPeers() returns true if any peer in the PeerStore supports a given protocol":
    # Match specific protocols
    check:
      peerStore.hasPeers("/vac/waku/relay/2.0.0-beta1")
      peerStore.hasPeers("/vac/waku/store/2.0.0")
      peerStore.hasPeers("/vac/waku/lightpush/2.0.0")
      not peerStore.hasPeers("/vac/waku/does-not-exist/2.0.0")

    # Match protocolMatcher protocols
    check:
      peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
      not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))

  test "selectPeer() returns if a peer supports a given protocol":
    # When
    let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")

    # Then
    check:
      swapPeer.isSome()
      swapPeer.get().peerId == p5
      swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
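A sketch of how selectPeer() typically feeds a dial, as the store RPC handler further down does (hypothetical glue inside an async proc; node is assumed to be a running WakuNode):

let storePeer = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
if storePeer.isSome():
  # selectPeer() yields a RemotePeerInfo, ready to be dialed
  discard await node.peerManager.dialPeer(storePeer.get(), WakuStoreCodec)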

View File

@@ -88,11 +88,11 @@ procSuite "Waku DNS Discovery":
    check:
      # We have successfully connected to all discovered nodes
-     node4.peerManager.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
-     node4.peerManager.connectedness(node1.switch.peerInfo.peerId) == Connected
-     node4.peerManager.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
-     node4.peerManager.connectedness(node2.switch.peerInfo.peerId) == Connected
-     node4.peerManager.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
-     node4.peerManager.connectedness(node3.switch.peerInfo.peerId) == Connected
+     node4.peerManager.peerStore.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
+     node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected
+     node4.peerManager.peerStore.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
+     node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected
+     node4.peerManager.peerStore.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
+     node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected

    await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()])

View File

@@ -101,7 +101,7 @@ proc main(): Future[int] {.async.} =
  # ensure input protocols are valid
  for p in conf.protocols:
    if p notin ProtocolsTable:
      error "invalid protocol", protocol=p, valid=ProtocolsTable
      raise newException(ConfigurationError, "Invalid cli flag values" & p)
@@ -128,7 +128,7 @@ proc main(): Future[int] {.async.} =
      error "Timedout after", timeout=conf.timeout

    let lp2pPeerStore = node.switch.peerStore
-   let conStatus = node.peerManager.peerStore.connectionBook[peer.peerId]
+   let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId]

    if conStatus in [Connected, CanConnect]:
      let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]

View File

@@ -39,7 +39,7 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =

  ## Admin API version 1 definitions

  rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool:
    ## Connect to a list of peers
    debug "post_waku_v2_admin_v1_peers"
@@ -64,34 +64,38 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
    if not node.wakuRelay.isNil:
      # Map managed peers to WakuPeers and add to return list
-     wPeers.insert(node.peerManager.peers(WakuRelayCodec)
+     wPeers.insert(node.peerManager.peerStore
+                   .peers(WakuRelayCodec)
                    .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                    protocol: WakuRelayCodec,
-                                   connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
+                                   connected: it.connectedness == Connectedness.Connected)),
                    wPeers.len) # Append to the end of the sequence

    if not node.wakuFilter.isNil:
      # Map WakuFilter peers to WakuPeers and add to return list
-     wPeers.insert(node.peerManager.peers(WakuFilterCodec)
+     wPeers.insert(node.peerManager.peerStore
+                   .peers(WakuFilterCodec)
                    .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                    protocol: WakuFilterCodec,
-                                   connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
+                                   connected: it.connectedness == Connectedness.Connected)),
                    wPeers.len) # Append to the end of the sequence

    if not node.wakuSwap.isNil:
      # Map WakuSwap peers to WakuPeers and add to return list
-     wPeers.insert(node.peerManager.peers(WakuSwapCodec)
+     wPeers.insert(node.peerManager.peerStore
+                   .peers(WakuSwapCodec)
                    .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                    protocol: WakuSwapCodec,
-                                   connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
+                                   connected: it.connectedness == Connectedness.Connected)),
                    wPeers.len) # Append to the end of the sequence

    if not node.wakuStore.isNil:
      # Map WakuStore peers to WakuPeers and add to return list
-     wPeers.insert(node.peerManager.peers(WakuStoreCodec)
+     wPeers.insert(node.peerManager.peerStore
+                   .peers(WakuStoreCodec)
                    .mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
                                    protocol: WakuStoreCodec,
-                                   connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
+                                   connected: it.connectedness == Connectedness.Connected)),
                    wPeers.len) # Append to the end of the sequence

  # @TODO filter output on protocol/connected-status
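With connectedness folded into StoredInfo, the per-protocol mapping above needs no extra lookups. The underlying pattern, reduced to a sketch (names as in the hunk; the surrounding handler is assumed):

let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec)
let connectedRelayPeers = relayPeers.filterIt(it.connectedness == Connectedness.Connected)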

View File

@@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
    ## Returns history for a list of content topics with optional paging
    debug "get_waku_v2_store_v1_messages"

-   let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
+   let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
    if peerOpt.isNone():
      raise newException(ValueError, "no suitable remote store peers")

View File

@@ -6,8 +6,8 @@ else:
import
  std/[options, sets, sequtils, times],
  chronos,
  chronicles,
  metrics,
  libp2p/multistream
import
@@ -29,7 +29,7 @@ logScope:
type
  PeerManager* = ref object of RootObj
    switch*: Switch
-   peerStore*: WakuPeerStore
+   peerStore*: PeerStore
    storage: PeerStorage

let
@@ -39,11 +39,6 @@ let
# Helper functions #
####################

-proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
-  RemotePeerInfo.init(peerId = storedInfo.peerId,
-                      addrs = toSeq(storedInfo.addrs),
-                      protocols = toSeq(storedInfo.protos))
-
proc insertOrReplace(ps: PeerStorage,
                     peerId: PeerID,
                     storedInfo: StoredInfo,
@@ -55,7 +50,7 @@ proc insertOrReplace(ps: PeerStorage,
    warn "failed to store peers", err = res.error
    waku_peers_errors.inc(labelValues = ["storage_failure"])

proc dialPeer(pm: PeerManager, peerId: PeerID,
              addrs: seq[MultiAddress], proto: string,
              dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
@@ -73,20 +68,20 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
    debug "Dialing remote peer timed out"
    waku_peers_dials.inc(labelValues = ["timeout"])

-   pm.peerStore.connectionBook[peerId] = CannotConnect
+   pm.peerStore[ConnectionBook][peerId] = CannotConnect
    if not pm.storage.isNil:
      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)

    return none(Connection)

  except CatchableError as e:
    # TODO: any redial attempts?
    debug "Dialing remote peer failed", msg = e.msg
    waku_peers_dials.inc(labelValues = ["failed"])

-   pm.peerStore.connectionBook[peerId] = CannotConnect
+   pm.peerStore[ConnectionBook][peerId] = CannotConnect
    if not pm.storage.isNil:
      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)

    return none(Connection)

proc loadFromStorage(pm: PeerManager) =
@@ -99,49 +94,55 @@ proc loadFromStorage(pm: PeerManager) =
      # Do not manage self
      return

-     pm.peerStore.addressBook[peerId] = storedInfo.addrs
-     pm.peerStore.protoBook[peerId] = storedInfo.protos
-     pm.peerStore.keyBook[peerId] = storedInfo.publicKey
-     pm.peerStore.connectionBook[peerId] = NotConnected # Reset connectedness state
-     pm.peerStore.disconnectBook[peerId] = disconnectTime
+     # nim-libp2p books
+     pm.peerStore[AddressBook][peerId] = storedInfo.addrs
+     pm.peerStore[ProtoBook][peerId] = storedInfo.protos
+     pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
+     pm.peerStore[AgentBook][peerId] = storedInfo.agent
+     pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
+
+     # custom books
+     pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
+     pm.peerStore[DisconnectBook][peerId] = disconnectTime
+     pm.peerStore[SourceBook][peerId] = storedInfo.origin

  let res = pm.storage.getAll(onData)
  if res.isErr:
    warn "failed to load peers from storage", err = res.error
    waku_peers_errors.inc(labelValues = ["storage_load_failure"])
  else:
    debug "successfully queried peer storage"

##################
# Initialisation #
##################

proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
- if not pm.peerStore.addressBook.contains(peerId):
+ if not pm.peerStore[AddressBook].contains(peerId):
    ## We only consider connection events if we
    ## already track some addresses for this peer
    return

  case event.kind
  of ConnEventKind.Connected:
-   pm.peerStore.connectionBook[peerId] = Connected
+   pm.peerStore[ConnectionBook][peerId] = Connected
    if not pm.storage.isNil:
      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
    return
  of ConnEventKind.Disconnected:
-   pm.peerStore.connectionBook[peerId] = CanConnect
+   pm.peerStore[ConnectionBook][peerId] = CanConnect
    if not pm.storage.isNil:
      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
    return

proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
  let pm = PeerManager(switch: switch,
-                      peerStore: WakuPeerStore.new(),
+                      peerStore: switch.peerStore,
                       storage: storage)

  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
    onConnEvent(pm, peerId, event)

  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
@@ -150,122 +151,68 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
    pm.loadFromStorage() # Load previously managed peers.
  else:
    debug "no peer storage found"

  return pm

#####################
# Manager interface #
#####################

-proc peers*(pm: PeerManager): seq[StoredInfo] =
-  # Return the known info for all peers
-  pm.peerStore.peers()
-
-proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
-  # Return the known info for all peers registered on the specified protocol
-  pm.peers.filterIt(it.protos.contains(proto))
-
-proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
-  # Return the known info for all peers matching the provided protocolMatcher
-  pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
-
-proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
-  # Return the connection state of the given, managed peer
-  # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
-  # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
-  let storedInfo = pm.peerStore.get(peerId)
-  if (storedInfo == StoredInfo()):
-    # Peer is not managed, therefore not connected
-    return NotConnected
-  else:
-    pm.peerStore.connectionBook[peerId]
-
-proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
-  # Returns `true` if peer is included in manager for the specified protocol
-  pm.peerStore.get(peerId).protos.contains(proto)
-
-proc hasPeers*(pm: PeerManager, proto: string): bool =
-  # Returns `true` if manager has any peers for the specified protocol
-  pm.peers.anyIt(it.protos.contains(proto))
-
-proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
-  # Returns `true` if manager has any peers matching the protocolMatcher
-  pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
-
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
  # Adds peer to manager for the specified protocol

  if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
    # Do not attempt to manage our unmanageable self
    return

  debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto

  # ...known addresses
  for multiaddr in remotePeerInfo.addrs:
-   pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)
+   pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr

  # ...public key
  var publicKey: PublicKey
  discard remotePeerInfo.peerId.extractPublicKey(publicKey)

- pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey
+ pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey

- # ...associated protocols
- pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)
+ # nim-libp2p identify overrides this
+ pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto

  # Add peer to storage. Entry will subsequently be updated with connectedness information
  if not pm.storage.isNil:
    pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)

-proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
-  # Selects the best peer for a given protocol
-  let peers = pm.peers.filterIt(it.protos.contains(proto))
-
-  if peers.len >= 1:
-    # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
-    let peerStored = peers[0]
-    return some(peerStored.toRemotePeerInfo())
-  else:
-    return none(RemotePeerInfo)
-
proc reconnectPeers*(pm: PeerManager,
                     proto: string,
                     protocolMatcher: Matcher,
                     backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
  ## Reconnect to peers registered for this protocol. This will update connectedness.
  ## Especially useful to resume connections from persistent storage after a restart.

  debug "Reconnecting peers", proto=proto

- for storedInfo in pm.peers(protocolMatcher):
-   # Check if peer is reachable.
-   if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect:
-     debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
+ for storedInfo in pm.peerStore.peers(protocolMatcher):
+   # Check that the peer can be connected
+   if storedInfo.connectedness == CannotConnect:
+     debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
      continue

    # Respect optional backoff period where applicable.
    let
-     disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second) # Convert
+     # TODO: Add method to peerStore (eg isBackoffExpired())
+     disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
      currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
      backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect

    trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime

+   # TODO: This blocks the whole function. Try to connect to another peer in the meantime.
    if backoffTime > ZeroDuration:
      debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
      # We disconnected recently and still need to wait for a backoff period before connecting
      await sleepAsync(backoffTime)

-   # Add to protos for peer, if it has not been added yet
-   if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
-     let remotePeerInfo = storedInfo.toRemotePeerInfo()
-     trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
-     pm.addPeer(remotePeerInfo, proto)
-
    trace "Reconnecting to peer", peerId=storedInfo.peerId
    discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
@@ -277,12 +224,12 @@ proc reconnectPeers*(pm: PeerManager,
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  # Dial a given peer and add it to the list of known peers
  # TODO: check peer validity and score before continuing. Limit number of peers to be managed.

  # First add dialed peer info to peer store, if it does not exist yet...
- if not pm.hasPeer(remotePeerInfo.peerId, proto):
+ if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
    trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
    pm.addPeer(remotePeerInfo, proto)

  if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
    # Do not attempt to dial self
    return none(Connection)
@@ -292,7 +239,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
  # Dial an existing peer by looking up it's existing addrs in the switch's peerStore
  # TODO: check peer validity and score before continuing. Limit number of peers to be managed.

  if peerId == pm.switch.peerInfo.peerId:
    # Do not attempt to dial self
    return none(Connection)
@@ -304,10 +251,10 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "Connecting to node", remotePeer = remotePeer, source = source

  info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
  let connOpt = await pm.dialPeer(remotePeer, proto)

  if connOpt.isSome():
    info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
    waku_node_conns_initiated.inc(labelValues = [source])
@@ -318,7 +265,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for nodeId in nodes:
    await connectToNode(pm, parseRemotePeerInfo(nodeId), proto, source)
@@ -333,7 +280,7 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string,
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
  ## `source` indicates source of node addrs (static config, api call, discovery, etc)
  info "connectToNodes", len = nodes.len

  for remotePeerInfo in nodes:
    await connectToNode(pm, remotePeerInfo, proto, source)
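For reference, a sketch of resuming relay connections after a restart, per the reconnectPeers signature above (the backoff value is illustrative):

# Redial every stored peer advertising any relay version, honouring a
# 30-second backoff since the recorded disconnect time.
await node.peerManager.reconnectPeers(WakuRelayCodec,
                                      protocolMatcher(WakuRelayCodec),
                                      backoff = chronos.seconds(30))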

View File

@ -4,12 +4,17 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/[tables, sequtils, sets], std/[tables, sequtils, sets, options],
libp2p/builders, libp2p/builders,
libp2p/peerstore libp2p/peerstore
import
../../utils/peers
export peerstore, builders export peerstore, builders
# TODO rename to peer_store_extended to emphasize its a nimlibp2 extension
type type
Connectedness* = enum Connectedness* = enum
# NotConnected: default state for a new peer. No connection and no further information on connectedness. # NotConnected: default state for a new peer. No connection and no further information on connectedness.
@ -20,74 +25,107 @@ type
CanConnect, CanConnect,
# Connected: actively connected to peer. # Connected: actively connected to peer.
Connected Connected
PeerOrigin* = enum
Unknown,
Discv5,
Static,
Dns
# Keeps track of the Connectedness state of a peer
ConnectionBook* = ref object of PeerBook[Connectedness] ConnectionBook* = ref object of PeerBook[Connectedness]
DisconnectBook* = ref object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps # Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
# Keeps track of the origin of a peer
SourceBook* = ref object of PeerBook[PeerOrigin]
-  WakuPeerStore* = ref object
-    addressBook*: AddressBook
-    protoBook*: ProtoBook
-    keyBook*: KeyBook
-    connectionBook*: ConnectionBook
-    disconnectBook*: DisconnectBook
   StoredInfo* = object
-    # Collates stored info about a peer
-    peerId*: PeerID
+    # Taken from nim-libp2p
+    peerId*: PeerId
     addrs*: seq[MultiAddress]
     protos*: seq[string]
     publicKey*: PublicKey
+    agent*: string
+    protoVersion*: string
+    # Extended custom fields
+    connectedness*: Connectedness
+    disconnectTime*: int64
+    origin*: PeerOrigin
-proc new*(T: type WakuPeerStore): WakuPeerStore =
-  let
-    addressBook = AddressBook(book: initTable[PeerID, seq[MultiAddress]]())
-    protoBook = ProtoBook(book: initTable[PeerID, seq[string]]())
-    keyBook = KeyBook(book: initTable[PeerID, PublicKey]())
-    connectionBook = ConnectionBook(book: initTable[PeerID, Connectedness]())
-    disconnectBook = DisconnectBook(book: initTable[PeerID, int64]())
-  T(addressBook: addressBook,
-    protoBook: protoBook,
-    keyBook: keyBook,
-    connectionBook: connectionBook,
-    disconnectBook: disconnectBook)
-#####################
-# Utility functions #
-#####################
-proc add*[T](peerBook: SeqPeerBook[T],
-             peerId: PeerId,
-             entry: T) =
-  ## Add entry to a given peer. If the peer is not known,
-  ## it will be set with the provided entry.
-  peerBook.book.mgetOrPut(peerId,
-                          newSeq[T]()).add(entry)
-  # TODO: Notify clients?
 ##################
 # Peer Store API #
 ##################
-proc get*(peerStore: WakuPeerStore,
+proc get*(peerStore: PeerStore,
           peerId: PeerID): StoredInfo =
   ## Get the stored information of a given peer.
   StoredInfo(
+    # Taken from nim-libp2p
     peerId: peerId,
-    addrs: peerStore.addressBook[peerId],
-    protos: peerStore.protoBook[peerId],
-    publicKey: peerStore.keyBook[peerId]
+    addrs: peerStore[AddressBook][peerId],
+    protos: peerStore[ProtoBook][peerId],
+    publicKey: peerStore[KeyBook][peerId],
+    agent: peerStore[AgentBook][peerId],
+    protoVersion: peerStore[ProtoVersionBook][peerId],
+    # Extended custom fields
+    connectedness: peerStore[ConnectionBook][peerId],
+    disconnectTime: peerStore[DisconnectBook][peerId],
+    origin: peerStore[SourceBook][peerId],
   )
-proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] =
+proc peers*(peerStore: PeerStore): seq[StoredInfo] =
   ## Get all the stored information of every peer.
-  let allKeys = concat(toSeq(keys(peerStore.addressBook.book)),
-                       toSeq(keys(peerStore.protoBook.book)),
-                       toSeq(keys(peerStore.keyBook.book))).toHashSet()
+  let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
+                       toSeq(peerStore[ProtoBook].book.keys()),
+                       toSeq(peerStore[KeyBook].book.keys())).toHashSet()
   return allKeys.mapIt(peerStore.get(it))
+proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
+  # Return the known info for all peers registered on the specified protocol
+  peerStore.peers.filterIt(it.protos.contains(proto))
+proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] =
+  # Return the known info for all peers matching the provided protocolMatcher
+  peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it)))
+proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
+  RemotePeerInfo.init(peerId = storedInfo.peerId,
+                      addrs = toSeq(storedInfo.addrs),
+                      protocols = toSeq(storedInfo.protos))
+proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
+  # Return the connection state of the given, managed peer
+  # TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
+  # TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
+  return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
+proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
+  # Returns `true` if peer is included in manager for the specified protocol
+  # TODO: What if peer does not exist in the peerStore?
+  peerStore.get(peerId).protos.contains(proto)
+proc hasPeers*(peerStore: PeerStore, proto: string): bool =
+  # Returns `true` if the peerstore has any peer for the specified protocol
+  toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto))
+proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
+  # Returns `true` if the peerstore has any peer matching the protocolMatcher
+  toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
+proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
+  # Selects the best peer for a given protocol
+  let peers = peerStore.peers().filterIt(it.protos.contains(proto))
+  if peers.len >= 1:
+    # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
+    let peerStored = peers[0]
+    return some(peerStored.toRemotePeerInfo())
+  else:
+    return none(RemotePeerInfo)
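
Taken together, a hedged sketch of the new query surface; `peerStore` is the node's peer store and the codec constant is assumed to be in scope:

# Sketch only: look up peers for a protocol and pick one to dial.
if peerStore.hasPeers(WakuStoreCodec):
  let peerOpt = peerStore.selectPeer(WakuStoreCodec)  # first match, per the TODO above
  if peerOpt.isSome():
    let info = peerStore.get(peerOpt.get().peerId)
    debug "selected store peer", origin = info.origin, connectedness = info.connectedness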
View File

@@ -102,7 +102,7 @@ type
     started*: bool # Indicates that node has started listening
-proc protocolMatcher(codec: string): Matcher =
+proc protocolMatcher*(codec: string): Matcher =
   ## Returns a protocol matcher function for the provided codec
   proc match(proto: string): bool {.gcsafe.} =
     ## Matches a proto with any postfix to the provided codec.
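
The matcher's intended behaviour, as a hedged example (the exact semantics live in the proc body, which this hunk does not show):

# Sketch only: a matcher built from a codec accepts that codec plus any postfix.
let matcher = protocolMatcher("/vac/waku/store/2.0.0")
assert matcher("/vac/waku/store/2.0.0")
assert matcher("/vac/waku/store/2.0.0-beta1")  # postfix variants still match
assert not matcher("/vac/waku/relay/2.0.0")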
@@ -146,7 +146,8 @@ proc new*(T: type WakuNode,
           sendSignedPeerRecord = false,
           dns4DomainName = none(string),
           discv5UdpPort = none(Port),
           agentString = none(string), # defaults to nim-libp2p version
+          peerStoreCapacity = none(int), # defaults to nim-libp2p max size
           ): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
   ## Creates a Waku Node instance.
@@ -217,7 +218,8 @@ proc new*(T: type WakuNode,
     secureCertPath = secureCert,
     nameResolver = nameResolver,
     sendSignedPeerRecord = sendSignedPeerRecord,
-    agentString = agentString
+    agentString = agentString,
+    peerStoreCapacity = peerStoreCapacity,
   )
   let wakuNode = WakuNode(
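
A hedged construction example for the new parameter; the key and port values are illustrative, and all other arguments keep their defaults:

# Sketch only: cap the peer store at 150 entries when building a node.
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000),
                        peerStoreCapacity = some(150))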
@@ -357,7 +359,7 @@ proc startRelay*(node: WakuNode) {.async.} =
     node.subscribe(topic, none(TopicHandler))
   # Resume previous relay connections
-  if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
+  if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
     info "Found previous WakuRelay peers. Reconnecting."
     # Reconnect to previous relay peers. This will respect a backoff period, if necessary
@@ -502,7 +504,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
     error "cannot register filter subscription to topic", error="waku filter client is nil"
     return
-  let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
+  let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
   if peerOpt.isNone():
     error "cannot register filter subscription to topic", error="no suitable remote peers"
     return
@@ -517,7 +519,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
     error "cannot unregister filter subscription to content", error="waku filter client is nil"
     return
-  let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
+  let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
   if peerOpt.isNone():
     error "cannot register filter subscription to topic", error="no suitable remote peers"
     return
@@ -675,7 +677,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
   if node.wakuStoreClient.isNil():
     return err("waku store client is nil")
-  let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
+  let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
   if peerOpt.isNone():
     error "no suitable remote peers"
     return err("peer_not_found_failure")
@@ -764,7 +766,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
     error "failed to publish message", error="waku lightpush client is nil"
     return
-  let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
+  let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec)
   if peerOpt.isNone():
     error "failed to publish message", error="no suitable remote peers"
     return
@@ -824,14 +826,15 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
   node.switch.mount(node.libp2pPing)
+# TODO: Move this logic to PeerManager
 proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
   while node.started:
     # Keep all connected peers alive while running
     trace "Running keepalive"
     # First get a list of connected peer infos
-    let peers = node.peerManager.peers()
-                .filterIt(node.peerManager.connectedness(it.peerId) == Connected)
+    let peers = node.peerManager.peerStore.peers()
+                .filterIt(it.connectedness == Connected)
                 .mapIt(it.toRemotePeerInfo())
     # Attempt to retrieve and ping the active outgoing connection for each peer
@@ -855,6 +858,9 @@ proc startKeepalive*(node: WakuNode) =
   asyncSpawn node.keepaliveLoop(defaultKeepalive)
+# TODO: Decouple discovery logic from connection logic
+#       A discovered peer goes to the PeerStore
+#       The PeerManager uses the PeerStore to dial peers
 proc runDiscv5Loop(node: WakuNode) {.async.} =
   ## Continuously add newly discovered nodes
   ## using Node Discovery v5
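
The decoupling described in the TODO above, as a hedged sketch; `discoveredPeers` is a placeholder for whatever discv5 yields, not the actual API:

# Sketch only: discovery writes peers to the PeerStore, tagged with their origin...
for remotePeerInfo in discoveredPeers:
  node.peerManager.peerStore[SourceBook][remotePeerInfo.peerId] = Discv5
# ...and the PeerManager later dials from the store, not from discovery directly.
let relayPeer = node.peerManager.peerStore.selectPeer(WakuRelayCodec)
if relayPeer.isSome():
  await node.peerManager.connectToNodes(@[relayPeer.get()], WakuRelayCodec, source = "discv5")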
View File

@@ -43,7 +43,7 @@ proc withWssTransport*(b: SwitchBuilder,
                        secureKeyPath: string,
                        secureCertPath: string): SwitchBuilder
                        {.raises: [Defect, IOError].} =
   let key : TLSPrivateKey = getSecureKey(secureKeyPath)
   let cert : TLSCertificate = getSecureCert(secureCertPath)
   b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr,
@@ -71,7 +71,8 @@ proc newWakuSwitch*(
     wssEnabled: bool = false,
     secureKeyPath: string = "",
     secureCertPath: string = "",
-    agentString = none(string), # defaults to nim-libp2p version
+    agentString = none(string), # defaults to nim-libp2p version,
+    peerStoreCapacity = none(int), # defaults to nim-libp2p max size
     ): Switch
     {.raises: [Defect, IOError, LPError].} =
@@ -88,6 +89,8 @@ proc newWakuSwitch*(
     .withNameResolver(nameResolver)
     .withSignedPeerRecord(sendSignedPeerRecord)
+  if peerStoreCapacity.isSome():
+    b = b.withPeerStore(peerStoreCapacity.get())
   if agentString.isSome():
     b = b.withAgentVersion(agentString.get())
   if privKey.isSome():
@@ -103,4 +106,4 @@ proc newWakuSwitch*(
   else :
     b = b.withAddress(address)
   b.build()
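
A hedged call-site example for the new capacity option; the parameter names before `peerStoreCapacity` are assumed from context, and the key and address values are illustrative:

# Sketch only: bound the switch's peer store; eviction behaviour is nim-libp2p's.
let switch = newWakuSwitch(
  privKey = some(nodekey),
  address = MultiAddress.init("/ip4/0.0.0.0/tcp/60000").tryGet(),
  peerStoreCapacity = some(100))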
View File

@@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut
   return ok()
 proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
-  let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
+  let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
   if peerOpt.isNone():
     waku_px_errors.inc(labelValues = [peerNotFoundFailure])
     return err(peerNotFoundFailure)
@@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
   return ok()
 proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
-  let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
+  let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
   if peerOpt.isNone():
     waku_px_errors.inc(labelValues = [peerNotFoundFailure])
     return err(peerNotFoundFailure)
View File

@@ -210,7 +210,7 @@ when defined(waku_exp_store_resume):
       else:
         debug "no candidate list is provided, selecting a random peer"
         # if no peerList is set then query from one of the peers stored in the peer manager
-        let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
+        let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec)
         if peerOpt.isNone():
           warn "no suitable remote peers"
           waku_store_errors.inc(labelValues = [peerNotFoundFailure])