refactor: reuse nim-libp2p peerstore + move peermanager logic (#1383)

* refactor: reuse nim-libp2p peerstore + move peermanager logic

* refactor: fix comments

* refactor: modify reconnectPeers and unittest

* feat(apps): new flag for peerStoreCapacity

* fix(examples): fix example2 target

* refactor: fix comments
This commit is contained in:
Alvaro Revuelta 2022-11-24 14:11:23 +01:00 committed by GitHub
parent 089e2ae1e8
commit 43fd11b4dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 571 additions and 264 deletions

View File

@ -78,6 +78,11 @@ type
defaultValue: 50
name: "max-connections" }: uint16
peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore."
defaultValue: 100
name: "peer-store-capacity" }: int
peerPersistence* {.
desc: "Enable peer persistence.",
defaultValue: false,

View File

@ -273,7 +273,8 @@ proc initNode(conf: WakuNodeConf,
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
dns4DomainName,
discv5UdpPort,
some(conf.agentString)
some(conf.agentString),
some(conf.peerStoreCapacity),
)
except:
return err("failed to create waku node instance: " & getCurrentExceptionMsg())

View File

@ -37,7 +37,7 @@ proc setupAndPublish() {.async.} =
ip = ValidIpAddress.init("0.0.0.0")
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
@ -59,7 +59,7 @@ proc setupAndPublish() {.async.} =
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
break
@ -85,4 +85,4 @@ proc setupAndPublish() {.async.} =
await sleepAsync(5000)
asyncSpawn setupAndPublish()
runForever()
runForever()

View File

@ -55,7 +55,7 @@ proc setupAndSubscribe() {.async.} =
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
break
@ -81,4 +81,4 @@ proc setupAndSubscribe() {.async.} =
asyncSpawn setupAndSubscribe()
runForever()
runForever()

View File

@ -5,6 +5,7 @@ import
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
chronos,
json_rpc/rpcserver,
json_rpc/rpcclient,
eth/keys,
@ -24,7 +25,8 @@ import
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_swap/waku_swap,
../test_helpers
../test_helpers,
./testlib/testutils
procSuite "Peer Manager":
asyncTest "Peer dialing works":
@ -34,7 +36,7 @@ procSuite "Peer Manager":
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802))
peerInfo2 = node2.switch.peerInfo
await allFutures([node1.start(), node2.start()])
await node1.mountRelay()
@ -47,17 +49,17 @@ procSuite "Peer Manager":
check:
conn.activity
conn.peerId == peerInfo2.peerId
# Check that node2 is being managed in node1
check:
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
# Check connectedness
check:
node1.peerManager.connectedness(peerInfo2.peerId) == Connectedness.Connected
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected
await allFutures([node1.stop(), node2.stop()])
asyncTest "Dialing fails gracefully":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
@ -65,7 +67,7 @@ procSuite "Peer Manager":
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812))
peerInfo2 = node2.switch.peerInfo
await node1.start()
# Purposefully don't start node2
@ -78,7 +80,7 @@ procSuite "Peer Manager":
# Check connection failed gracefully
check:
connOpt.isNone()
await node1.stop()
asyncTest "Adding, selecting and filtering peers work":
@ -97,7 +99,7 @@ procSuite "Peer Manager":
storeLoc = MultiAddress.init("/ip4/127.0.0.3/tcp/4").tryGet()
storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
storePeer = PeerInfo.new(storeKey, @[storeLoc])
await node.start()
await node.mountFilterClient()
@ -105,25 +107,25 @@ procSuite "Peer Manager":
node.mountStoreClient()
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
node.setFilterPeer(filterPeer.toRemotePeerInfo())
# Check peers were successfully added to peer manager
check:
node.peerManager.peers().len == 3
node.peerManager.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
node.peerManager.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
node.peerManager.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
node.peerManager.peerStore.peers().len == 3
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
await node.stop()
asyncTest "Peer manager keeps track of connections":
let
@ -132,7 +134,7 @@ procSuite "Peer Manager":
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832))
peerInfo2 = node2.switch.peerInfo
await node1.start()
await node1.mountRelay()
@ -142,28 +144,28 @@ procSuite "Peer Manager":
node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
check:
# No information about node2's connectedness
node1.peerManager.connectedness(peerInfo2.peerId) == NotConnected
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
# Purposefully don't start node2
# Attempt dialing node2 from node1
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Cannot connect to node2
node1.peerManager.connectedness(peerInfo2.peerId) == CannotConnect
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect
# Successful connection
await node2.start()
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Stop node. Gracefully disconnect from all peers.
await node1.stop()
check:
# Not currently connected to node2, but had recent, successful connection.
node1.peerManager.connectedness(peerInfo2.peerId) == CanConnect
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect
await node2.stop()
asyncTest "Peer manager can use persistent storage and survive restarts":
@ -173,9 +175,9 @@ procSuite "Peer Manager":
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
peerInfo2 = node2.switch.peerInfo
await node1.start()
await node2.start()
@ -185,33 +187,34 @@ procSuite "Peer Manager":
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peers().len == 1
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage)
await node3.start()
check:
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.mountRelay() # This should trigger a reconnect
check:
# Reconnected to node2 after "restart"
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
# TODO: nwaku/issues/1377
xasyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
@ -222,7 +225,7 @@ procSuite "Peer Manager":
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
await node1.start()
await node2.start()
@ -234,16 +237,16 @@ procSuite "Peer Manager":
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), node2.wakuRelay.codec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peers().len == 1
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage)
await node3.mountRelay()
node3.wakuRelay.codec = stableCodec
check:
@ -251,19 +254,63 @@ procSuite "Peer Manager":
node2.wakuRelay.codec == betaCodec
node3.wakuRelay.codec == stableCodec
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
await node3.start() # This should trigger a reconnect
check:
# Reconnected to node2 after "restart"
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
var nodes: seq[WakuNode]
for i in 0..<4:
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
nodes &= node
# Start them
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Get all peer infos
let peerInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
# Add all peers (but self) to node 0
nodes[0].peerManager.addPeer(peerInfos[1], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[2], WakuRelayCodec)
nodes[0].peerManager.addPeer(peerInfos[3], WakuRelayCodec)
# Attempt to connect to all known peers supporting a given protocol
await nodes[0].peerManager.reconnectPeers(WakuRelayCodec, protocolMatcher(WakuRelayCodec))
check:
# Peerstore track all three peers
nodes[0].peerManager.peerStore.peers().len == 3
# All peer ids are correct
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[2].switch.peerInfo.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[3].switch.peerInfo.peerId)
# All peers support the relay protocol
nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains(WakuRelayCodec)
nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains(WakuRelayCodec)
# All peers are connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == Connected
nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == Connected
await allFutures(nodes.mapIt(it.stop()))

