fix: duplicate message forwarding in filter service (#2842)

* fix: it's resolve duplicate message forwarding for filter service

* chore: update little flow

* fix: update implementation using timed cache method

* chore: simple format change

* chore: simple format change

* chore: update put function location

* chore: update according suggestion
This commit is contained in:
Darshan K 2024-06-26 01:05:03 +05:30 committed by GitHub
parent 01050138c6
commit 99149ea9dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 13 deletions

View File

@ -11,7 +11,8 @@ import
chronicles, chronicles,
chronos, chronos,
libp2p/peerid, libp2p/peerid,
libp2p/protocols/protocol libp2p/protocols/protocol,
libp2p/protocols/pubsub/timedcache
import import
../node/peer_manager, ../node/peer_manager,
../waku_core, ../waku_core,
@ -31,6 +32,7 @@ type WakuFilter* = ref object of LPProtocol
# a mapping of peer ids to a sequence of filter criteria # a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager peerManager: PeerManager
maintenanceTask: TimerCallback maintenanceTask: TimerCallback
messageCache: TimedCache[string]
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId trace "pinging subscriber", peerId = peerId
@ -176,20 +178,27 @@ proc pushToPeers(
let msgHash = let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
notice "pushing message to subscribed peers", ## it's also refresh expire of msghash, that's why update cache every time, even if it has a value.
pubsubTopic = messagePush.pubsubTopic, if wf.messageCache.put(msgHash, Moment.now()):
contentTopic = messagePush.wakuMessage.contentTopic, notice "duplicate message found, not-pushing message to subscribed peers",
target_peer_ids = targetPeerIds, pubsubTopic = messagePush.pubsubTopic,
msg_hash = msgHash contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash
else:
notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash
let bufferToPublish = messagePush.encode().buffer let bufferToPublish = messagePush.encode().buffer
var pushFuts: seq[Future[void]]
var pushFuts: seq[Future[void]] for peerId in peers:
for peerId in peers: let pushFut = wf.pushToPeer(peerId, bufferToPublish)
let pushFut = wf.pushToPeer(peerId, bufferToPublish) pushFuts.add(pushFut)
pushFuts.add(pushFut) await allFutures(pushFuts)
await allFutures(pushFuts)
proc maintainSubscriptions*(wf: WakuFilter) = proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions" trace "maintaining subscriptions"
@ -289,12 +298,14 @@ proc new*(
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers, maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
timeout: Duration = 2.minutes,
): T = ): T =
let wf = WakuFilter( let wf = WakuFilter(
subscriptions: FilterSubscriptions.init( subscriptions: FilterSubscriptions.init(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
), ),
peerManager: peerManager, peerManager: peerManager,
messageCache: init(TimedCache[string], timeout),
) )
wf.initProtocolHandler() wf.initProtocolHandler()