From f0eadfec132b7015da9bbaa146b2fb6e474a8564 Mon Sep 17 00:00:00 2001
From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
Date: Wed, 21 Apr 2021 11:36:56 +0200
Subject: [PATCH] Add persistent backoff for peers (#497)

---
 tests/v2/test_peer_storage.nim                | 11 +++--
 waku/v2/node/config.nim                       |  5 +++
 waku/v2/node/peer_manager/peer_manager.nim    | 41 +++++++++++++++----
 waku/v2/node/peer_manager/waku_peer_store.nim |  3 ++
 waku/v2/node/storage/peer/peer_storage.nim    |  5 ++-
 .../node/storage/peer/waku_peer_storage.nim   | 23 +++++++----
 waku/v2/node/wakunode2.nim                    | 34 +++++----------
 7 files changed, 75 insertions(+), 47 deletions(-)

diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim
index b43975781..985b4ba88 100644
--- a/tests/v2/test_peer_storage.nim
+++ b/tests/v2/test_peer_storage.nim
@@ -21,21 +21,23 @@ suite "Peer Storage":
       peerProto = "/waku/2/default-waku/codec"
       stored = StoredInfo(peerId: peer.peerId, addrs: toHashSet([peerLoc]), protos: toHashSet([peerProto]), publicKey: peerKey.getKey().tryGet())
       conn = Connectedness.CanConnect
+      disconn = 999999
 
     defer: storage.close()
 
     # Test insert and retrieve
-    discard storage.put(peer.peerId, stored, conn)
+    discard storage.put(peer.peerId, stored, conn, disconn)
 
     var responseCount = 0
     proc data(peerId: PeerID, storedInfo: StoredInfo,
-              connectedness: Connectedness) =
+              connectedness: Connectedness, disconnectTime: int64) =
       responseCount += 1
 
       check:
         peerId == peer.peerId
        storedInfo == stored
        connectedness == conn
+        disconnectTime == disconn
 
     let res = storage.getAll(data)
 
@@ -44,16 +46,17 @@ suite "Peer Storage":
       responseCount == 1
 
     # Test replace and retrieve (update an existing entry)
-    discard storage.put(peer.peerId, stored, Connectedness.CannotConnect)
+    discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
 
     responseCount = 0
     proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
-                      connectedness: Connectedness) =
+                      connectedness: Connectedness, disconnectTime: int64) =
       responseCount += 1
 
       check:
         peerId == peer.peerId
        storedInfo == stored
        connectedness == CannotConnect
+        disconnectTime == disconn + 10
 
     let repRes = storage.getAll(replacedData)
 
diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim
index 88b4a9735..824008fa6 100644
--- a/waku/v2/node/config.nim
+++ b/waku/v2/node/config.nim
@@ -42,6 +42,11 @@ type
     staticnodes* {.
       desc: "Peer multiaddr to directly connect with. Argument may be repeated."
       name: "staticnode" }: seq[string]
+
+    peerpersist* {.
+      desc: "Enable peer persistence: true|false",
+      defaultValue: false
+      name: "peerpersist" }: bool
 
     storenode* {.
       desc: "Peer multiaddr to query for storage.",
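
Note: the new peerpersist flag only takes effect once wakunode2.nim consumes it (see the last hunk of this patch). As a minimal sketch of that wiring, not part of the patch itself and with the relative import path shown purely for illustration, the option is read through the usual confutils entry point:

    import confutils, chronicles
    import ./config            # WakuNodeConf, with the peerpersist option added above

    let conf = WakuNodeConf.load()   # e.g. started as: wakunode2 --peerpersist=true

    if conf.peerpersist:
      # Only in this case is a WakuPeerStorage created and handed to the
      # PeerManager, as done at the end of this patch.
      info "peer persistence enabled"
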
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 6afbe6038..c1140d529 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -1,7 +1,7 @@
 {.push raises: [Defect, Exception].}
 
 import
-  std/[options, sets, sequtils],
+  std/[options, sets, sequtils, times],
   chronos, chronicles, metrics,
   ./waku_peer_store,
   ../storage/peer/peer_storage
@@ -20,8 +20,8 @@ type
     peerStore*: WakuPeerStore
     storage: PeerStorage
 
-const
-  defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
+let
+  defaultDialTimeout = chronos.minutes(1) # @TODO should this be made configurable?
 
 ####################
 # Helper functions #
 ####################
@@ -32,9 +32,11 @@ proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
     addrs = toSeq(storedInfo.addrs),
     protocols = toSeq(storedInfo.protos))
 
-proc insertOrReplace(ps: PeerStorage, peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
+proc insertOrReplace(ps: PeerStorage,
+                     peerId: PeerID,
+                     storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64 = 0) =
   # Insert peer entry into persistent storage, or replace existing entry with updated info
-  let res = ps.put(peerId, storedInfo, connectedness)
+  let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
   if res.isErr:
     warn "failed to store peers", err = res.error
     waku_peers_errors.inc(labelValues = ["storage_failure"])
@@ -75,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
 
 proc loadFromStorage(pm: PeerManager) =
   # Load peers from storage, if available
-  proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
+  proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
     if peerId == pm.switch.peerInfo.peerId:
       # Do not manage self
       return
@@ -84,6 +86,7 @@ proc loadFromStorage(pm: PeerManager) =
     pm.peerStore.protoBook.set(peerId, storedInfo.protos)
     pm.peerStore.keyBook.set(peerId, storedInfo.publicKey)
     pm.peerStore.connectionBook.set(peerId, NotConnected) # Reset connectedness state
+    pm.peerStore.disconnectBook.set(peerId, disconnectTime)
 
   let res = pm.storage.getAll(onData)
   if res.isErr:
@@ -104,7 +107,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
   of ConnEventKind.Disconnected:
     pm.peerStore.connectionBook.set(peerId, CanConnect)
     if not pm.storage.isNil:
-      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect)
+      pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
     return
 
 proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
@@ -195,13 +198,31 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
   else:
     return none(PeerInfo)
 
-proc reconnectPeers*(pm: PeerManager, proto: string) {.async.} =
+proc reconnectPeers*(pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
   ## Reconnect to peers registered for this protocol. This will update connectedness.
   ## Especially useful to resume connections from persistent storage after a restart.
 
   debug "Reconnecting peers", proto=proto
 
   for storedInfo in pm.peers(proto):
+    # Check if peer is reachable.
+    if pm.peerStore.connectionBook.get(storedInfo.peerId) == CannotConnect:
+      debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
+      continue
+
+    # Respect optional backoff period where applicable.
+    let
+      disconnectTime = Moment.init(pm.peerStore.disconnectBook.get(storedInfo.peerId), Second) # Convert
+      currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
+      backoffTime = disconnectTime + backoff - currentTime # Consider time elapsed since last disconnect
+
+    trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
+
+    if backoffTime > ZeroDuration:
+      debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
+      # We disconnected recently and still need to wait for a backoff period before connecting
+      await sleepAsync(backoffTime)
+
     trace "Reconnecting to peer", peerId=storedInfo.peerId
     discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
 
@@ -217,5 +238,9 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
   if not pm.hasPeer(peerInfo, proto):
     trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
     pm.addPeer(peerInfo, proto)
+
+  if peerInfo.peerId == pm.switch.peerInfo.peerId:
+    # Do not attempt to dial self
+    return none(Connection)
 
   return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout)
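
Note: the backoff arithmetic in reconnectPeers above compares a persisted Unix timestamp against the current time on the chronos Moment scale. A minimal standalone sketch of the same calculation, with the proc name chosen here for illustration only:

    import std/times
    import chronos

    proc remainingBackoff(disconnectUnix: int64, backoff: chronos.Duration): chronos.Duration =
      ## How long do we still have to wait before redialling a peer that
      ## disconnected at `disconnectUnix`? Mirrors the logic in reconnectPeers.
      let
        disconnectTime = Moment.init(disconnectUnix, Second)   # persisted value as a Moment
        currentTime = Moment.init(getTime().toUnix, Second)    # current time on the same scale
        backoffTime = disconnectTime + backoff - currentTime   # time left to wait, may be negative
      result = if backoffTime > ZeroDuration: backoffTime else: ZeroDuration

    # A peer that disconnected 30 s ago with a 60 s backoff still waits roughly 30 s:
    echo remainingBackoff(getTime().toUnix - 30, chronos.seconds(60))
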
diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim
index fe05eed3c..f7822d3d2 100644
--- a/waku/v2/node/peer_manager/waku_peer_store.nim
+++ b/waku/v2/node/peer_manager/waku_peer_store.nim
@@ -19,8 +19,11 @@ type
 
   ConnectionBook* = object of PeerBook[Connectedness]
 
+  DisconnectBook* = object of PeerBook[int64] # Keeps track of when peers were disconnected in Unix timestamps
+
   WakuPeerStore* = ref object of PeerStore
     connectionBook*: ConnectionBook
+    disconnectBook*: DisconnectBook
 
 proc new*(T: type WakuPeerStore): WakuPeerStore =
   var p: WakuPeerStore
diff --git a/waku/v2/node/storage/peer/peer_storage.nim b/waku/v2/node/storage/peer/peer_storage.nim
index bc9476ed0..8667e938c 100644
--- a/waku/v2/node/storage/peer/peer_storage.nim
+++ b/waku/v2/node/storage/peer/peer_storage.nim
@@ -12,12 +12,13 @@ type
   PeerStorageResult*[T] = Result[T, string]
 
   DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
-                   connectedness: Connectedness) {.closure.}
+                   connectedness: Connectedness, disconnectTime: int64) {.closure.}
 
 # PeerStorage interface
 method put*(db: PeerStorage,
             peerId: PeerID,
             storedInfo: StoredInfo,
-            connectedness: Connectedness): PeerStorageResult[void] {.base.} = discard
+            connectedness: Connectedness,
+            disconnectTime: int64): PeerStorageResult[void] {.base.} = discard
 
 method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
\ No newline at end of file
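
Note: every concrete PeerStorage now has to accept and replay the extra disconnectTime argument. A minimal in-memory implementation satisfying the widened interface, for example as a test double, could look as follows; it assumes the same types and Result helpers that peer_storage.nim already imports, and the type name is illustrative:

    import std/tables

    type
      InMemoryPeerStorage = ref object of PeerStorage
        peers: Table[PeerID, (StoredInfo, Connectedness, int64)]

    method put*(db: InMemoryPeerStorage,
                peerId: PeerID,
                storedInfo: StoredInfo,
                connectedness: Connectedness,
                disconnectTime: int64): PeerStorageResult[void] =
      # Keep the full tuple, including the new disconnect timestamp
      db.peers[peerId] = (storedInfo, connectedness, disconnectTime)
      return ok()

    method getAll*(db: InMemoryPeerStorage, onData: DataProc): PeerStorageResult[bool] =
      # Replay every stored entry through the four-argument callback
      for peerId, entry in db.peers:
        onData(peerId, entry[0], entry[1], entry[2])
      return ok(true)
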
diff --git a/waku/v2/node/storage/peer/waku_peer_storage.nim b/waku/v2/node/storage/peer/waku_peer_storage.nim
index e523cb3c5..43c428f89 100644
--- a/waku/v2/node/storage/peer/waku_peer_storage.nim
+++ b/waku/v2/node/storage/peer/waku_peer_storage.nim
@@ -56,16 +56,18 @@ proc encode*(storedInfo: StoredInfo): ProtoBuffer =
 ##########################
 
 proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
-  ## Create the "Peers" table
+  ## Create the "Peer" table
   ## It contains:
   ## - peer id as primary key, stored as a blob
   ## - stored info (serialised protobuf), stored as a blob
   ## - last known enumerated connectedness state, stored as an integer
+  ## - disconnect time in epoch seconds, if applicable
   let
     prepare = db.prepareStmt("""
-      CREATE TABLE IF NOT EXISTS Peers (
+      CREATE TABLE IF NOT EXISTS Peer (
           peerId BLOB PRIMARY KEY,
           storedInfo BLOB,
-          connectedness INTEGER
+          connectedness INTEGER,
+          disconnectTime INTEGER
       ) WITHOUT ROWID;
       """, NoParams, void)
@@ -82,19 +84,20 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
 
 method put*(db: WakuPeerStorage,
             peerId: PeerID,
             storedInfo: StoredInfo,
-            connectedness: Connectedness): PeerStorageResult[void] =
+            connectedness: Connectedness,
+            disconnectTime: int64): PeerStorageResult[void] =
   ## Adds a peer to storage or replaces existing entry if it already exists
 
   let prepare = db.database.prepareStmt(
-    "REPLACE INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);",
-    (seq[byte], seq[byte], int32),
+    "REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);",
+    (seq[byte], seq[byte], int32, int64),
     void
   )
 
   if prepare.isErr:
     return err("failed to prepare")
 
-  let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness))))
+  let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness)), disconnectTime))
   if res.isErr:
     return err("failed")
@@ -117,10 +120,12 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
       storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet()
       # Connectedness
       connectedness = Connectedness(sqlite3_column_int(s, 2))
+      # DisconnectTime
+      disconnectTime = sqlite3_column_int64(s, 3)
 
-    onData(peerId, storedInfo, connectedness)
+    onData(peerId, storedInfo, connectedness, disconnectTime)
 
-  let res = db.database.query("SELECT peerId, storedInfo, connectedness FROM Peers", peer)
+  let res = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
 
   if res.isErr:
     return err("failed")
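
Note: in the SQLite-backed store the new column is simply the Unix time recorded at the moment of disconnect and replayed on the next start-up. A short usage sketch, assuming a WakuPeerStorage `storage` and peer data constructed as in tests/v2/test_peer_storage.nim; the callback name is illustrative:

    import std/times

    # Record a disconnect happening "now". REPLACE INTO overwrites any existing
    # row for this peerId, so the latest connectedness and disconnect time win.
    discard storage.put(peer.peerId, stored, Connectedness.CanConnect, getTime().toUnix)

    # After a restart, getAll replays every row, including the timestamp, which
    # the PeerManager then loads into its DisconnectBook.
    proc restored(peerId: PeerID, storedInfo: StoredInfo,
                  connectedness: Connectedness, disconnectTime: int64) =
      echo "peer ", peerId, " last disconnected at ", disconnectTime

    discard storage.getAll(restored)
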
diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim
index 20ed6cb8e..11cd0a094 100644
--- a/waku/v2/node/wakunode2.nim
+++ b/waku/v2/node/wakunode2.nim
@@ -10,6 +10,7 @@ import
   # NOTE For TopicHandler, solve with exports?
   libp2p/protocols/pubsub/rpc/messages,
   libp2p/protocols/pubsub/pubsub,
+  libp2p/protocols/pubsub/gossipsub,
   libp2p/standard_setup,
   ../protocol/[waku_relay, waku_message, message_notifier],
   ../protocol/waku_store/waku_store,
@@ -419,31 +420,16 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
   info "mounting relay"
 
+  node.subscribe(defaultTopic, none(TopicHandler))
+
+  for topic in topics:
+    node.subscribe(topic, none(TopicHandler))
+
   if node.peerManager.hasPeers(WakuRelayCodec):
     trace "Found previous WakuRelay peers. Reconnecting."
-    # Reconnect to previous relay peers
-    waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
-
-    ## GossipSub specifies a backoff period after disconnecting and unsubscribing before attempting
-    ## to re-graft peer on previous topics. We have to respect this period before starting WakuRelay.
-    trace "Backing off before grafting after reconnecting to WakuRelay peers", backoff=wakuRelay.parameters.pruneBackoff
-
-    proc subscribeFuture() {.async.} =
-      # Subscribe after the backoff period
-      await sleepAsync(wakuRelay.parameters.pruneBackoff)
-
-      node.subscribe(defaultTopic, none(TopicHandler))
-
-      for topic in topics:
-        node.subscribe(topic, none(TopicHandler))
-
-    discard subscribeFuture() # Dispatch future, but do not await.
-  else:
-    # Subscribe immediately
-    node.subscribe(defaultTopic, none(TopicHandler))
-
-    for topic in topics:
-      node.subscribe(topic, none(TopicHandler))
+    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
+    waitFor node.peerManager.reconnectPeers(WakuRelayCodec,
+                                            wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime))
 
   if rlnRelayEnabled:
     # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
@@ -595,7 +581,7 @@ when isMainModule:
 
     var pStorage: WakuPeerStorage
 
-    if not sqliteDatabase.isNil:
+    if conf.peerpersist and not sqliteDatabase.isNil:
       let res = WakuPeerStorage.new(sqliteDatabase)
       if res.isErr:
         warn "failed to init new WakuPeerStorage", err = res.error
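
Note: with the mountRelay change above, topic subscription happens immediately and the GossipSub prune backoff is folded into the reconnect call instead of being awaited before subscribing. A rough sketch of the duration handed to reconnectPeers; the concrete values are assumptions for illustration (BackoffSlackTime is a small slack value in seconds defined or imported elsewhere in the node code, and one minute is the usual GossipSub pruneBackoff default):

    import chronos

    let
      pruneBackoff = chronos.minutes(1)   # stand-in for wakuRelay.parameters.pruneBackoff
      slack = chronos.seconds(2)          # stand-in for chronos.seconds(BackoffSlackTime)
      totalBackoff = pruneBackoff + slack

    # reconnectPeers will then redial each persisted relay peer no earlier than
    # its stored disconnectTime + totalBackoff, sleeping for the remainder if needed.
    echo "re-grafting no earlier than ", totalBackoff, " after the last disconnect"
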