View File

@ -0,0 +1,256 @@
{.used.}
import
std/[options,sequtils],
libp2p/crypto/crypto,
libp2p/peerstore,
libp2p/multiaddress,
testutils/unittests
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/peer_manager/waku_peer_store,
../../waku/v2/node/waku_node,
../test_helpers,
./testlib/testutils
suite "Extended nim-libp2p Peer Store":
# Valid peerId missing the last digit. Useful for creating new peerIds
# basePeerId & "1"
# basePeerId & "2"
let basePeerId = "QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW"
setup:
# Setup a nim-libp2p peerstore with some peers
let peerStore = PeerStore.new(capacity = 50)
var p1, p2, p3, p4, p5, p6: PeerId
# create five peers basePeerId + [1-5]
require p1.init(basePeerId & "1")
require p2.init(basePeerId & "2")
require p3.init(basePeerId & "3")
require p4.init(basePeerId & "4")
require p5.init(basePeerId & "5")
# peer6 is not part of the peerstore
require p6.init(basePeerId & "6")
# Peer1: Connected
peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
peerStore[AgentBook][p1] = "nwaku"
peerStore[ProtoVersionBook][p1] = "protoVersion1"
peerStore[ConnectionBook][p1] = Connected
peerStore[DisconnectBook][p1] = 0
peerStore[SourceBook][p1] = Discv5
# Peer2: Connected
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
peerStore[KeyBook][p2] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
peerStore[AgentBook][p2] = "nwaku"
peerStore[ProtoVersionBook][p2] = "protoVersion2"
peerStore[ConnectionBook][p2] = Connected
peerStore[DisconnectBook][p2] = 0
peerStore[SourceBook][p2] = Discv5
# Peer3: Connected
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
peerStore[ProtoBook][p3] = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
peerStore[KeyBook][p3] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
peerStore[AgentBook][p3] = "gowaku"
peerStore[ProtoVersionBook][p3] = "protoVersion3"
peerStore[ConnectionBook][p3] = Connected
peerStore[DisconnectBook][p3] = 0
peerStore[SourceBook][p3] = Discv5
# Peer4: Added but never connected
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
# unknown: peerStore[ProtoBook][p4]
peerStore[KeyBook][p4] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
# unknown: peerStore[AgentBook][p4]
# unknown: peerStore[ProtoVersionBook][p4]
peerStore[ConnectionBook][p4] = NotConnected
peerStore[DisconnectBook][p4] = 0
peerStore[SourceBook][p4] = Discv5
# Peer5: Connecteed in the past
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
peerStore[KeyBook][p5] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
peerStore[AgentBook][p5] = "gowaku"
peerStore[ProtoVersionBook][p5] = "protoVersion5"
peerStore[ConnectionBook][p5] = CanConnect
peerStore[DisconnectBook][p5] = 1000
peerStore[SourceBook][p5] = Discv5
test "get() returns the correct StoredInfo for a given PeerId":
# When
let storedInfoPeer1 = peerStore.get(p1)
let storedInfoPeer6 = peerStore.get(p6)
# Then
check:
# regression on nim-libp2p fields
storedInfoPeer1.peerId == p1
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
storedInfoPeer1.agent == "nwaku"
storedInfoPeer1.protoVersion == "protoVersion1"
# our extended fields
storedInfoPeer1.connectedness == Connected
storedInfoPeer1.disconnectTime == 0
storedInfoPeer1.origin == Discv5
check:
# fields are empty
storedInfoPeer6.peerId == p6
storedInfoPeer6.addrs.len == 0
storedInfoPeer6.protos.len == 0
storedInfoPeer6.agent == ""
storedInfoPeer6.protoVersion == ""
storedInfoPeer6.connectedness == NotConnected
storedInfoPeer6.disconnectTime == 0
storedInfoPeer6.origin == Unknown
test "peers() returns all StoredInfo of the PeerStore":
# When
let allPeers = peerStore.peers()
# Then
check:
allPeers.len == 5
allPeers.anyIt(it.peerId == p1)
allPeers.anyIt(it.peerId == p2)
allPeers.anyIt(it.peerId == p3)
allPeers.anyIt(it.peerId == p4)
allPeers.anyIt(it.peerId == p5)
let p3 = allPeers.filterIt(it.peerId == p3)[0]
check:
# regression on nim-libp2p fields
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
p3.agent == "gowaku"
p3.protoVersion == "protoVersion3"
# our extended fields
p3.connectedness == Connected
p3.disconnectTime == 0
p3.origin == Discv5
test "peers() returns all StoredInfo matching a specific protocol":
# When
let storePeers = peerStore.peers("/vac/waku/store/2.0.0")
let lpPeers = peerStore.peers("/vac/waku/lightpush/2.0.0")
# Then
check:
# Only p1 and p2 support that protocol
storePeers.len == 2
storePeers.anyIt(it.peerId == p1)
storePeers.anyIt(it.peerId == p2)
check:
# Only p3 supports that protocol
lpPeers.len == 1
lpPeers.anyIt(it.peerId == p3)
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
test "peers() returns all StoredInfo matching a given protocolMatcher":
# When
let pMatcherStorePeers = peerStore.peers(protocolMatcher("/vac/waku/store/2.0.0"))
let pMatcherSwapPeers = peerStore.peers(protocolMatcher("/vac/waku/swap/2.0.0"))
# Then
check:
# peers: 1,2,3,5 match /vac/waku/store/2.0.0/xxx
pMatcherStorePeers.len == 4
pMatcherStorePeers.anyIt(it.peerId == p1)
pMatcherStorePeers.anyIt(it.peerId == p2)
pMatcherStorePeers.anyIt(it.peerId == p3)
pMatcherStorePeers.anyIt(it.peerId == p5)
check:
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
check:
pMatcherSwapPeers.len == 1
pMatcherSwapPeers.anyIt(it.peerId == p5)
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
# Given
let storedInfoPeer1 = peerStore.get(p1)
# When
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
# Then
check:
remotePeerInfo1.peerId == p1
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
test "connectedness() returns the connection status of a given PeerId":
check:
# peers tracked in the peerstore
peerStore.connectedness(p1) == Connected
peerStore.connectedness(p2) == Connected
peerStore.connectedness(p3) == Connected
peerStore.connectedness(p4) == NotConnected
peerStore.connectedness(p5) == CanConnect
# peer not tracked in the peerstore
peerStore.connectedness(p6) == NotConnected
test "hasPeer() returns true if the peer supports a given protocol":
check:
peerStore.hasPeer(p1, "/vac/waku/relay/2.0.0-beta1")
peerStore.hasPeer(p1, "/vac/waku/store/2.0.0")
not peerStore.hasPeer(p1, "it-does-not-contain-this-protocol")
peerStore.hasPeer(p2, "/vac/waku/relay/2.0.0")
peerStore.hasPeer(p2, "/vac/waku/store/2.0.0")
peerStore.hasPeer(p3, "/vac/waku/lightpush/2.0.0")
peerStore.hasPeer(p3, "/vac/waku/store/2.0.0-beta1")
# we have no knowledge of p4 supported protocols
not peerStore.hasPeer(p4, "/vac/waku/lightpush/2.0.0")
peerStore.hasPeer(p5, "/vac/waku/swap/2.0.0")
peerStore.hasPeer(p5, "/vac/waku/store/2.0.0-beta2")
not peerStore.hasPeer(p5, "another-protocol-not-contained")
# peer 6 is not in the PeerStore
not peerStore.hasPeer(p6, "/vac/waku/lightpush/2.0.0")
test "hasPeers() returns true if any peer in the PeerStore supports a given protocol":
# Match specific protocols
check:
peerStore.hasPeers("/vac/waku/relay/2.0.0-beta1")
peerStore.hasPeers("/vac/waku/store/2.0.0")
peerStore.hasPeers("/vac/waku/lightpush/2.0.0")
not peerStore.hasPeers("/vac/waku/does-not-exist/2.0.0")
# Match protocolMatcher protocols
check:
peerStore.hasPeers(protocolMatcher("/vac/waku/store/2.0.0"))
not peerStore.hasPeers(protocolMatcher("/vac/waku/does-not-exist/2.0.0"))
test "selectPeer() returns if a peer supports a given protocol":
# When
let swapPeer = peerStore.selectPeer("/vac/waku/swap/2.0.0")
# Then
check:
swapPeer.isSome()
swapPeer.get().peerId == p5
swapPeer.get().protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

