mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
refactor: reuse nim-libp2p peerstore + move peermanager logic (#1383)
* refactor: reuse nim-libp2p peerstore + move peermanager logic * refactor: fix comments * refactor: modify reconnectPeers and unittest * feat(apps): new flag for peerStoreCapacity * fix(examples): fix example2 target * refactor: fix comments
This commit is contained in:
parent
aeda7d5ff6
commit
8eada1927a
@ -78,6 +78,11 @@ type
|
||||
defaultValue: 50
|
||||
name: "max-connections" }: uint16
|
||||
|
||||
peerStoreCapacity* {.
|
||||
desc: "Maximum stored peers in the peerstore."
|
||||
defaultValue: 100
|
||||
name: "peer-store-capacity" }: int
|
||||
|
||||
peerPersistence* {.
|
||||
desc: "Enable peer persistence.",
|
||||
defaultValue: false,
|
||||
|
||||
@ -273,7 +273,8 @@ proc initNode(conf: WakuNodeConf,
|
||||
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||
dns4DomainName,
|
||||
discv5UdpPort,
|
||||
some(conf.agentString)
|
||||
some(conf.agentString),
|
||||
some(conf.peerStoreCapacity),
|
||||
)
|
||||
except:
|
||||
return err("failed to create waku node instance: " & getCurrentExceptionMsg())
|
||||
|
||||
@ -59,7 +59,7 @@ proc setupAndPublish() {.async.} =
|
||||
|
||||
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
|
||||
while true:
|
||||
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
|
||||
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
|
||||
if numConnectedPeers >= 6:
|
||||
notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
|
||||
break
|
||||
|
||||
@ -55,7 +55,7 @@ proc setupAndSubscribe() {.async.} =
|
||||
|
||||
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
|
||||
while true:
|
||||
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
|
||||
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
|
||||
if numConnectedPeers >= 6:
|
||||
notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
|
||||
break
|
||||
|
||||
@ -5,6 +5,7 @@ import
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
json_rpc/rpcserver,
|
||||
json_rpc/rpcclient,
|
||||
eth/keys,
|
||||
@ -24,7 +25,8 @@ import
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/protocol/waku_filter,
|
||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||
../test_helpers
|
||||
../test_helpers,
|
||||
./testlib/testutils
|
||||
|
||||
procSuite "Peer Manager":
|
||||
asyncTest "Peer dialing works":
|
||||
@ -50,11 +52,11 @@ procSuite "Peer Manager":
|
||||
|
||||
# Check that node2 is being managed in node1
|
||||
check:
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
|
||||
# Check connectedness
|
||||
check:
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop()])
|
||||
|
||||
@ -111,14 +113,14 @@ procSuite "Peer Manager":
|
||||
|
||||
# Check peers were successfully added to peer manager
|
||||
check:
|
||||
node.peerManager.peers().len == 3
|
||||
node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
||||
node.peerManager.peerStore.peers().len == 3
|
||||
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
|
||||
it.addrs.contains(filterLoc) and
|
||||
it.protos.contains(WakuFilterCodec))
|
||||
node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
||||
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
|
||||
it.addrs.contains(swapLoc) and
|
||||
it.protos.contains(WakuSwapCodec))
|
||||
node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
||||
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
|
||||
it.addrs.contains(storeLoc) and
|
||||
it.protos.contains(WakuStoreCodec))
|
||||
|
||||
@ -142,27 +144,27 @@ procSuite "Peer Manager":
|
||||
node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
|
||||
check:
|
||||
# No information about node2's connectedness
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
# Purposefully don't start node2
|
||||
# Attempt dialing node2 from node1
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Cannot connect to node2
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect
|
||||
|
||||
# Successful connection
|
||||
await node2.start()
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Stop node. Gracefully disconnect from all peers.
|
||||
await node1.stop()
|
||||
check:
|
||||
# Not currently connected to node2, but had recent, successful connection.
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect
|
||||
|
||||
await node2.stop()
|
||||
|
||||
@ -185,9 +187,9 @@ procSuite "Peer Manager":
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.peers().len == 1
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.peers().len == 1
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
@ -197,21 +199,22 @@ procSuite "Peer Manager":
|
||||
await node3.start()
|
||||
check:
|
||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
await node3.mountRelay() # This should trigger a reconnect
|
||||
|
||||
check:
|
||||
# Reconnected to node2 after "restart"
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
|
||||
# TODO: nwaku/issues/1377
|
||||
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
|
||||
let
|
||||
database = SqliteDatabase.new(":memory:")[]
|
||||
storage = WakuPeerStorage.new(database)[]
|
||||
@ -234,10 +237,10 @@ procSuite "Peer Manager":
|
||||
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
|
||||
check:
|
||||
# Currently connected to node2
|
||||
node1.peerManager.peers().len == 1
|
||||
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
|
||||
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node1.peerManager.peerStore.peers().len == 1
|
||||
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
|
||||
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
# Simulate restart by initialising a new node using the same storage
|
||||
let
|
||||
@ -251,19 +254,63 @@ procSuite "Peer Manager":
|
||||
node2.wakuRelay.codec == betaCodec
|
||||
node3.wakuRelay.codec == stableCodec
|
||||
# Node2 has been loaded after "restart", but we have not yet reconnected
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
|
||||
|
||||
await node3.start() # This should trigger a reconnect
|
||||
|
||||
check:
|
||||
# Reconnected to node2 after "restart"
|
||||
node3.peerManager.peers().len == 1
|
||||
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
|
||||
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
|
||||
node3.peerManager.peerStore.peers().len == 1
|
||||
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
|
||||
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
|
||||
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
asyncTest "Peer manager connects to all peers supporting a given protocol":
|
||||
# Create 4 nodes
|
||||
var nodes: seq[WakuNode]
|
||||
for i in 0..<4:
|
||||
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
|
||||
nodes &= node
|
||||
|
||||
# Start them
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
|
||||
# Get all peer infos
|
||||
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
# Add all peers (but self) to node 0
|
||||
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
|
||||
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
|
||||
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
|
||||
|
||||
# Attempt to connect to all known peers supporting a given protocol
|
||||
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
|
||||
|
||||
check:
|
||||
# Peerstore track all three peers
|
||||
nodes[0].peerManager.peerStore.peers().len == 3
|
||||
|
||||
# All peer ids are correct
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
|
||||
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
|
||||
|
||||
# All peers support the relay protocol
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
|
||||
|
||||
# All peers are connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
|
||||
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
|
||||
|
||||
await allFutures(nodes.mapIt(it.stop()))
|
||||
|
||||
256
tests/v2/test_peer_store_extended.nim
Normal file
256
tests/v2/test_peer_store_extended.nim
Normal file
@ -0,0 +1,256 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options,sequtils],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/peerstore,
|
||||
libp2p/multiaddress,
|
||||
testutils/unittests
|
||||
import
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/node/peer_manager/waku_peer_store,
|
||||
../../waku/v2/node/waku_node,
|
||||
../test_helpers,
|
||||
./testlib/testutils
|
||||
|
||||
|
||||
suite "Extended nim-libp2p Peer Store":
|
||||
# Valid peerId missing the last digit. Useful for creating new peerIds
|
||||
# basePeerId & "1"
|
||||
# basePeerId & "2"
|
||||
let basePeerId = "QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW"
|
||||
|
||||
setup:
|
||||
# Setup a nim-libp2p peerstore with some peers
|
||||
let peerStore = PeerStore.new(capacity = 50)
|
||||
var p1, p2, p3, p4, p5, p6: PeerId
|
||||
|
||||
# create five peers basePeerId + [1-5]
|
||||
require p1.init(basePeerId & "1")
|
||||
require p2.init(basePeerId & "2")
|
||||
require p3.init(basePeerId & "3")
|
||||
require p4.init(basePeerId & "4")
|
||||
require p5.init(basePeerId & "5")
|
||||
|
||||
# peer6 is not part of the peerstore
|
||||
require p6.init(basePeerId & "6")
|
||||
|
||||
# Peer1: Connected
|
||||
peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p1] = "nwaku"
|
||||
peerStore[ProtoVersionBook][p1] = "protoVersion1"
|
||||
peerStore[ConnectionBook][p1] = Connected
|
||||
peerStore[DisconnectBook][p1] = 0
|
||||
peerStore[SourceBook][p1] = Discv5
|
||||
|
||||
# Peer2: Connected
|
||||
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
|
||||
peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
||||
peerStore[KeyBook][p2] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p2] = "nwaku"
|
||||
peerStore[ProtoVersionBook][p2] = "protoVersion2"
|
||||
peerStore[ConnectionBook][p2] = Connected
|
||||
peerStore[DisconnectBook][p2] = 0
|
||||
peerStore[SourceBook][p2] = Discv5
|
||||
|
||||
# Peer3: Connected
|
||||
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
||||
peerStore[ProtoBook][p3] = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
peerStore[KeyBook][p3] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p3] = "gowaku"
|
||||
peerStore[ProtoVersionBook][p3] = "protoVersion3"
|
||||
peerStore[ConnectionBook][p3] = Connected
|
||||
peerStore[DisconnectBook][p3] = 0
|
||||
peerStore[SourceBook][p3] = Discv5
|
||||
|
||||
# Peer4: Added but never connected
|
||||
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
|
||||
# unknown: peerStore[ProtoBook][p4]
|
||||
peerStore[KeyBook][p4] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
# unknown: peerStore[AgentBook][p4]
|
||||
# unknown: peerStore[ProtoVersionBook][p4]
|
||||
peerStore[ConnectionBook][p4] = NotConnected
|
||||
peerStore[DisconnectBook][p4] = 0
|
||||
peerStore[SourceBook][p4] = Discv5
|
||||
|
||||
# Peer5: Connecteed in the past
|
||||
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
|
||||
peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
peerStore[KeyBook][p5] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
|
||||
peerStore[AgentBook][p5] = "gowaku"
|
||||
peerStore[ProtoVersionBook][p5] = "protoVersion5"
|
||||
peerStore[ConnectionBook][p5] = CanConnect
|
||||
peerStore[DisconnectBook][p5] = 1000
|
||||
peerStore[SourceBook][p5] = Discv5
|
||||
|
||||
test "get() returns the correct StoredInfo for a given PeerId":
|
||||
# When
|
||||
let storedInfoPeer1 = peerStore.get(p1)
|
||||
let storedInfoPeer6 = peerStore.get(p6)
|
||||
|
||||
# Then
|
||||
check:
|
||||
# regression on nim-libp2p fields
|
||||
storedInfoPeer1.peerId == p1
|
||||
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
storedInfoPeer1.agent == "nwaku"
|
||||
storedInfoPeer1.protoVersion == "protoVersion1"
|
||||
|
||||
# our extended fields
|
||||
storedInfoPeer1.connectedness == Connected
|
||||
storedInfoPeer1.disconnectTime == 0
|
||||
storedInfoPeer1.origin == Discv5
|
||||
|
||||
check:
|
||||
# fields are empty
|
||||
storedInfoPeer6.peerId == p6
|
||||
storedInfoPeer6.addrs.len == 0
|
||||
storedInfoPeer6.protos.len == 0
|
||||
storedInfoPeer6.agent == ""
|
||||
storedInfoPeer6.protoVersion == ""
|
||||
storedInfoPeer6.connectedness == NotConnected
|
||||
storedInfoPeer6.disconnectTime == 0
|
||||
storedInfoPeer6.origin == Unknown
|
||||
|
||||
test "peers() returns all StoredInfo of the PeerStore":
|
||||
# When
|
||||
let allPeers = peerStore.peers()
|
||||
|
||||
# Then
|
||||
check:
|
||||
allPeers.len == 5
|
||||
allPeers.anyIt(it.peerId == p1)
|
||||
allPeers.anyIt(it.peerId == p2)
|
||||
allPeers.anyIt(it.peerId == p3)
|
||||
allPeers.anyIt(it.peerId == p4)
|
||||
allPeers.anyIt(it.peerId == p5)
|
||||
|
||||
let p3 = allPeers.filterIt(it.peerId == p3)[0]
|
||||
|
||||
check:
|
||||
# regression on nim-libp2p fields
|
||||
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
|
||||
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
p3.agent == "gowaku"
|
||||
p3.protoVersion == "protoVersion3"
|
||||
|
||||
# our extended fields
|
||||
p3.connectedness == Connected
|
||||
p3.disconnectTime == 0
|
||||
p3.origin == Discv5
|
||||
|
||||
test "peers() returns all StoredInfo matching a specific protocol":
|
||||
# When
|
||||
let storePeers = peerStore.peers("/vac/waku/store/2.0.0")
|
||||
let lpPeers = peerStore.peers("/vac/waku/lightpush/2.0.0")
|
||||
|
||||
# Then
|
||||
check:
|
||||
# Only p1 and p2 support that protocol
|
||||
storePeers.len == 2
|
||||
storePeers.anyIt(it.peerId == p1)
|
||||
storePeers.anyIt(it.peerId == p2)
|
||||
|
||||
check:
|
||||
# Only p3 supports that protocol
|
||||
lpPeers.len == 1
|
||||
lpPeers.anyIt(it.peerId == p3)
|
||||
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
|
||||
test "peers() returns all StoredInfo matching a given protocolMatcher":
|
||||
# When
|
||||
let pMatcherStorePeers = peerStore.peers(protocolMatcher("/vac/waku/store/2.0.0"))
|
||||
let pMatcherSwapPeers = peerStore.peers(protocolMatcher("/vac/waku/swap/2.0.0"))
|
||||
|
||||
# Then
|
||||
check:
|
||||
# peers: 1,2,3,5 match /vac/waku/store/2.0.0/xxx
|
||||
pMatcherStorePeers.len == 4
|
||||
pMatcherStorePeers.anyIt(it.peerId == p1)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p2)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p3)
|
||||
pMatcherStorePeers.anyIt(it.peerId == p5)
|
||||
|
||||
check:
|
||||
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
|
||||
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
|
||||
check:
|
||||
pMatcherSwapPeers.len == 1
|
||||
pMatcherSwapPeers.anyIt(it.peerId == p5)
|
||||
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
|
||||
test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
|
||||
# Given
|
||||
let storedInfoPeer1 = peerStore.get(p1)
|
||||
|
||||
# When
|
||||
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
|
||||
|
||||
# Then
|
||||
check:
|
||||
remotePeerInfo1.peerId == p1
|
||||
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
|
||||
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
|
||||
|
||||
test "connectedness() returns the connection status of a given PeerId":
|
||||
check:
|
||||
# peers tracked in the peerstore
|
||||
peerStore.connectedness(p1) == Connected
|
||||
peerStore.connectedness(p2) == Connected
|
||||
peerStore.connectedness(p3) == Connected
|
||||
peerStore.connectedness(p4) == NotConnected
|
||||
peerStore.connectedness(p5) == CanConnect
|
||||
|
||||
# peer not tracked in the peerstore
|
||||
peerStore.connectedness(p6) == NotConnected
|
||||
|
||||
test "hasPeer() returns true if the peer supports a given protocol":
|
||||
check:
|
||||
peerStore.hasPeer(p1, "/vac/waku/relay/2.0.0-beta1")
|
||||
peerStore.hasPeer(p1, "/vac/waku/store/2.0.0")
|
||||
not peerStore.hasPeer(p1, "it-does-not-contain-this-protocol")
|
||||
|
||||
peerStore.hasPeer(p2, "/vac/waku/relay/2.0.0")
|
||||
peerStore.hasPeer(p2, "/vac/waku/store/2.0.0")
|
||||
|
||||
peerStore.hasPeer(p3, "/vac/waku/lightpush/2.0.0")
|
||||
peerStore.hasPeer(p3, "/vac/waku/store/2.0.0-beta1")
|
||||
|
||||
# we have no knowledge of p4 supported protocols
|
||||
not peerStore.hasPeer(p4, "/vac/waku/lightpush/2.0.0")
|
||||
|
||||
peerStore.hasPeer(p5, "/vac/waku/swap/2.0.0")
|
||||
peerStore.hasPeer(p5, "/vac/waku/store/2.0.0-beta2")
|
||||
not peerStore.hasPeer(p5, "another-protocol-not-contained")
|
||||
|
||||
# peer 6 is not in the PeerStore
|
||||
not peerStore.hasPeer(p6, "/vac/waku/lightpush/2.0.0")
|
||||
|
||||
test "hasPeers() returns true if any peer in the PeerStore supports a given protocol":
|
||||
# Match specific protocols
|
||||
check:
|
||||
peerStore.hasPeers("/vac/waku/relay/2.0.0-beta1")
|
||||
peerStore.hasPeers("/vac/waku/store/2.0.0")
|
||||
peerStore.hasPeers("/vac/waku/lightpush/2.0.0")
|
||||
not peerStore.hasPeers("/vac/waku/does-not-exist/2.0.0")
|
||||
|
||||
# Match protocolMatcher protocols
|
||||
check:
|
||||
peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
|
||||
not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))
|
||||
|
||||
test "selectPeer() returns if a peer supports a given protocol":
|
||||
# When
|
||||
let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")
|
||||
|
||||
# Then
|
||||
check:
|
||||
swapPeer.isSome()
|
||||
swapPeer.get().peerId == p5
|
||||
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
|
||||
@ -88,11 +88,11 @@ procSuite "Waku DNS Discovery":
|
||||
|
||||
check:
|
||||
# We have successfully connected to all discovered nodes
|
||||
node4.peerManager.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node1.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node2.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node3.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peerStore.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
|
||||
node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()])
|
||||
|
||||
@ -128,7 +128,7 @@ proc main(): Future[int] {.async.} =
|
||||
error "Timedout after", timeout=conf.timeout
|
||||
|
||||
let lp2pPeerStore = node.switch.peerStore
|
||||
let conStatus = node.peerManager.peerStore.connectionBook[peer.peerId]
|
||||
let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId]
|
||||
|
||||
if conStatus in [Connected, CanConnect]:
|
||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]
|
||||
|
||||
@ -64,34 +64,38 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
|
||||
if not node.wakuRelay.isNil:
|
||||
# Map managed peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuRelayCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuRelayCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuRelayCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
if not node.wakuFilter.isNil:
|
||||
# Map WakuFilter peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuFilterCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuFilterCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuFilterCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
if not node.wakuSwap.isNil:
|
||||
# Map WakuSwap peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuSwapCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuSwapCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuSwapCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
if not node.wakuStore.isNil:
|
||||
# Map WakuStore peers to WakuPeers and add to return list
|
||||
wPeers.insert(node.peerManager.peers(WakuStoreCodec)
|
||||
wPeers.insert(node.peerManager.peerStore
|
||||
.peers(WakuStoreCodec)
|
||||
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
|
||||
protocol: WakuStoreCodec,
|
||||
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
|
||||
connected: it.connectedness == Connectedness.Connected)),
|
||||
wPeers.len) # Append to the end of the sequence
|
||||
|
||||
# @TODO filter output on protocol/connected-status
|
||||
|
||||
@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
|
||||
## Returns history for a list of content topics with optional paging
|
||||
debug "get_waku_v2_store_v1_messages"
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
raise newException(ValueError, "no suitable remote store peers")
|
||||
|
||||
|
||||
@ -29,7 +29,7 @@ logScope:
|
||||
type
|
||||
PeerManager* = ref object of RootObj
|
||||
switch*: Switch
|
||||
peerStore*: WakuPeerStore
|
||||
peerStore*: PeerStore
|
||||
storage: PeerStorage
|
||||
|
||||
let
|
||||
@ -39,11 +39,6 @@ let
|
||||
# Helper functions #
|
||||
####################
|
||||
|
||||
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
|
||||
RemotePeerInfo.init(peerId = storedInfo.peerId,
|
||||
addrs = toSeq(storedInfo.addrs),
|
||||
protocols = toSeq(storedInfo.protos))
|
||||
|
||||
proc insertOrReplace(ps: PeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo,
|
||||
@ -73,7 +68,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
debug "Dialing remote peer timed out"
|
||||
waku_peers_dials.inc(labelValues = ["timeout"])
|
||||
|
||||
pm.peerStore.connectionBook[peerId] = CannotConnect
|
||||
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
|
||||
|
||||
@ -83,7 +78,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
||||
debug "Dialing remote peer failed", msg = e.msg
|
||||
waku_peers_dials.inc(labelValues = ["failed"])
|
||||
|
||||
pm.peerStore.connectionBook[peerId] = CannotConnect
|
||||
pm.peerStore[ConnectionBook][peerId] = CannotConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
|
||||
|
||||
@ -99,11 +94,17 @@ proc loadFromStorage(pm: PeerManager) =
|
||||
# Do not manage self
|
||||
return
|
||||
|
||||
pm.peerStore.addressBook[peerId] = storedInfo.addrs
|
||||
pm.peerStore.protoBook[peerId] = storedInfo.protos
|
||||
pm.peerStore.keyBook[peerId] = storedInfo.publicKey
|
||||
pm.peerStore.connectionBook[peerId] = NotConnected # Reset connectedness state
|
||||
pm.peerStore.disconnectBook[peerId] = disconnectTime
|
||||
# nim-libp2p books
|
||||
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
|
||||
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
|
||||
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
|
||||
pm.peerStore[AgentBook][peerId] = storedInfo.agent
|
||||
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
|
||||
|
||||
# custom books
|
||||
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
|
||||
pm.peerStore[DisconnectBook][peerId] = disconnectTime
|
||||
pm.peerStore[SourceBook][peerId] = storedInfo.origin
|
||||
|
||||
let res = pm.storage.getAll(onData)
|
||||
if res.isErr:
|
||||
@ -117,26 +118,26 @@ proc loadFromStorage(pm: PeerManager) =
|
||||
##################
|
||||
|
||||
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||
if not pm.peerStore.addressBook.contains(peerId):
|
||||
if not pm.peerStore[AddressBook].contains(peerId):
|
||||
## We only consider connection events if we
|
||||
## already track some addresses for this peer
|
||||
return
|
||||
|
||||
case event.kind
|
||||
of ConnEventKind.Connected:
|
||||
pm.peerStore.connectionBook[peerId] = Connected
|
||||
pm.peerStore[ConnectionBook][peerId] = Connected
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
|
||||
return
|
||||
of ConnEventKind.Disconnected:
|
||||
pm.peerStore.connectionBook[peerId] = CanConnect
|
||||
pm.peerStore[ConnectionBook][peerId] = CanConnect
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||
return
|
||||
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
|
||||
let pm = PeerManager(switch: switch,
|
||||
peerStore: WakuPeerStore.new(),
|
||||
peerStore: switch.peerStore,
|
||||
storage: storage)
|
||||
|
||||
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||
@ -157,44 +158,6 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
|
||||
# Manager interface #
|
||||
#####################
|
||||
|
||||
proc peers*(pm: PeerManager): seq[StoredInfo] =
|
||||
# Return the known info for all peers
|
||||
pm.peerStore.peers()
|
||||
|
||||
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
|
||||
# Return the known info for all peers registered on the specified protocol
|
||||
pm.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
|
||||
# Return the known info for all peers matching the provided protocolMatcher
|
||||
pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
|
||||
# Return the connection state of the given, managed peer
|
||||
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
|
||||
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
|
||||
|
||||
let storedInfo = pm.peerStore.get(peerId)
|
||||
|
||||
if (storedInfo == StoredInfo()):
|
||||
# Peer is not managed, therefore not connected
|
||||
return NotConnected
|
||||
else:
|
||||
pm.peerStore.connectionBook[peerId]
|
||||
|
||||
proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
|
||||
# Returns `true` if peer is included in manager for the specified protocol
|
||||
|
||||
pm.peerStore.get(peerId).protos.contains(proto)
|
||||
|
||||
proc hasPeers*(pm: PeerManager, proto: string): bool =
|
||||
# Returns `true` if manager has any peers for the specified protocol
|
||||
pm.peers.anyIt(it.protos.contains(proto))
|
||||
|
||||
proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
|
||||
# Returns `true` if manager has any peers matching the protocolMatcher
|
||||
pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
|
||||
# Adds peer to manager for the specified protocol
|
||||
|
||||
@ -206,33 +169,21 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
|
||||
|
||||
# ...known addresses
|
||||
for multiaddr in remotePeerInfo.addrs:
|
||||
pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)
|
||||
pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
|
||||
|
||||
# ...public key
|
||||
var publicKey: PublicKey
|
||||
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
|
||||
|
||||
pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey
|
||||
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
|
||||
|
||||
# ...associated protocols
|
||||
pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)
|
||||
# nim-libp2p identify overrides this
|
||||
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
|
||||
|
||||
# Add peer to storage. Entry will subsequently be updated with connectedness information
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
|
||||
|
||||
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
|
||||
# Selects the best peer for a given protocol
|
||||
let peers = pm.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
if peers.len >= 1:
|
||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||
let peerStored = peers[0]
|
||||
|
||||
return some(peerStored.toRemotePeerInfo())
|
||||
else:
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
proc reconnectPeers*(pm: PeerManager,
|
||||
proto: string,
|
||||
protocolMatcher: Matcher,
|
||||
@ -242,31 +193,27 @@ proc reconnectPeers*(pm: PeerManager,
|
||||
|
||||
debug "Reconnecting peers", proto=proto
|
||||
|
||||
for storedInfo in pm.peers(protocolMatcher):
|
||||
# Check if peer is reachable.
|
||||
if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect:
|
||||
debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
|
||||
for storedInfo in pm.peerStore.peers(protocolMatcher):
|
||||
# Check that the peer can be connected
|
||||
if storedInfo.connectedness == CannotConnect:
|
||||
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
|
||||
continue
|
||||
|
||||
# Respect optional backoff period where applicable.
|
||||
let
|
||||
disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second) # Convert
|
||||
# TODO: Add method to peerStore (eg isBackoffExpired())
|
||||
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
|
||||
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
|
||||
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
|
||||
|
||||
trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
|
||||
|
||||
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
|
||||
if backoffTime > ZeroDuration:
|
||||
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
|
||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||
await sleepAsync(backoffTime)
|
||||
|
||||
# Add to protos for peer, if it has not been added yet
|
||||
if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
|
||||
let remotePeerInfo = storedInfo.toRemotePeerInfo()
|
||||
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(remotePeerInfo, proto)
|
||||
|
||||
trace "Reconnecting to peer", peerId=storedInfo.peerId
|
||||
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
||||
|
||||
@ -279,7 +226,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
|
||||
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
|
||||
|
||||
# First add dialed peer info to peer store, if it does not exist yet...
|
||||
if not pm.hasPeer(remotePeerInfo.peerId, proto):
|
||||
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
|
||||
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(remotePeerInfo, proto)
|
||||
|
||||
|
||||
@ -4,12 +4,17 @@ else:
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[tables, sequtils, sets],
|
||||
std/[tables, sequtils, sets, options],
|
||||
libp2p/builders,
|
||||
libp2p/peerstore
|
||||
|
||||
import
|
||||
../../utils/peers
|
||||
|
||||
export peerstore, builders
|
||||
|
||||
# TODO rename to peer_store_extended to emphasize its a nimlibp2 extension
|
||||
|
||||
type
|
||||
Connectedness* = enum
|
||||
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
|
||||
@ -21,73 +26,106 @@ type
|
||||
# Connected: actively connected to peer.
|
||||
Connected
|
||||
|
||||
PeerOrigin* = enum
|
||||
Unknown,
|
||||
Discv5,
|
||||
Static,
|
||||
Dns
|
||||
|
||||
# Keeps track of the Connectedness state of a peer
|
||||
ConnectionBook* = ref object of PeerBook[Connectedness]
|
||||
|
||||
DisconnectBook* = ref object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
|
||||
# Keeps track of when peers were disconnected in Unix timestamps
|
||||
DisconnectBook* = ref object of PeerBook[int64]
|
||||
|
||||
WakuPeerStore* = ref object
|
||||
addressBook*: AddressBook
|
||||
protoBook*: ProtoBook
|
||||
keyBook*: KeyBook
|
||||
connectionBook*: ConnectionBook
|
||||
disconnectBook*: DisconnectBook
|
||||
# Keeps track of the origin of a peer
|
||||
SourceBook* = ref object of PeerBook[PeerOrigin]
|
||||
|
||||
StoredInfo* = object
|
||||
# Collates stored info about a peer
|
||||
peerId*: PeerID
|
||||
# Taken from nim-libp2
|
||||
peerId*: PeerId
|
||||
addrs*: seq[MultiAddress]
|
||||
protos*: seq[string]
|
||||
publicKey*: PublicKey
|
||||
agent*: string
|
||||
protoVersion*: string
|
||||
|
||||
proc new*(T: type WakuPeerStore): WakuPeerStore =
|
||||
let
|
||||
addressBook = AddressBook(book: initTable[PeerID, seq[MultiAddress]]())
|
||||
protoBook = ProtoBook(book: initTable[PeerID, seq[string]]())
|
||||
keyBook = KeyBook(book: initTable[PeerID, PublicKey]())
|
||||
connectionBook = ConnectionBook(book: initTable[PeerID, Connectedness]())
|
||||
disconnectBook = DisconnectBook(book: initTable[PeerID, int64]())
|
||||
|
||||
T(addressBook: addressBook,
|
||||
protoBook: protoBook,
|
||||
keyBook: keyBook,
|
||||
connectionBook: connectionBook,
|
||||
disconnectBook: disconnectBook)
|
||||
|
||||
#####################
|
||||
# Utility functions #
|
||||
#####################
|
||||
|
||||
proc add*[T](peerBook: SeqPeerBook[T],
|
||||
peerId: PeerId,
|
||||
entry: T) =
|
||||
## Add entry to a given peer. If the peer is not known,
|
||||
## it will be set with the provided entry.
|
||||
|
||||
peerBook.book.mgetOrPut(peerId,
|
||||
newSeq[T]()).add(entry)
|
||||
|
||||
# TODO: Notify clients?
|
||||
# Extended custom fields
|
||||
connectedness*: Connectedness
|
||||
disconnectTime*: int64
|
||||
origin*: PeerOrigin
|
||||
|
||||
##################
|
||||
# Peer Store API #
|
||||
##################
|
||||
|
||||
proc get*(peerStore: WakuPeerStore,
|
||||
proc get*(peerStore: PeerStore,
|
||||
peerId: PeerID): StoredInfo =
|
||||
## Get the stored information of a given peer.
|
||||
|
||||
StoredInfo(
|
||||
# Taken from nim-libp2
|
||||
peerId: peerId,
|
||||
addrs: peerStore.addressBook[peerId],
|
||||
protos: peerStore.protoBook[peerId],
|
||||
publicKey: peerStore.keyBook[peerId]
|
||||
addrs: peerStore[AddressBook][peerId],
|
||||
protos: peerStore[ProtoBook][peerId],
|
||||
publicKey: peerStore[KeyBook][peerId],
|
||||
agent: peerStore[AgentBook][peerId],
|
||||
protoVersion: peerStore[ProtoVersionBook][peerId],
|
||||
|
||||
# Extended custom fields
|
||||
connectedness: peerStore[ConnectionBook][peerId],
|
||||
disconnectTime: peerStore[DisconnectBook][peerId],
|
||||
origin: peerStore[SourceBook][peerId],
|
||||
)
|
||||
|
||||
proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] =
|
||||
proc peers*(peerStore: PeerStore): seq[StoredInfo] =
|
||||
## Get all the stored information of every peer.
|
||||
|
||||
let allKeys = concat(toSeq(keys(peerStore.addressBook.book)),
|
||||
toSeq(keys(peerStore.protoBook.book)),
|
||||
toSeq(keys(peerStore.keyBook.book))).toHashSet()
|
||||
let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
|
||||
toSeq(peerStore[ProtoBook].book.keys()),
|
||||
toSeq(peerStore[KeyBook].book.keys())).toHashSet()
|
||||
|
||||
return allKeys.mapIt(peerStore.get(it))
|
||||
|
||||
proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
|
||||
# Return the known info for all peers registered on the specified protocol
|
||||
peerStore.peers.filterIt(it.protos.contains(proto))
|
||||
|
||||
proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] =
|
||||
# Return the known info for all peers matching the provided protocolMatcher
|
||||
peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
|
||||
RemotePeerInfo.init(peerId = storedInfo.peerId,
|
||||
addrs = toSeq(storedInfo.addrs),
|
||||
protocols = toSeq(storedInfo.protos))
|
||||
|
||||
|
||||
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
|
||||
# Return the connection state of the given, managed peer
|
||||
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
|
||||
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
|
||||
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
|
||||
|
||||
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
|
||||
# Returns `true` if peer is included in manager for the specified protocol
|
||||
# TODO: What if peer does not exist in the peerStore?
|
||||
peerStore.get(peerId).protos.contains(proto)
|
||||
|
||||
proc hasPeers*(peerStore: PeerStore, proto: string): bool =
|
||||
# Returns `true` if the peerstore has any peer for the specified protocol
|
||||
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto))
|
||||
|
||||
proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
|
||||
# Returns `true` if the peerstore has any peer matching the protocolMatcher
|
||||
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
|
||||
|
||||
proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
|
||||
# Selects the best peer for a given protocol
|
||||
let peers = peerStore.peers().filterIt(it.protos.contains(proto))
|
||||
|
||||
if peers.len >= 1:
|
||||
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
|
||||
let peerStored = peers[0]
|
||||
|
||||
return some(peerStored.toRemotePeerInfo())
|
||||
else:
|
||||
return none(RemotePeerInfo)
|
||||
|
||||
@ -102,7 +102,7 @@ type
|
||||
started*: bool # Indicates that node has started listening
|
||||
|
||||
|
||||
proc protocolMatcher(codec: string): Matcher =
|
||||
proc protocolMatcher*(codec: string): Matcher =
|
||||
## Returns a protocol matcher function for the provided codec
|
||||
proc match(proto: string): bool {.gcsafe.} =
|
||||
## Matches a proto with any postfix to the provided codec.
|
||||
@ -147,6 +147,7 @@ proc new*(T: type WakuNode,
|
||||
dns4DomainName = none(string),
|
||||
discv5UdpPort = none(Port),
|
||||
agentString = none(string), # defaults to nim-libp2p version
|
||||
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
|
||||
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||
## Creates a Waku Node instance.
|
||||
|
||||
@ -217,7 +218,8 @@ proc new*(T: type WakuNode,
|
||||
secureCertPath = secureCert,
|
||||
nameResolver = nameResolver,
|
||||
sendSignedPeerRecord = sendSignedPeerRecord,
|
||||
agentString = agentString
|
||||
agentString = agentString,
|
||||
peerStoreCapacity = peerStoreCapacity,
|
||||
)
|
||||
|
||||
let wakuNode = WakuNode(
|
||||
@ -357,7 +359,7 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
|
||||
# Resume previous relay connections
|
||||
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
|
||||
info "Found previous WakuRelay peers. Reconnecting."
|
||||
|
||||
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||
@ -502,7 +504,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
|
||||
error "cannot register filter subscription to topic", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
return
|
||||
@ -517,7 +519,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
|
||||
error "cannot unregister filter subscription to content", error="waku filter client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
|
||||
if peerOpt.isNone():
|
||||
error "cannot register filter subscription to topic", error="no suitable remote peers"
|
||||
return
|
||||
@ -675,7 +677,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
|
||||
if node.wakuStoreClient.isNil():
|
||||
return err("waku store client is nil")
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
error "no suitable remote peers"
|
||||
return err("peer_not_found_failure")
|
||||
@ -764,7 +766,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
|
||||
error "failed to publish message", error="waku lightpush client is nil"
|
||||
return
|
||||
|
||||
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
|
||||
let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec)
|
||||
if peerOpt.isNone():
|
||||
error "failed to publish message", error="no suitable remote peers"
|
||||
return
|
||||
@ -824,14 +826,15 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
|
||||
|
||||
node.switch.mount(node.libp2pPing)
|
||||
|
||||
# TODO: Move this logic to PeerManager
|
||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||
while node.started:
|
||||
# Keep all connected peers alive while running
|
||||
trace "Running keepalive"
|
||||
|
||||
# First get a list of connected peer infos
|
||||
let peers = node.peerManager.peers()
|
||||
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
|
||||
let peers = node.peerManager.peerStore.peers()
|
||||
.filterIt(it.connectedness == Connected)
|
||||
.mapIt(it.toRemotePeerInfo())
|
||||
|
||||
# Attempt to retrieve and ping the active outgoing connection for each peer
|
||||
@ -855,6 +858,9 @@ proc startKeepalive*(node: WakuNode) =
|
||||
|
||||
asyncSpawn node.keepaliveLoop(defaultKeepalive)
|
||||
|
||||
# TODO: Decouple discovery logic from connection logic
|
||||
# A discovered peer goes to the PeerStore
|
||||
# The PeerManager uses to PeerStore to dial peers
|
||||
proc runDiscv5Loop(node: WakuNode) {.async.} =
|
||||
## Continuously add newly discovered nodes
|
||||
## using Node Discovery v5
|
||||
|
||||
@ -71,7 +71,8 @@ proc newWakuSwitch*(
|
||||
wssEnabled: bool = false,
|
||||
secureKeyPath: string = "",
|
||||
secureCertPath: string = "",
|
||||
agentString = none(string), # defaults to nim-libp2p version
|
||||
agentString = none(string), # defaults to nim-libp2p version,
|
||||
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
|
||||
): Switch
|
||||
{.raises: [Defect, IOError, LPError].} =
|
||||
|
||||
@ -88,6 +89,8 @@ proc newWakuSwitch*(
|
||||
.withNameResolver(nameResolver)
|
||||
.withSignedPeerRecord(sendSignedPeerRecord)
|
||||
|
||||
if peerStoreCapacity.isSome():
|
||||
b = b.withPeerStore(peerStoreCapacity.get())
|
||||
if agentString.isSome():
|
||||
b = b.withAgentVersion(agentString.get())
|
||||
if privKey.isSome():
|
||||
|
||||
@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut
|
||||
return ok()
|
||||
|
||||
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
return err(peerNotFoundFailure)
|
||||
@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
|
||||
return ok()
|
||||
|
||||
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
|
||||
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
|
||||
if peerOpt.isNone():
|
||||
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
return err(peerNotFoundFailure)
|
||||
|
||||
@ -210,7 +210,7 @@ when defined(waku_exp_store_resume):
|
||||
else:
|
||||
debug "no candidate list is provided, selecting a random peer"
|
||||
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||
let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
warn "no suitable remote peers"
|
||||
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user