From 6b1e0c30791c9bd7614978f18c450564e794ca0f Mon Sep 17 00:00:00 2001
From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
Date: Tue, 27 Jul 2021 08:48:56 +0200
Subject: [PATCH] Patch v0.5.1 - fix multiple protocol IDs in persistent
 storage (#687)

---
 CHANGELOG.md                               | 19 +++++++
 tests/v2/test_peer_manager.nim             | 63 +++++++++++++++++++++-
 waku/v2/node/peer_manager/peer_manager.nim | 32 ++++++++++-
 waku/v2/node/wakunode2.nim                 |  8 +--
 4 files changed, 116 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 70fa8ef51..dba9788dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,24 @@
 # Changelog
 
+## 2021-07-26 v0.5.1
+
+This patch release contains the following fix:
+- Support for multiple protocol IDs when reconnecting to previously connected peers:
+a bug in `v0.5` caused clients using persistent peer storage to support only the mounted protocol ID.
+
+This is a patch release that is fully backwards-compatible with release `v0.5`.
+It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
+| Protocol | Spec status | Protocol id |
+| ---: | :---: | :--- |
+| [`17/WAKU-RLN`](https://rfc.vac.dev/spec/17/) | `raw` | `/vac/waku/waku-rln-relay/2.0.0-alpha1` |
+| [`11/WAKU2-RELAY`](https://rfc.vac.dev/spec/11/) | `stable` | `/vac/waku/relay/2.0.0` |
+| [`12/WAKU2-FILTER`](https://rfc.vac.dev/spec/12/) | `draft` | `/vac/waku/filter/2.0.0-beta1` |
+| [`13/WAKU2-STORE`](https://rfc.vac.dev/spec/13/) | `draft` | `/vac/waku/store/2.0.0-beta3` |
+| [`18/WAKU2-SWAP`](https://rfc.vac.dev/spec/18/) | `draft` | `/vac/waku/swap/2.0.0-beta1` |
+| [`19/WAKU2-LIGHTPUSH`](https://rfc.vac.dev/spec/19/) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
+
+The Waku v1 implementation is stable but not under active development.
+
 ## 2021-07-23 v0.5
 
 This release contains the following:
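Editor's note: the root cause described in the changelog entry is easiest to see from the codec strings themselves. Protocol IDs carry version postfixes, so a peer persisted under a beta codec no longer equals the stable codec mounted after an upgrade. The sketch below is illustrative only (not part of the patch) and shows why an exact comparison loses previously connected peers while a prefix match keeps them:

```nim
import std/strutils

let
  storedCodec  = "/vac/waku/relay/2.0.0-beta2" # codec persisted before the upgrade
  mountedCodec = "/vac/waku/relay/2.0.0"       # codec mounted after the upgrade

# An exact comparison treats the stored peer as speaking a foreign protocol...
assert storedCodec != mountedCodec
# ...whereas matching on the codec prefix accepts any postfixed version.
assert storedCodec.startsWith(mountedCodec)
```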
diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim
index 893155d40..5eeb5c864 100644
--- a/tests/v2/test_peer_manager.nim
+++ b/tests/v2/test_peer_manager.nim
@@ -2,6 +2,7 @@
 
 import
   std/[options, sets, tables, sequtils],
+  chronicles,
   testutils/unittests, stew/shims/net as stewNet,
   json_rpc/[rpcserver, rpcclient],
   eth/[keys, rlp], eth/common/eth_types,
@@ -164,7 +165,7 @@ procSuite "Peer Manager":
 
   asyncTest "Peer manager can use persistent storage and survive restarts":
     let
-      database = SqliteDatabase.init("", inMemory = true)[]
+      database = SqliteDatabase.init("1", inMemory = true)[]
       storage = WakuPeerStorage.new(database)[]
       nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
       node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
@@ -209,3 +210,63 @@ procSuite "Peer Manager":
       node3.peerManager.connectedness(peerInfo2.peerId) == Connected
 
     await allFutures([node1.stop(), node2.stop(), node3.stop()])
+
+  asyncTest "Peer manager supports multiple protocol IDs when reconnecting to peers":
+    let
+      database = SqliteDatabase.init("2", inMemory = true)[]
+      storage = WakuPeerStorage.new(database)[]
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
+        Port(60000), peerStorage = storage)
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
+        Port(60002))
+      peerInfo2 = node2.peerInfo
+      betaCodec = "/vac/waku/relay/2.0.0-beta2"
+      stableCodec = "/vac/waku/relay/2.0.0"
+
+    await node1.start()
+    await node2.start()
+
+    node1.mountRelay()
+    node1.wakuRelay.codec = betaCodec
+    node2.mountRelay()
+    node2.wakuRelay.codec = betaCodec
+
+    discard await node1.peerManager.dialPeer(peerInfo2, node2.wakuRelay.codec, 2.seconds)
+    check:
+      # Currently connected to node2
+      node1.peerManager.peers().len == 1
+      node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
+      node1.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    # Simulate restart by initialising a new node using the same storage
+    let
+      nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
+        Port(60004), peerStorage = storage)
+
+    node3.mountRelay()
+    node3.wakuRelay.codec = stableCodec
+    check:
+      # Node 2 and 3 have differing codecs
+      node2.wakuRelay.codec == betaCodec
+      node3.wakuRelay.codec == stableCodec
+      # Node2 has been loaded after "restart", but we have not yet reconnected
+      node3.peerManager.peers().len == 1
+      node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
+      node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
+
+    await node3.start() # This should trigger a reconnect
+
+    check:
+      # Reconnected to node2 after "restart"
+      node3.peerManager.peers().len == 1
+      node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
+      node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
+      node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
+      node3.peerManager.connectedness(peerInfo2.peerId) == Connected
+
+    await allFutures([node1.stop(), node2.stop(), node3.stop()])
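Editor's note: the final `check` block above asserts that the stored record for node2 ends up carrying *both* codecs after the reconnect. A simplified, self-contained sketch of that accumulation follows; `PeerRecord` and `addProto` are hypothetical stand-ins for `StoredInfo` and the peer store's registration logic, not the actual API:

```nim
# Hypothetical stand-in for the per-peer StoredInfo record
type PeerRecord = object
  peerId: string
  protos: seq[string]

proc addProto(r: var PeerRecord, proto: string) =
  # Registering a peer under a new codec appends rather than replaces,
  # so the record accumulates every protocol ID the peer was dialed with
  if proto notin r.protos:
    r.protos.add(proto)

var rec = PeerRecord(peerId: "node2", protos: @["/vac/waku/relay/2.0.0-beta2"])
rec.addProto("/vac/waku/relay/2.0.0") # reconnect with the stable codec

assert rec.protos.len == 2 # both codecs registered, as the test expects
```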
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 05933eb91..594284920 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -3,6 +3,7 @@
 import
   std/[options, sets, sequtils, times],
   chronos, chronicles, metrics,
+  libp2p/multistream,
   ./waku_peer_store,
   ../storage/peer/peer_storage
 
@@ -78,8 +79,11 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
     return none(Connection)
 
 proc loadFromStorage(pm: PeerManager) =
+  debug "loading peers from storage"
   # Load peers from storage, if available
   proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
+    trace "loading peer", peerId=peerId, storedInfo=storedInfo, connectedness=connectedness
+
     if peerId == pm.switch.peerInfo.peerId:
       # Do not manage self
       return
@@ -94,6 +98,8 @@ proc loadFromStorage(pm: PeerManager) =
   if res.isErr:
     warn "failed to load peers from storage", err = res.error
     waku_peers_errors.inc(labelValues = ["storage_load_failure"])
+  else:
+    debug "successfully queried peer storage"
 
 ##################
 # Initialisation #
@@ -116,6 +122,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
   let pm = PeerManager(switch: switch,
                        peerStore: WakuPeerStore.new(),
                        storage: storage)
+
+  debug "creating new PeerManager"
 
   proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} =
     onConnEvent(pm, peerInfo.peerId, event)
@@ -124,7 +132,10 @@
   pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
 
   if not storage.isNil:
+    debug "found persistent peer storage"
     pm.loadFromStorage() # Load previously managed peers.
+  else:
+    debug "no peer storage found"
 
   return pm
 
@@ -140,6 +151,10 @@ proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
   # Return the known info for all peers registered on the specified protocol
   pm.peers.filterIt(it.protos.contains(proto))
 
+proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
+  # Return the known info for all peers matching the provided protocolMatcher
+  pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
+
 proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
   # Return the connection state of the given, managed peer
   # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
@@ -162,6 +177,10 @@ proc hasPeers*(pm: PeerManager, proto: string): bool =
   # Returns `true` if manager has any peers for the specified protocol
   pm.peers.anyIt(it.protos.contains(proto))
 
+proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
+  # Returns `true` if manager has any peers matching the protocolMatcher
+  pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
+
 proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
   # Adds peer to manager for the specified protocol
 
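Editor's note: the two overloads added above accept a `Matcher` predicate instead of a literal codec string, so a stored peer counts as long as *any* of its registered protocols satisfies the matcher. A minimal, self-contained illustration of that filtering, assuming a simplified `StoredInfoSketch` in place of the real `StoredInfo` and a locally defined `Matcher` mirroring libp2p's:

```nim
import std/[sequtils, strutils]

type
  Matcher = proc(proto: string): bool {.gcsafe.}
  StoredInfoSketch = object # hypothetical stand-in for StoredInfo
    peerId: string
    protos: seq[string]

proc peersMatching(peers: seq[StoredInfoSketch], matcher: Matcher): seq[StoredInfoSketch] =
  # Keep every peer that speaks at least one protocol accepted by the matcher
  peers.filter(proc (info: StoredInfoSketch): bool = info.protos.anyIt(matcher(it)))

let
  relayMatcher: Matcher = proc(proto: string): bool = proto.startsWith("/vac/waku/relay/2.0.0")
  known = @[
    StoredInfoSketch(peerId: "peerA", protos: @["/vac/waku/relay/2.0.0-beta2"]),
    StoredInfoSketch(peerId: "peerB", protos: @["/vac/waku/store/2.0.0-beta3"])
  ]

assert known.peersMatching(relayMatcher).len == 1 # only peerA speaks a relay codec
```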
debug "Reconnecting peers", proto=proto - for storedInfo in pm.peers(proto): + for storedInfo in pm.peers(protocolMatcher): # Check if peer is reachable. if pm.peerStore.connectionBook.get(storedInfo.peerId) == CannotConnect: debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId @@ -224,6 +246,12 @@ proc reconnectPeers*(pm: PeerManager, proto: string, backoff: chronos.Duration = debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime # We disconnected recently and still need to wait for a backoff period before connecting await sleepAsync(backoffTime) + + # Add to protos for peer, if it has not been added yet + if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto): + let peerInfo = storedInfo.toPeerInfo() + trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto + pm.addPeer(peerInfo, proto) trace "Reconnecting to peer", peerId=storedInfo.peerId discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index b160bd56a..5262a44aa 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -80,15 +80,16 @@ type rng*: ref BrHmacDrbgContext started*: bool # Indicates that node has started listening -func protocolMatcher(codec: string): Matcher = +proc protocolMatcher(codec: string): Matcher = ## Returns a protocol matcher function for the provided codec - proc match(proto: string): bool {.gcsafe.} = ## Matches a proto with any postfix to the provided codec. ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos: ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense` return proto.startsWith(codec) + return match + proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = # Flatten all unsubscribe topics into single seq let unsubscribeTopics = contentFilters.mapIt(it.contentTopic) @@ -461,13 +462,14 @@ proc startRelay*(node: WakuNode) {.async.} = node.subscribe(topic, none(TopicHandler)) # Resume previous relay connections - if node.peerManager.hasPeers(WakuRelayCodec): + if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." # Reconnect to previous relay peers. This will respect a backoff period, if necessary let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) await node.peerManager.reconnectPeers(WakuRelayCodec, + protocolMatcher(WakuRelayCodec), backoffPeriod) when defined(rln):