View File

@ -88,11 +88,11 @@ procSuite "Waku DNS Discovery":
check:
# We have successfully connected to all discovered nodes
node4.peerManager.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
node4.peerManager.connectedness(node1.switch.peerInfo.peerId) == Connected
node4.peerManager.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
node4.peerManager.connectedness(node2.switch.peerInfo.peerId) == Connected
node4.peerManager.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
node4.peerManager.connectedness(node3.switch.peerInfo.peerId) == Connected
node4.peerManager.peerStore.peers().anyIt(it.peerId == node1.switch.peerInfo.peerId)
node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected
node4.peerManager.peerStore.peers().anyIt(it.peerId == node2.switch.peerInfo.peerId)
node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected
node4.peerManager.peerStore.peers().anyIt(it.peerId == node3.switch.peerInfo.peerId)
node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()])

View File

@ -101,7 +101,7 @@ proc main(): Future[int] {.async.} =
# ensure input protocols are valid
for p in conf.protocols:
if p notin ProtocolsTable:
if p notin ProtocolsTable:
error "invalid protocol", protocol=p, valid=ProtocolsTable
raise newException(ConfigurationError, "Invalid cli flag values" & p)
@ -128,7 +128,7 @@ proc main(): Future[int] {.async.} =
error "Timedout after", timeout=conf.timeout
let lp2pPeerStore = node.switch.peerStore
let conStatus = node.peerManager.peerStore.connectionBook[peer.peerId]
let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId]
if conStatus in [Connected, CanConnect]:
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.peerId]

