chore: unit test for duplicate message push (#2852)

* chore: add unit test for testing duplicate message push with timedcache

* chore: update according to better naming convention

---------

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
Darshan K 2024-06-28 18:16:06 +05:30 committed by GitHub
parent 7ad9722ecf
commit d41280cc7a
4 changed files with 153 additions and 4 deletions

View File

@ -1,6 +1,8 @@
{.used.}
import
std/os,
chronos/timer,
stew/byteutils,
stew/shims/net,
testutils/unittests,
@ -52,7 +54,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountFilter()
await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds)
await testSetup.subscriberNode.mountFilterClient()
testSetup.subscriberNode.peerManager.addServicePeer(
@ -315,3 +317,147 @@ suite "Waku v2 Rest API - Filter V2":
messages == @[testMessage]
await restFilterTest.shutdown()
asyncTest "duplicate message push to filter subscriber":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
let requestBody = FilterSubscribeRequest(
requestId: "1001",
contentFilters: @[DefaultContentTopic],
pubsubTopic: some(DefaultPubsubTopic),
)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
# subscribe fiter service
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
DefaultPubsubTopic, DefaultContentTopic
)
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "1001"
subscribedPeer.len() == 1
# ping subscriber node
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")
check:
pingResponse.status == 200
pingResponse.data.requestId == "1002"
pingResponse.data.statusDesc == "OK"
# first - message push from service node to subscriber client
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: DefaultContentTopic,
timestamp: int64(2022),
meta: "test-meta".toBytes(),
)
let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)
# check messages received client side or not
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
check:
postMsgResponse1.status == 200
$postMsgResponse1.contentType == $MIMETYPE_TEXT
postMsgResponse1.data == "OK"
len(messages1.data) == 1
# second - message push from service node to subscriber client
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)
# check message received client side or not
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
check:
postMsgResponse2.status == 200
$postMsgResponse2.contentType == $MIMETYPE_TEXT
postMsgResponse2.data == "OK"
len(messages2.data) == 0
await restFilterTest.shutdown()
asyncTest "duplicate message push to filter subscriber ( sleep in between )":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
let requestBody = FilterSubscribeRequest(
requestId: "1001",
contentFilters: @[DefaultContentTopic],
pubsubTopic: some(DefaultPubsubTopic),
)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)
# subscribe fiter service
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
DefaultPubsubTopic, DefaultContentTopic
)
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "1001"
subscribedPeer.len() == 1
# ping subscriber node
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")
check:
pingResponse.status == 200
pingResponse.data.requestId == "1002"
pingResponse.data.statusDesc == "OK"
# first - message push from service node to subscriber client
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: DefaultContentTopic,
timestamp: int64(2022),
meta: "test-meta".toBytes(),
)
let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)
# check messages received client side or not
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
check:
postMsgResponse1.status == 200
$postMsgResponse1.contentType == $MIMETYPE_TEXT
postMsgResponse1.data == "OK"
len(messages1.data) == 1
# Pause execution for 1 seconds to test TimeCache functionality of service node
await sleepAsync(1.seconds)
# second - message push from service node to subscriber client
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)
# check message received client side or not
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)
check:
postMsgResponse2.status == 200
$postMsgResponse2.contentType == $MIMETYPE_TEXT
postMsgResponse2.data == "OK"
len(messages2.data) == 1
await restFilterTest.shutdown()

View File

@ -434,12 +434,14 @@ proc mountFilter*(
filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
) {.async: (raises: []).} =
## Mounting filter v2 protocol
info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer,
messageCacheTTL,
)
if node.started:

View File

@ -295,14 +295,14 @@ proc new*(
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
timeout: Duration = 2.minutes,
messageCacheTTL: Duration = MessageCacheTTL,
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
messageCache: init(TimedCache[string], timeout),
messageCache: init(TimedCache[string], messageCacheTTL),
)
wf.initProtocolHandler()

View File

@ -10,6 +10,7 @@ const
MaxFilterPeers* = 1000
MaxFilterCriteriaPerPeer* = 1000
DefaultSubscriptionTimeToLiveSec* = 5.minutes
MessageCacheTTL* = 2.minutes
type
# a single filter criterion is fully defined by a pubsub topic and content topic