mirror of https://github.com/waku-org/nwaku.git
Add persistent backoff for peers (#497)
This commit is contained in:
parent
c997860397
commit
f0eadfec13
|
@ -21,21 +21,23 @@ suite "Peer Storage":
|
|||
peerProto = "/waku/2/default-waku/codec"
|
||||
stored = StoredInfo(peerId: peer.peerId, addrs: toHashSet([peerLoc]), protos: toHashSet([peerProto]), publicKey: peerKey.getKey().tryGet())
|
||||
conn = Connectedness.CanConnect
|
||||
disconn = 999999
|
||||
|
||||
defer: storage.close()
|
||||
|
||||
# Test insert and retrieve
|
||||
|
||||
discard storage.put(peer.peerId, stored, conn)
|
||||
discard storage.put(peer.peerId, stored, conn, disconn)
|
||||
|
||||
var responseCount = 0
|
||||
proc data(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness) =
|
||||
connectedness: Connectedness, disconnectTime: int64) =
|
||||
responseCount += 1
|
||||
check:
|
||||
peerId == peer.peerId
|
||||
storedInfo == stored
|
||||
connectedness == conn
|
||||
disconnectTime == disconn
|
||||
|
||||
let res = storage.getAll(data)
|
||||
|
||||
|
@ -44,16 +46,17 @@ suite "Peer Storage":
|
|||
responseCount == 1
|
||||
|
||||
# Test replace and retrieve (update an existing entry)
|
||||
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect)
|
||||
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
|
||||
|
||||
responseCount = 0
|
||||
proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness) =
|
||||
connectedness: Connectedness, disconnectTime: int64) =
|
||||
responseCount += 1
|
||||
check:
|
||||
peerId == peer.peerId
|
||||
storedInfo == stored
|
||||
connectedness == CannotConnect
|
||||
disconnectTime == disconn + 10
|
||||
|
||||
let repRes = storage.getAll(replacedData)
|
||||
|
||||
|
|
|
@ -42,6 +42,11 @@ type
|
|||
staticnodes* {.
|
||||
desc: "Peer multiaddr to directly connect with. Argument may be repeated."
|
||||
name: "staticnode" }: seq[string]
|
||||
|
||||
peerpersist* {.
|
||||
desc: "Enable peer persistence: true|false",
|
||||
defaultValue: false
|
||||
name: "peerpersist" }: bool
|
||||
|
||||
storenode* {.
|
||||
desc: "Peer multiaddr to query for storage.",
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
{.push raises: [Defect, Exception].}
|
||||
|
||||
import
|
||||
std/[options, sets, sequtils],
|
||||
std/[options, sets, sequtils, times],
|
||||
chronos, chronicles, metrics,
|
||||
./waku_peer_store,
|
||||
../storage/peer/peer_storage
|
||||
|
@ -20,8 +20,8 @@ type
|
|||
peerStore*: WakuPeerStore
|
||||
storage: PeerStorage
|
||||
|
||||
const
|
||||
defaultDialTimeout = 1.minutes # @TODO should this be made configurable?
|
||||
let
|
||||
defaultDialTimeout = chronos.minutes(1) # @TODO should this be made configurable?
|
||||
|
||||
####################
|
||||
# Helper functions #
|
||||
|
@ -32,9 +32,11 @@ proc toPeerInfo(storedInfo: StoredInfo): PeerInfo =
|
|||
addrs = toSeq(storedInfo.addrs),
|
||||
protocols = toSeq(storedInfo.protos))
|
||||
|
||||
proc insertOrReplace(ps: PeerStorage, peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
|
||||
proc insertOrReplace(ps: PeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64 = 0) =
|
||||
# Insert peer entry into persistent storage, or replace existing entry with updated info
|
||||
let res = ps.put(peerId, storedInfo, connectedness)
|
||||
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
|
||||
if res.isErr:
|
||||
warn "failed to store peers", err = res.error
|
||||
waku_peers_errors.inc(labelValues = ["storage_failure"])
|
||||
|
@ -75,7 +77,7 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
|
|||
|
||||
proc loadFromStorage(pm: PeerManager) =
|
||||
# Load peers from storage, if available
|
||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
|
||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
||||
if peerId == pm.switch.peerInfo.peerId:
|
||||
# Do not manage self
|
||||
return
|
||||
|
@ -84,6 +86,7 @@ proc loadFromStorage(pm: PeerManager) =
|
|||
pm.peerStore.protoBook.set(peerId, storedInfo.protos)
|
||||
pm.peerStore.keyBook.set(peerId, storedInfo.publicKey)
|
||||
pm.peerStore.connectionBook.set(peerId, NotConnected) # Reset connectedness state
|
||||
pm.peerStore.disconnectBook.set(peerId, disconnectTime)
|
||||
|
||||
let res = pm.storage.getAll(onData)
|
||||
if res.isErr:
|
||||
|
@ -104,7 +107,7 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
|
|||
of ConnEventKind.Disconnected:
|
||||
pm.peerStore.connectionBook.set(peerId, CanConnect)
|
||||
if not pm.storage.isNil:
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect)
|
||||
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
|
||||
return
|
||||
|
||||
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
|
||||
|
@ -195,13 +198,31 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[PeerInfo] =
|
|||
else:
|
||||
return none(PeerInfo)
|
||||
|
||||
proc reconnectPeers*(pm: PeerManager, proto: string) {.async.} =
|
||||
proc reconnectPeers*(pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)) {.async.} =
|
||||
## Reconnect to peers registered for this protocol. This will update connectedness.
|
||||
## Especially useful to resume connections from persistent storage after a restart. Before dialing each peer, sleeps for whatever remains of `backoff`, measured from that peer's recorded disconnect time.
|
||||