View File

@ -39,7 +39,7 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Admin API version 1 definitions
rpcsrv.rpc("post_waku_v2_admin_v1_peers") do(peers: seq[string]) -> bool:
## Connect to a list of peers
debug "post_waku_v2_admin_v1_peers"
@ -64,34 +64,38 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if not node.wakuRelay.isNil:
# Map managed peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuRelayCodec)
wPeers.insert(node.peerManager.peerStore
.peers(WakuRelayCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuRelayCodec,
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuFilter.isNil:
# Map WakuFilter peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuFilterCodec)
wPeers.insert(node.peerManager.peerStore
.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuFilterCodec,
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuSwap.isNil:
# Map WakuSwap peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuSwapCodec)
wPeers.insert(node.peerManager.peerStore
.peers(WakuSwapCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuSwapCodec,
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
if not node.wakuStore.isNil:
# Map WakuStore peers to WakuPeers and add to return list
wPeers.insert(node.peerManager.peers(WakuStoreCodec)
wPeers.insert(node.peerManager.peerStore
.peers(WakuStoreCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId),
protocol: WakuStoreCodec,
connected: node.peerManager.connectedness(it.peerId) == Connectedness.Connected)),
connected: it.connectedness == Connectedness.Connected)),
wPeers.len) # Append to the end of the sequence
# @TODO filter output on protocol/connected-status

View File

@ -30,7 +30,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")

View File

@ -6,8 +6,8 @@ else:
import
std/[options, sets, sequtils, times],
chronos,
chronicles,
chronos,
chronicles,
metrics,
libp2p/multistream
import
@ -29,7 +29,7 @@ logScope:
type
PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: WakuPeerStore
peerStore*: PeerStore
storage: PeerStorage
let
@ -39,11 +39,6 @@ let
# Helper functions #
####################
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
RemotePeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo,
@ -55,7 +50,7 @@ proc insertOrReplace(ps: PeerStorage,
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])
proc dialPeer(pm: PeerManager, peerId: PeerID,
proc dialPeer(pm: PeerManager, peerId: PeerID,
addrs: seq[MultiAddress], proto: string,
dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
@ -73,20 +68,20 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
debug "Dialing remote peer timed out"
waku_peers_dials.inc(labelValues = ["timeout"])
pm.peerStore.connectionBook[peerId] = CannotConnect
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
return none(Connection)
except CatchableError as e:
# TODO: any redial attempts?
debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"])
pm.peerStore.connectionBook[peerId] = CannotConnect
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
return none(Connection)
proc loadFromStorage(pm: PeerManager) =
@ -99,49 +94,55 @@ proc loadFromStorage(pm: PeerManager) =
# Do not manage self
return
pm.peerStore.addressBook[peerId] = storedInfo.addrs
pm.peerStore.protoBook[peerId] = storedInfo.protos
pm.peerStore.keyBook[peerId] = storedInfo.publicKey
pm.peerStore.connectionBook[peerId] = NotConnected # Reset connectedness state
pm.peerStore.disconnectBook[peerId] = disconnectTime
# nim-libp2p books
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
pm.peerStore[AgentBook][peerId] = storedInfo.agent
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
# custom books
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.peerStore[DisconnectBook][peerId] = disconnectTime
pm.peerStore[SourceBook][peerId] = storedInfo.origin
let res = pm.storage.getAll(onData)
if res.isErr:
warn "failed to load peers from storage", err = res.error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
else:
debug "successfully queried peer storage"
##################
# Initialisation #
##################
##################
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
if not pm.peerStore.addressBook.contains(peerId):
if not pm.peerStore[AddressBook].contains(peerId):
## We only consider connection events if we
## already track some addresses for this peer
return
case event.kind
of ConnEventKind.Connected:
pm.peerStore.connectionBook[peerId] = Connected
pm.peerStore[ConnectionBook][peerId] = Connected
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), Connected)
return
of ConnEventKind.Disconnected:
pm.peerStore.connectionBook[peerId] = CanConnect
pm.peerStore[ConnectionBook][peerId] = CanConnect
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
let pm = PeerManager(switch: switch,
peerStore: WakuPeerStore.new(),
peerStore: switch.peerStore,
storage: storage)
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
@ -150,122 +151,68 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
pm.loadFromStorage() # Load previously managed peers.
else:
debug "no peer storage found"
return pm
#####################
# Manager interface #
#####################
proc peers*(pm: PeerManager): seq[StoredInfo] =
# Return the known info for all peers
pm.peerStore.peers()
proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
# Return the known info for all peers registered on the specified protocol
pm.peers.filterIt(it.protos.contains(proto))
proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
# Return the known info for all peers matching the provided protocolMatcher
pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
proc connectedness*(pm: PeerManager, peerId: PeerID): Connectedness =
# Return the connection state of the given, managed peer
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
let storedInfo = pm.peerStore.get(peerId)
if (storedInfo == StoredInfo()):
# Peer is not managed, therefore not connected
return NotConnected
else:
pm.peerStore.connectionBook[peerId]
proc hasPeer*(pm: PeerManager, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
pm.peerStore.get(peerId).protos.contains(proto)
proc hasPeers*(pm: PeerManager, proto: string): bool =
# Returns `true` if manager has any peers for the specified protocol
pm.peers.anyIt(it.protos.contains(proto))
proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
# Returns `true` if manager has any peers matching the protocolMatcher
pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) =
# Adds peer to manager for the specified protocol
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
return
debug "Adding peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
# ...known addresses
for multiaddr in remotePeerInfo.addrs:
pm.peerStore.addressBook.add(remotePeerInfo.peerId, multiaddr)
pm.peerStore[AddressBook][remotePeerInfo.peerId] = pm.peerStore[AddressBook][remotePeerInfo.peerId] & multiaddr
# ...public key
var publicKey: PublicKey
discard remotePeerInfo.peerId.extractPublicKey(publicKey)
pm.peerStore.keyBook[remotePeerInfo.peerId] = publicKey
pm.peerStore[KeyBook][remotePeerInfo.peerId] = publicKey
# ...associated protocols
pm.peerStore.protoBook.add(remotePeerInfo.peerId, proto)
# nim-libp2p identify overrides this
pm.peerStore[ProtoBook][remotePeerInfo.peerId] = pm.peerStore[ProtoBook][remotePeerInfo.peerId] & proto
# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, pm.peerStore.get(remotePeerInfo.peerId), NotConnected)
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
let peers = pm.peers.filterIt(it.protos.contains(proto))
if peers.len >= 1:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toRemotePeerInfo())
else:
return none(RemotePeerInfo)
proc reconnectPeers*(pm: PeerManager,
proto: string,
protocolMatcher: Matcher,
backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.
debug "Reconnecting peers", proto=proto
for storedInfo in pm.peers(protocolMatcher):
# Check if peer is reachable.
if pm.peerStore.connectionBook[storedInfo.peerId] == CannotConnect:
debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
for storedInfo in pm.peerStore.peers(protocolMatcher):
# Check that the peer can be connected
if storedInfo.connectedness == CannotConnect:
debug "Not reconnecting to unreachable or non-existing peer", peerId=storedInfo.peerId
continue
# Respect optional backoff period where applicable.
let
disconnectTime = Moment.init(pm.peerStore.disconnectBook[storedInfo.peerId], Second) # Convert
# TODO: Add method to peerStore (eg isBackoffExpired())
disconnectTime = Moment.init(storedInfo.disconnectTime, Second) # Convert
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
if backoffTime > ZeroDuration:
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)
# Add to protos for peer, if it has not been added yet
if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
let remotePeerInfo = storedInfo.toRemotePeerInfo()
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo, proto)
trace "Reconnecting to peer", peerId=storedInfo.peerId
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
@ -277,12 +224,12 @@ proc reconnectPeers*(pm: PeerManager,
proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
# First add dialed peer info to peer store, if it does not exist yet...
if not pm.hasPeer(remotePeerInfo.peerId, proto):
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
pm.addPeer(remotePeerInfo, proto)
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
@ -292,7 +239,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up it's existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
if peerId == pm.switch.peerInfo.peerId:
# Do not attempt to dial self
return none(Connection)
@ -304,10 +251,10 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def
proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "Connecting to node", remotePeer = remotePeer, source = source
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
let connOpt = await pm.dialPeer(remotePeer, proto)
if connOpt.isSome():
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_conns_initiated.inc(labelValues = [source])
@ -318,7 +265,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len
for nodeId in nodes:
await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source)
@ -333,7 +280,7 @@ proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source
proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len
for remotePeerInfo in nodes:
await connectToNode(pm, remotePeerInfo, proto, source)

