chore: filter remove all subscription from a peer that is leaving (#3267)

* waku/waku_filter_v2/protocol.nim keeps track of the filter-client connections in Table[PeerId, Connection]
* waku/waku_filter_v2/protocol.nim starts listening for peer-left events in order to completely remove the previous Connection instance. Also, a new Connection is added when the filter-service starts publishing to its peers.

---------
    
Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
Ivan FB 2025-01-31 17:01:55 +01:00 committed by GitHub
parent ed2e26243f
commit 80291abc9a
4 changed files with 53 additions and 46 deletions

View File

@ -556,7 +556,7 @@ suite "Waku Filter - End to End":
)
discard await wakuFilter.subscriptions.addSubscription(
clientPeerId, filterCriteria.toHashSet(), peerManager
clientPeerId, filterCriteria.toHashSet()
)
let
@ -605,7 +605,6 @@ suite "Waku Filter - End to End":
await wakuFilter.subscriptions.addSubscription(
peers[index].switch.peerInfo.peerId,
@[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet(),
peerManager,
)
).isOkOr:
assert false, $error

View File

@ -953,6 +953,11 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
trace "Pruning Peer", Peer = $p
asyncSpawn(pm.switch.disconnect(p))
proc addExtPeerEventHandler*(
pm: PeerManager, eventHandler: PeerEventHandler, eventKind: PeerEventKind
) =
pm.switch.addPeerEventHandler(eventHandler, eventKind)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Initialization and Constructor #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#

View File

@ -28,6 +28,7 @@ type WakuFilter* = ref object of LPProtocol
messageCache: TimedCache[string]
peerRequestRateLimiter*: PerPeerRateLimiter
subscriptionsManagerFut: Future[void]
peerConnections: Table[PeerId, Connection]
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
debug "pinging subscriber", peerId = peerId
@ -69,7 +70,7 @@ proc subscribe(
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria
(await wf.subscriptions.addSubscription(peerId, filterCriteria, wf.peerManager)).isOkOr:
(await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))
debug "correct subscription", peerId = peerId
@ -166,24 +167,40 @@ proc handleSubscribeRequest*(
else:
return FilterSubscribeResponse.ok(request.requestId)
proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peer)
proc pushToPeer(
wf: WakuFilter, peerId: PeerId, buffer: seq[byte]
): Future[Result[void, string]] {.async.} =
debug "pushing message to subscribed peer", peerId = shortLog(peerId)
if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec):
if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec):
# Check that peer has not been removed from peer store
error "no addresses for peer", peerId = shortLog(peer)
return
error "no addresses for peer", peerId = shortLog(peerId)
return err("no addresses for peer: " & $peerId)
let conn = wf.subscriptions.getConnectionByPeerId(peer).valueOr:
error "could not get connection by peer id", error = $error
return
let conn =
if wf.peerConnections.contains(peerId):
wf.peerConnections[peerId]
else:
## we never pushed a message before, let's dial then
let connRes = await wf.peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
error "pushToPeer no connection to peer", peerId = shortLog(peerId)
return err("pushToPeer no connection to peer: " & shortLog(peerId))
let newConn = connRes.get()
wf.peerConnections[peerId] = newConn
newConn
await conn.writeLp(buffer)
debug "published successful", peerId = shortLog(peer), conn
debug "published successful", peerId = shortLog(peerId), conn
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
return ok()
proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
) {.async.} =
@ -208,11 +225,12 @@ proc pushToPeers(
msg_hash = msgHash
let bufferToPublish = messagePush.encode().buffer
var pushFuts: seq[Future[void]]
var pushFuts: seq[Future[Result[void, string]]]
for peerId in peers:
let pushFut = wf.pushToPeer(peerId, bufferToPublish)
pushFuts.add(pushFut)
await allFutures(pushFuts)
proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
@ -324,6 +342,15 @@ proc initProtocolHandler(wf: WakuFilter) =
wf.handler = handler
wf.codec = WakuFilterSubscribeCodec
proc onPeerEventHandler(wf: WakuFilter, peerId: PeerId, event: PeerEvent) {.async.} =
## These events are dispatched nim-libp2p, triggerPeerEvents proc
case event.kind
of Left:
## Drop the previous known connection reference
wf.peerConnections.del(peerId)
else:
discard
proc new*(
T: type WakuFilter,
peerManager: PeerManager,
@ -342,6 +369,11 @@ proc new*(
peerRequestRateLimiter: PerPeerRateLimiter(setting: rateLimitSetting),
)
proc peerEventHandler(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} =
wf.onPeerEventHandler(peerId, event)
peerManager.addExtPeerEventHandler(peerEventHandler, PeerEventKind.Left)
wf.initProtocolHandler()
setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
return wf

View File

@ -31,7 +31,7 @@ type
SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint, connection: Connection]
PeerData* = tuple[lastSeen: Moment, criteriaCount: uint]
FilterSubscriptions* = ref object
peersSubscribed*: Table[PeerID, PeerData]
@ -46,7 +46,6 @@ proc new*(
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
): FilterSubscriptions =
## Create a new filter subscription object
return FilterSubscriptions(
peersSubscribed: initTable[PeerID, PeerData](),
subscriptions: initTable[FilterCriterion, SubscribedPeers](),
@ -103,10 +102,6 @@ proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
debug "removePeer",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
s.peersSubscribed.withValue(peerId, peerData):
debug "closing connection with peer", peerId = shortLog(peerId)
await peerData.connection.close()
s.peersSubscribed.del(peerId)
debug "removePeer after deletion",
@ -146,15 +141,10 @@ proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
data.lastSeen = Moment.now()
proc addSubscription*(
s: FilterSubscriptions,
peerId: PeerID,
filterCriteria: FilterCriteria,
peerManager: PeerManager,
s: FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria
): Future[Result[void, string]] {.async.} =
## Add a subscription for a given peer
##
## The peerManager is needed to establish the first Connection through
## /vac/waku/filter-push/2.0.0-beta1
var peerData: ptr PeerData
s.peersSubscribed.withValue(peerId, data):
@ -168,17 +158,7 @@ proc addSubscription*(
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
let connRes = await peerManager.dialPeer(peerId, WakuFilterPushCodec)
if connRes.isNone():
## We do not remove this peer, but allow the underlying peer manager
## to do so if it is deemed necessary
return err("addSubscription no connection to peer: " & shortLog(peerId))
let newPeerData: PeerData =
(lastSeen: Moment.now(), criteriaCount: 0, connection: connRes.get())
debug "new WakuFilterPushCodec stream", conn = connRes.get()
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
for filterCriterion in filterCriteria:
@ -216,14 +196,5 @@ proc removeSubscription*(
do:
return err("Peer has no subscriptions")
proc getConnectionByPeerId*(
s: FilterSubscriptions, peerId: PeerID
): Result[Connection, string] =
if not s.peersSubscribed.hasKey(peerId):
return err("peer not subscribed: " & shortLog(peerId))
let peerData = s.peersSubscribed.getOrDefault(peerId)
return ok(peerData.connection)
proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) =
s.subscriptionTimeout = newTimeout