||||
debug "Reconnecting peers", proto=proto
|
||||

||||
for storedInfo in pm.peers(proto):
|
||||
# Skip peers whose recorded connectedness is CannotConnect.
|
||||
if pm.peerStore.connectionBook.get(storedInfo.peerId) == CannotConnect:
|
||||
debug "Not reconnecting to unreachable peer", peerId=storedInfo.peerId
|
||||
continue
|
||||

||||
# Respect optional backoff period where applicable.
|
||||
let
|
||||
disconnectTime = Moment.init(pm.peerStore.disconnectBook.get(storedInfo.peerId), Second) # Convert the recorded disconnect time (Unix seconds) to a Moment
|
||||
currentTime = Moment.init(getTime().toUnix, Second) # Current time comparable to persisted value
|
||||
backoffTime = disconnectTime + backoff - currentTime # Remaining backoff: consider time elapsed since last disconnect
|
||||

||||
trace "Respecting backoff", backoff=backoff, disconnectTime=disconnectTime, currentTime=currentTime, backoffTime=backoffTime
|
||||

||||
if backoffTime > ZeroDuration:
|
||||
debug "Backing off before reconnect...", peerId=storedInfo.peerId, backoffTime=backoffTime
|
||||
# We disconnected recently and still need to wait out the rest of the backoff period before connecting
|
||||
await sleepAsync(backoffTime)
|
||||