View File

@ -4,12 +4,17 @@ else:
{.push raises: [].}
import
std/[tables, sequtils, sets],
std/[tables, sequtils, sets, options],
libp2p/builders,
libp2p/peerstore
import
../../utils/peers
export peerstore, builders
# TODO rename to peer_store_extended to emphasize its a nimlibp2 extension
type
Connectedness* = enum
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
@ -20,74 +25,107 @@ type
CanConnect,
# Connected: actively connected to peer.
Connected
PeerOrigin* = enum
Unknown,
Discv5,
Static,
Dns
# Keeps track of the Connectedness state of a peer
ConnectionBook* = ref object of PeerBook[Connectedness]
DisconnectBook* = ref object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
# Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
# Keeps track of the origin of a peer
SourceBook* = ref object of PeerBook[PeerOrigin]
WakuPeerStore* = ref object
addressBook*: AddressBook
protoBook*: ProtoBook
keyBook*: KeyBook
connectionBook*: ConnectionBook
disconnectBook*: DisconnectBook
StoredInfo* = object
# Collates stored info about a peer
peerId*: PeerID
# Taken from nim-libp2
peerId*: PeerId
addrs*: seq[MultiAddress]
protos*: seq[string]
publicKey*: PublicKey
agent*: string
protoVersion*: string
proc new*(T: type WakuPeerStore): WakuPeerStore =
let
addressBook = AddressBook(book: initTable[PeerID, seq[MultiAddress]]())
protoBook = ProtoBook(book: initTable[PeerID, seq[string]]())
keyBook = KeyBook(book: initTable[PeerID, PublicKey]())
connectionBook = ConnectionBook(book: initTable[PeerID, Connectedness]())
disconnectBook = DisconnectBook(book: initTable[PeerID, int64]())
T(addressBook: addressBook,
protoBook: protoBook,
keyBook: keyBook,
connectionBook: connectionBook,
disconnectBook: disconnectBook)
# Extended custom fields
connectedness*: Connectedness
disconnectTime*: int64
origin*: PeerOrigin
#####################
# Utility functions #
#####################
proc add*[T](peerBook: SeqPeerBook[T],
peerId: PeerId,
entry: T) =
## Add entry to a given peer. If the peer is not known,
## it will be set with the provided entry.
peerBook.book.mgetOrPut(peerId,
newSeq[T]()).add(entry)
# TODO: Notify clients?
##################
##################
# Peer Store API #
##################
proc get*(peerStore: WakuPeerStore,
proc get*(peerStore: PeerStore,
peerId: PeerID): StoredInfo =
## Get the stored information of a given peer.
StoredInfo(
# Taken from nim-libp2
peerId: peerId,
addrs: peerStore.addressBook[peerId],
protos: peerStore.protoBook[peerId],
publicKey: peerStore.keyBook[peerId]
addrs: peerStore[AddressBook][peerId],
protos: peerStore[ProtoBook][peerId],
publicKey: peerStore[KeyBook][peerId],
agent: peerStore[AgentBook][peerId],
protoVersion: peerStore[ProtoVersionBook][peerId],
# Extended custom fields
connectedness: peerStore[ConnectionBook][peerId],
disconnectTime: peerStore[DisconnectBook][peerId],
origin: peerStore[SourceBook][peerId],
)
proc peers*(peerStore: WakuPeerStore): seq[StoredInfo] =
proc peers*(peerStore: PeerStore): seq[StoredInfo] =
## Get all the stored information of every peer.
let allKeys = concat(toSeq(keys(peerStore.addressBook.book)),
toSeq(keys(peerStore.protoBook.book)),
toSeq(keys(peerStore.keyBook.book))).toHashSet()
let allKeys = concat(toSeq(peerStore[AddressBook].book.keys()),
toSeq(peerStore[ProtoBook].book.keys()),
toSeq(peerStore[KeyBook].book.keys())).toHashSet()
return allKeys.mapIt(peerStore.get(it))
proc peers*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
# Return the known info for all peers registered on the specified protocol
peerStore.peers.filterIt(it.protos.contains(proto))
proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[StoredInfo] =
# Return the known info for all peers matching the provided protocolMatcher
peerStore.peers.filterIt(it.protos.anyIt(protocolMatcher(it)))
proc toRemotePeerInfo*(storedInfo: StoredInfo): RemotePeerInfo =
RemotePeerInfo.init(peerId = storedInfo.peerId,
addrs = toSeq(storedInfo.addrs),
protocols = toSeq(storedInfo.protos))
proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
# Return the connection state of the given, managed peer
# TODO: the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
# TODO: richer return than just bool, e.g. add enum "CanConnect", "CannotConnect", etc. based on recent connection attempts
return peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
# TODO: What if peer does not exist in the peerStore?
peerStore.get(peerId).protos.contains(proto)
proc hasPeers*(peerStore: PeerStore, proto: string): bool =
# Returns `true` if the peerstore has any peer for the specified protocol
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto))
proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
# Returns `true` if the peerstore has any peer matching the protocolMatcher
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))
proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
let peers = peerStore.peers().filterIt(it.protos.contains(proto))
if peers.len >= 1:
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
let peerStored = peers[0]
return some(peerStored.toRemotePeerInfo())
else:
return none(RemotePeerInfo)

