Add backoff period before subscribing to persistent peers (#491)

This commit is contained in:
Hanno Cornelius 2021-04-16 11:57:45 +02:00 committed by GitHub
parent 900d53f9df
commit 192d4dd66b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 6 deletions

View File

@ -76,6 +76,10 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
proc loadFromStorage(pm: PeerManager) = proc loadFromStorage(pm: PeerManager) =
# Load peers from storage, if available # Load peers from storage, if available
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) = proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness) =
if peerId == pm.switch.peerInfo.peerId:
# Do not manage self
return
pm.peerStore.addressBook.set(peerId, storedInfo.addrs) pm.peerStore.addressBook.set(peerId, storedInfo.addrs)
pm.peerStore.protoBook.set(peerId, storedInfo.protos) pm.peerStore.protoBook.set(peerId, storedInfo.protos)
pm.peerStore.keyBook.set(peerId, storedInfo.publicKey) pm.peerStore.keyBook.set(peerId, storedInfo.publicKey)
@ -149,9 +153,17 @@ proc hasPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): bool =
pm.peerStore.get(peerInfo.peerId).protos.contains(proto) pm.peerStore.get(peerInfo.peerId).protos.contains(proto)
proc hasPeers*(pm: PeerManager, proto: string): bool =
# Returns `true` if manager has any peers for the specified protocol
pm.peers.anyIt(it.protos.contains(proto))
proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) = proc addPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string) =
# Adds peer to manager for the specified protocol # Adds peer to manager for the specified protocol
if peerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
return
debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto debug "Adding peer to manager", peerId = peerInfo.peerId, addr = peerInfo.addrs[0], proto = proto
# ...known addresses # ...known addresses

View File

@ -419,10 +419,31 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "mounting relay" info "mounting relay"
node.subscribe(defaultTopic, none(TopicHandler)) if node.peerManager.hasPeers(WakuRelayCodec):
trace "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers
waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
for topic in topics: ## GossipSub specifies a backoff period after disconnecting and unsubscribing before attempting
node.subscribe(topic, none(TopicHandler)) ## to re-graft peer on previous topics. We have to respect this period before starting WakuRelay.
trace "Backing off before grafting after reconnecting to WakuRelay peers", backoff=wakuRelay.parameters.pruneBackoff
proc subscribeFuture() {.async.} =
# Subscribe after the backoff period
await sleepAsync(wakuRelay.parameters.pruneBackoff)
node.subscribe(defaultTopic, none(TopicHandler))
for topic in topics:
node.subscribe(topic, none(TopicHandler))
discard subscribeFuture() # Dispatch future, but do not await.
else:
# Subscribe immediately
node.subscribe(defaultTopic, none(TopicHandler))
for topic in topics:
node.subscribe(topic, none(TopicHandler))
if rlnRelayEnabled: if rlnRelayEnabled:
# TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
@ -439,9 +460,6 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
info "relay mounted and started successfully" info "relay mounted and started successfully"
# Reconnect to previous relay peers
waitFor node.peerManager.reconnectPeers(WakuRelayCodec)
## Helpers ## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} = proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address info "dialPeer", address = address

View File

@ -41,6 +41,11 @@ method initPubSub*(w: WakuRelay) =
w.verifySignature = false w.verifySignature = false
w.sign = false w.sign = false
# Here we can fine-tune GossipSub params for our purposes
w.parameters = GossipSubParams.init()
# Setting pruneBackoff allows us to restart nodes and trigger a re-subscribe within reasonable time.
w.parameters.pruneBackoff = 30.seconds
procCall GossipSub(w).initPubSub() procCall GossipSub(w).initPubSub()
w.init() w.init()