||||
trace "Reconnecting to peer", peerId=storedInfo.peerId
|
||||
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
||||
|
||||
|
@ -217,5 +238,9 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout =
|
|||
if not pm.hasPeer(peerInfo, proto):
|
||||
trace "Adding newly dialed peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
|
||||
pm.addPeer(peerInfo, proto)
|
||||
|
||||
if peerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||
# Do not attempt to dial self
|
||||
return none(Connection)
|
||||
|
||||
return await pm.dialPeer(peerInfo.peerId, peerInfo.addrs, proto, dialTimeout)
|
||||
|
|
|
@ -19,8 +19,11 @@ type
|
|||
|
||||
ConnectionBook* = object of PeerBook[Connectedness]
|
||||
|
||||
DisconnectBook* = object of PeerBook[int64] # Records when each peer was disconnected, as a Unix timestamp in seconds
|
||||
|
||||
WakuPeerStore* = ref object of PeerStore
|
||||
connectionBook*: ConnectionBook
|
||||
disconnectBook*: DisconnectBook
|
||||
|
||||
proc new*(T: type WakuPeerStore): WakuPeerStore =
|
||||
var p: WakuPeerStore
|
||||
|
|
|
@ -12,12 +12,13 @@ type
|
|||
PeerStorageResult*[T] = Result[T, string]
|
||||
|
||||
DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
|
||||
connectedness: Connectedness) {.closure.}
|
||||
connectedness: Connectedness, disconnectTime: int64) {.closure.}
|
||||
|
||||
# PeerStorage interface: the base methods below are no-op stubs for concrete storage backends to override
|
||||
method put*(db: PeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo,
|
||||
connectedness: Connectedness): PeerStorageResult[void] {.base.} = discard
|
||||
connectedness: Connectedness,
|
||||
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard
|
||||
|
||||
method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
|
|
@ -56,16 +56,18 @@ proc encode*(storedInfo: StoredInfo): ProtoBuffer =
|
|||
##########################
|
||||
|
||||
proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
||||
## Create the "Peers" table
|
||||
## Create the "Peer" table
|
||||
## It contains:
|
||||
## - peer id as primary key, stored as a blob
|
||||
## - stored info (serialised protobuf), stored as a blob
|
||||
## - last known enumerated connectedness state, stored as an integer
|
||||
## - disconnect time in epoch seconds, if applicable
|
||||
let prepare = db.prepareStmt("""
|
||||
CREATE TABLE IF NOT EXISTS Peers (
|
||||
CREATE TABLE IF NOT EXISTS Peer (
|
||||
peerId BLOB PRIMARY KEY,
|
||||
storedInfo BLOB,
|
||||
connectedness INTEGER
|
||||
connectedness INTEGER,
|
||||
disconnectTime INTEGER
|
||||
) WITHOUT ROWID;
|
||||
""", NoParams, void)
|
||||
|
||||
|
@ -82,19 +84,20 @@ proc new*(T: type WakuPeerStorage, db: SqliteDatabase): PeerStorageResult[T] =
|
|||
method put*(db: WakuPeerStorage,
|
||||
peerId: PeerID,
|
||||
storedInfo: StoredInfo,
|
||||
connectedness: Connectedness): PeerStorageResult[void] =
|
||||
connectedness: Connectedness,
|
||||
disconnectTime: int64): PeerStorageResult[void] =
|
||||
|
||||
## Adds a peer to storage or replaces existing entry if it already exists
|
||||
let prepare = db.database.prepareStmt(
|
||||
"REPLACE INTO Peers (peerId, storedInfo, connectedness) VALUES (?, ?, ?);",
|
||||
(seq[byte], seq[byte], int32),
|
||||
"REPLACE INTO Peer (peerId, storedInfo, connectedness, disconnectTime) VALUES (?, ?, ?, ?);",
|
||||
(seq[byte], seq[byte], int32, int64),
|
||||
void
|
||||
)
|
||||
|
||||
if prepare.isErr:
|
||||
return err("failed to prepare")
|
||||
|
||||
let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness))))
|
||||
let res = prepare.value.exec((peerId.data, storedInfo.encode().buffer, int32(ord(connectedness)), disconnectTime))
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
@ -117,10 +120,12 @@ method getAll*(db: WakuPeerStorage, onData: peer_storage.DataProc): PeerStorageR
|
|||
storedInfo = StoredInfo.init(@(toOpenArray(sTo, 0, sToL - 1))).tryGet()
|
||||
# Connectedness
|
||||
connectedness = Connectedness(sqlite3_column_int(s, 2))
|
||||
# DisconnectTime
|
||||
disconnectTime = sqlite3_column_int64(s, 3)
|
||||
|
||||
onData(peerId, storedInfo, connectedness)
|
||||
onData(peerId, storedInfo, connectedness, disconnectTime)
|
||||
|
||||
let res = db.database.query("SELECT peerId, storedInfo, connectedness FROM Peers", peer)
|
||||
let res = db.database.query("SELECT peerId, storedInfo, connectedness, disconnectTime FROM Peer", peer)
|
||||
if res.isErr:
|
||||
return err("failed")
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import
|
|||
# NOTE For TopicHandler, solve with exports?
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/standard_setup,
|
||||
../protocol/[waku_relay, waku_message, message_notifier],
|
||||
../protocol/waku_store/waku_store,
|
||||
|
@ -419,31 +420,16 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
|
|||
|
||||
info "mounting relay"
|
||||
|
||||
node.subscribe(defaultTopic, none(TopicHandler))
|
||||
|
||||
for topic in topics:
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
|
||||
if node.peerManager.hasPeers(WakuRelayCodec):
|
||||
trace "Found previous WakuRelay peers. Reconnecting."
|
||||
# Reconnect to previous relay peers
|
||||
waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
|
||||
|
||||
## GossipSub specifies a backoff period after disconnecting and unsubscribing before attempting
|
||||
## to re-graft peer on previous topics. We have to respect this period before starting WakuRelay.
|
||||
trace "Backing off before grafting after reconnecting to WakuRelay peers", backoff=wakuRelay.parameters.pruneBackoff
|
||||
|
||||
proc subscribeFuture() {.async.} =
|
||||
# Subscribe after the backoff period
|
||||
await sleepAsync(wakuRelay.parameters.pruneBackoff)
|
||||
|
||||
node.subscribe(defaultTopic, none(TopicHandler))
|
||||
|
||||
for topic in topics:
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
|
||||
discard subscribeFuture() # Dispatch future, but do not await.
|
||||
else:
|
||||
# Subscribe immediately
|
||||
node.subscribe(defaultTopic, none(TopicHandler))
|
||||
|
||||
for topic in topics:
|
||||
node.subscribe(topic, none(TopicHandler))
|
||||
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
|
||||
waitFor node.peerManager.reconnectPeers(WakuRelayCodec,
|
||||
wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime))
|
||||
|
||||
if rlnRelayEnabled:
|
||||
# TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
|
||||
|
@ -595,7 +581,7 @@ when isMainModule:
|
|||
|
||||
var pStorage: WakuPeerStorage
|
||||
|
||||
if not sqliteDatabase.isNil:
|
||||
if conf.peerpersist and not sqliteDatabase.isNil:
|
||||
let res = WakuPeerStorage.new(sqliteDatabase)
|
||||
if res.isErr:
|
||||
warn "failed to init new WakuPeerStorage", err = res.error
|
||||
|
|
Loading…
Reference in New Issue