View File

@ -102,7 +102,7 @@ type
started*: bool # Indicates that node has started listening
proc protocolMatcher(codec: string): Matcher =
proc protocolMatcher*(codec: string): Matcher =
## Returns a protocol matcher function for the provided codec
proc match(proto: string): bool {.gcsafe.} =
## Matches a proto with any postfix to the provided codec.
@ -146,7 +146,8 @@ proc new*(T: type WakuNode,
sendSignedPeerRecord = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string), # defaults to nim-libp2p version
agentString = none(string), # defaults to nim-libp2p version
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
## Creates a Waku Node instance.
@ -217,7 +218,8 @@ proc new*(T: type WakuNode,
secureCertPath = secureCert,
nameResolver = nameResolver,
sendSignedPeerRecord = sendSignedPeerRecord,
agentString = agentString
agentString = agentString,
peerStoreCapacity = peerStoreCapacity,
)
let wakuNode = WakuNode(
@ -357,7 +359,7 @@ proc startRelay*(node: WakuNode) {.async.} =
node.subscribe(topic, none(TopicHandler))
# Resume previous relay connections
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
@ -502,7 +504,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
@ -517,7 +519,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
let peerOpt = node.peerManager.peerStore.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
@ -675,7 +677,7 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.peerStore.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")
@ -764,7 +766,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
error "failed to publish message", error="waku lightpush client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
let peerOpt = node.peerManager.peerStore.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "failed to publish message", error="no suitable remote peers"
return
@ -824,14 +826,15 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.libp2pPing)
# TODO: Move this logic to PeerManager
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started:
# Keep all connected peers alive while running
trace "Running keepalive"
# First get a list of connected peer infos
let peers = node.peerManager.peers()
.filterIt(node.peerManager.connectedness(it.peerId) == Connected)
let peers = node.peerManager.peerStore.peers()
.filterIt(it.connectedness == Connected)
.mapIt(it.toRemotePeerInfo())
# Attempt to retrieve and ping the active outgoing connection for each peer
@ -855,6 +858,9 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive)
# TODO: Decouple discovery logic from connection logic
# A discovered peer goes to the PeerStore
# The PeerManager uses to PeerStore to dial peers
proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes
## using Node Discovery v5

