Patch v0.5.1 - fix multiple protocol IDs in persistent storage (#687)

Hanno Cornelius 2021-07-27 08:48:56 +02:00 committed by GitHub
parent b4ee7071ba
commit 6b1e0c3079
4 changed files with 116 additions and 6 deletions

View File

@@ -1,5 +1,24 @@
# Changelog
## 2021-07-26 v0.5.1
This patch release contains the following fix:
- Support for multiple protocol IDs when reconnecting to previously connected peers:
  a bug in `v0.5` caused clients using persistent peer storage to reconnect to stored
  peers over the mounted protocol ID only, ignoring other supported versions of the
  same protocol. See the sketch after the protocol table below for the matching rule
  that now applies.

This is a patch release that is fully backwards-compatible with release `v0.5`.
It supports the same [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
| Protocol | Spec status | Protocol id |
| ---: | :---: | :--- |
| [`17/WAKU-RLN`](https://rfc.vac.dev/spec/17/) | `raw` | `/vac/waku/waku-rln-relay/2.0.0-alpha1` |
| [`11/WAKU2-RELAY`](https://rfc.vac.dev/spec/11/) | `stable` | `/vac/waku/relay/2.0.0` |
| [`12/WAKU2-FILTER`](https://rfc.vac.dev/spec/12/) | `draft` | `/vac/waku/filter/2.0.0-beta1` |
| [`13/WAKU2-STORE`](https://rfc.vac.dev/spec/13/) | `draft` | `/vac/waku/store/2.0.0-beta3` |
| [`18/WAKU2-SWAP`](https://rfc.vac.dev/spec/18/) | `draft` | `/vac/waku/swap/2.0.0-beta1` |
| [`19/WAKU2-LIGHTPUSH`](https://rfc.vac.dev/spec/19/) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
The Waku v1 implementation is stable but not under active development.
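
The essence of the fix is a prefix match on protocol IDs, implemented by the `protocolMatcher` introduced further down in this commit. A minimal standalone sketch of that rule (the helper name `matchesProtocol` is illustrative only, not part of the codebase):

```nim
import std/strutils

# A stored protocol ID matches a mounted codec if the codec is a prefix of it,
# so versioned variants of the same protocol still match.
proc matchesProtocol(mountedCodec, storedProto: string): bool =
  storedProto.startsWith(mountedCodec)

assert matchesProtocol("/vac/waku/relay/2.0.0", "/vac/waku/relay/2.0.0")
assert matchesProtocol("/vac/waku/relay/2.0.0", "/vac/waku/relay/2.0.0-beta2")
assert not matchesProtocol("/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0-beta3")
```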
## 2021-07-23 v0.5
This release contains the following:

View File

@@ -2,6 +2,7 @@
import
  std/[options, sets, tables, sequtils],
  chronicles,
  testutils/unittests, stew/shims/net as stewNet,
  json_rpc/[rpcserver, rpcclient],
  eth/[keys, rlp], eth/common/eth_types,
@@ -164,7 +165,7 @@ procSuite "Peer Manager":
  asyncTest "Peer manager can use persistent storage and survive restarts":
    let
-      database = SqliteDatabase.init("", inMemory = true)[]
+      database = SqliteDatabase.init("1", inMemory = true)[]
      storage = WakuPeerStorage.new(database)[]
      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
      node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
@@ -209,3 +210,63 @@ procSuite "Peer Manager":
      node3.peerManager.connectedness(peerInfo2.peerId) == Connected

    await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.init("2", inMemory = true)[]
storage = WakuPeerStorage.new(database)[]
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
peerInfo2 = node2.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
await node1.start()
await node2.start()
node1.mountRelay()
node1.wakuRelay.codec = betaCodec
node2.mountRelay()
node2.wakuRelay.codec = betaCodec
discard await node1.peerManager.dialPeer(peerInfo2, node2.wakuRelay.codec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peers().len == 1
node1.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
node1.peerManager.connectedness(peerInfo2.peerId) == Connected
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004), peerStorage = storage)
node3.mountRelay()
node3.wakuRelay.codec = stableCodec
check:
# Node 2 and 3 have differing codecs
node2.wakuRelay.codec == betaCodec
node3.wakuRelay.codec == stableCodec
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.connectedness(peerInfo2.peerId) == NotConnected
await node3.start() # This should trigger a reconnect
check:
# Reconnected to node2 after "restart"
node3.peerManager.peers().len == 1
node3.peerManager.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peers().anyIt(it.protos.contains(stableCodec))
node3.peerManager.connectedness(peerInfo2.peerId) == Connected
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@@ -3,6 +3,7 @@
import
  std/[options, sets, sequtils, times],
  chronos, chronicles, metrics,
  libp2p/multistream,
  ./waku_peer_store,
  ../storage/peer/peer_storage
@@ -78,8 +79,11 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
    return none(Connection)

proc loadFromStorage(pm: PeerManager) =
  debug "loading peers from storage"
  # Load peers from storage, if available
  proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
    trace "loading peer", peerId=peerId, storedInfo=storedInfo, connectedness=connectedness

    if peerId == pm.switch.peerInfo.peerId:
      # Do not manage self
      return
@@ -94,6 +98,8 @@ proc loadFromStorage(pm: PeerManager) =
  if res.isErr:
    warn "failed to load peers from storage", err = res.error
    waku_peers_errors.inc(labelValues = ["storage_load_failure"])
  else:
    debug "successfully queried peer storage"
##################
# Initialisation #
@@ -117,6 +123,8 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
                       peerStore: WakuPeerStore.new(),
                       storage: storage)

  debug "creating new PeerManager"

  proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} =
    onConnEvent(pm, peerInfo.peerId, event)
@@ -124,7 +132,10 @@ proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): Peer
  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)

  if not storage.isNil:
    debug "found persistent peer storage"
    pm.loadFromStorage() # Load previously managed peers.
  else:
    debug "no peer storage found"

  return pm
@@ -140,6 +151,10 @@ proc peers*(pm: PeerManager, proto: string): seq[StoredInfo] =
  # Return the known info for all peers registered on the specified protocol
  pm.peers.filterIt(it.protos.contains(proto))

proc peers*(pm: PeerManager, protocolMatcher: Matcher): seq[StoredInfo] =
  # Return the known info for all peers matching the provided protocolMatcher
  pm.peers.filter(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))

proc connectedness*(pm: PeerManager, peerId: PeerId): Connectedness =
  # Return the connection state of the given, managed peer
  # @TODO the PeerManager should keep and update local connectedness state for peers, redial on disconnect, etc.
@@ -162,6 +177,10 @@ proc hasPeers*(pm: PeerManager, proto: string): bool =
  # Returns `true` if manager has any peers for the specified protocol
  pm.peers.anyIt(it.protos.contains(proto))

proc hasPeers*(pm: PeerManager, protocolMatcher: Matcher): bool =
  # Returns `true` if manager has any peers matching the protocolMatcher
  pm.peers.any(proc (storedInfo: StoredInfo): bool = storedInfo.protos.anyIt(protocolMatcher(it)))
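
# Illustrative usage (a sketch, not part of this diff): given a matcher such as
# the protocolMatcher defined in wakunode2 later in this commit, the Matcher
# overloads also count versioned variants of a codec:
#   pm.hasPeers("/vac/waku/relay/2.0.0")                   # exact codec only
#   pm.hasPeers(protocolMatcher("/vac/waku/relay/2.0.0"))  # also 2.0.0-beta2 etc.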
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
  # Adds peer to manager for the specified protocol
@@ -200,13 +219,16 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
  else:
    return none(PeerInfo)
-proc reconnectPeers*(pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
+proc reconnectPeers*(pm: PeerManager,
+                     proto: string,
+                     protocolMatcher: Matcher,
+                     backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
  ## Reconnect to peers registered for this protocol. This will update connectedness.
  ## Especially useful to resume connections from persistent storage after a restart.

  debug "Reconnecting peers", proto=proto

-  for storedInfo in pm.peers(proto):
+  for storedInfo in pm.peers(protocolMatcher):
    # Check if peer is reachable.
    if pm.peerStore.connectionBook.get(storedInfo.peerId) == CannotConnect:
      debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
@@ -225,6 +247,12 @@ proc reconnectPeers*(pm: PeerManager, proto: string, backoff: chronos.Duration =
      # We disconnected recently and still need to wait for a backoff period before connecting
      await sleepAsync(backoffTime)

    # Add to protos for peer, if it has not been added yet
    if not pm.peerStore.get(storedInfo.peerId).protos.contains(proto):
      let peerInfo = storedInfo.toPeerInfo()
      trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
      pm.addPeer(peerInfo, proto)

    trace "Reconnecting to peer", peerId=storedInfo.peerId
    discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
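
# Illustrative call (a sketch, not part of this diff), mirroring the startRelay
# change in wakunode2 later in this commit:
#   await pm.reconnectPeers(WakuRelayCodec,
#                           protocolMatcher(WakuRelayCodec),
#                           backoffPeriod)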

View File

@@ -80,15 +80,16 @@
    rng*: ref BrHmacDrbgContext
    started*: bool # Indicates that node has started listening
-func protocolMatcher(codec: string): Matcher =
+proc protocolMatcher(codec: string): Matcher =
  ## Returns a protocol matcher function for the provided codec
  proc match(proto: string): bool {.gcsafe.} =
    ## Matches a proto with any postfix to the provided codec.
    ## E.g. if the codec is `/vac/waku/filter/2.0.0` it matches the protos:
    ## `/vac/waku/filter/2.0.0`, `/vac/waku/filter/2.0.0-beta3`, `/vac/waku/filter/2.0.0-actualnonsense`
    return proto.startsWith(codec)

  return match
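
# Illustration (not part of this diff): the returned matcher is a plain prefix check.
#   let matcher = protocolMatcher("/vac/waku/relay/2.0.0")
#   doAssert matcher("/vac/waku/relay/2.0.0")          # exact match
#   doAssert matcher("/vac/waku/relay/2.0.0-beta2")    # versioned postfix matches
#   doAssert not matcher("/vac/waku/store/2.0.0")      # different protocol does not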
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
  # Flatten all unsubscribe topics into single seq
  let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
@@ -461,13 +462,14 @@ proc startRelay*(node: WakuNode) {.async.} =
    node.subscribe(topic, none(TopicHandler))

  # Resume previous relay connections
-  if node.peerManager.hasPeers(WakuRelayCodec):
+  if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
    info "Found previous WakuRelay peers. Reconnecting."

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)

    await node.peerManager.reconnectPeers(WakuRelayCodec,
+                                          protocolMatcher(WakuRelayCodec),
                                          backoffPeriod)

when defined(rln):