View File

@ -43,7 +43,7 @@ proc withWssTransport*(b: SwitchBuilder,
secureKeyPath: string,
secureCertPath: string): SwitchBuilder
{.raises: [Defect, IOError].} =
let key : TLSPrivateKey = getSecureKey(secureKeyPath)
let cert : TLSCertificate = getSecureCert(secureCertPath)
b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr,
@ -71,7 +71,8 @@ proc newWakuSwitch*(
wssEnabled: bool = false,
secureKeyPath: string = "",
secureCertPath: string = "",
agentString = none(string), # defaults to nim-libp2p version
agentString = none(string), # defaults to nim-libp2p version,
peerStoreCapacity = none(int), # defaults to nim-libp2p max size
): Switch
{.raises: [Defect, IOError, LPError].} =
@ -88,6 +89,8 @@ proc newWakuSwitch*(
.withNameResolver(nameResolver)
.withSignedPeerRecord(sendSignedPeerRecord)
if peerStoreCapacity.isSome():
b = b.withPeerStore(peerStoreCapacity.get())
if agentString.isSome():
b = b.withAgentVersion(agentString.get())
if privKey.isSome():
@ -103,4 +106,4 @@ proc newWakuSwitch*(
else :
b = b.withAddress(address)
b.build()
b.build()

View File

@ -77,7 +77,7 @@ proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Fut
return ok()
proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
@ -106,7 +106,7 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo
return ok()
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
let peerOpt = wpx.peerManager.peerStore.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)

View File

@ -210,7 +210,7 @@ when defined(waku_exp_store_resume):
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = w.peerManager.peerStore.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])