diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 0db00c2d4..067d32c7f 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -37,6 +37,17 @@ proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuS return proto +proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[WakuStore] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuStore.new(peerManager, rng, handler) + + await proto.start() + switch.mount(proto) + + return proto + proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient = let peerManager = PeerManager.new(switch) @@ -44,11 +55,99 @@ proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStor WakuStoreClient.new(peerManager, rng, store) +suite "Waku Store - query handler": + + asyncTest "history query handler should be called": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + + let msg = fakeWakuMessage(contentTopic=DefaultContentTopic) + + var queryHandlerFut = newFuture[(HistoryQuery)]() + let queryHandler = proc(req: HistoryQuery): HistoryResult = + queryHandlerFut.complete(req) + return ok(HistoryResponse(messages: @[msg])) + + let + server = await newTestWakuStore(serverSwitch, handler=queryhandler) + client = newTestWakuStoreClient(clientSwitch) + + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true) + + ## When + let queryRes = await client.query(req, peer=serverPeerInfo) + + ## Then + check: + not queryHandlerFut.failed() + queryRes.isOk() + + let request = queryHandlerFut.read() + check: + request == req + + let response = queryRes.tryGet() + check: + response.messages.len == 1 + response.messages == @[msg] + + ## Cleanup + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "history query handler should be called and return an error": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + ## Given + let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + + var queryHandlerFut = newFuture[(HistoryQuery)]() + let queryHandler = proc(req: HistoryQuery): HistoryResult = + queryHandlerFut.complete(req) + return err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST)) + + let + server = await newTestWakuStore(serverSwitch, handler=queryhandler) + client = newTestWakuStoreClient(clientSwitch) + + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], ascending: true) + + ## When + let queryRes = await client.query(req, peer=serverPeerInfo) + + ## Then + check: + not queryHandlerFut.failed() + queryRes.isErr() + + let request = queryHandlerFut.read() + check: + request == req + + let error = queryRes.tryError() + check: + error.kind == HistoryErrorKind.BAD_REQUEST + + ## Cleanup + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + procSuite "Waku Store - history query": ## Fixtures let storeA = block: let store = newTestMessageStore() - + let msgList = @[ fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(0)), fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(1)), @@ -69,13 +168,13 @@ procSuite "Waku Store - history query": asyncTest "handle query": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -98,7 +197,7 @@ procSuite "Waku Store - history query": check: queryRes.isOk() - let response = queryRes.tryGet() + let response = queryRes.tryGet() check: response.messages.len == 1 response.messages == @[msg1] @@ -108,13 +207,13 @@ procSuite "Waku Store - history query": asyncTest "handle query with multiple content filters": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -132,7 +231,7 @@ procSuite "Waku Store - history query": server.handleMessage("foo", msg1) server.handleMessage("foo", msg2) server.handleMessage("foo", msg3) - + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When @@ -143,7 +242,7 @@ procSuite "Waku Store - history query": check: queryRes.isOk() - let response = queryRes.tryGet() + let response = queryRes.tryGet() check: response.messages.len() == 2 response.messages.anyIt(it == msg1) @@ -151,16 +250,16 @@ procSuite "Waku Store - history query": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) - + asyncTest "handle query with pubsub topic filter": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -168,7 +267,7 @@ procSuite "Waku Store - history query": let pubsubTopic1 = "queried-topic" pubsubTopic2 = "non-queried-topic" - + let contentTopic1 = ContentTopic("1") contentTopic2 = ContentTopic("2") @@ -182,14 +281,14 @@ procSuite "Waku Store - history query": server.handleMessage(pubsubtopic1, msg1) server.handleMessage(pubsubtopic2, msg2) server.handleMessage(pubsubtopic2, msg3) - + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When - # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) + # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) let req = HistoryQuery( pubsubTopic: some(pubsubTopic1), - contentTopics: @[contentTopic1, contentTopic3] + contentTopics: @[contentTopic1, contentTopic3] ) let queryRes = await client.query(req, peer=serverPeerInfo) @@ -197,7 +296,7 @@ procSuite "Waku Store - history query": check: queryRes.isOk() - let response = queryRes.tryGet() + let response = queryRes.tryGet() check: response.messages.len() == 1 response.messages.anyIt(it == msg1) @@ -207,13 +306,13 @@ procSuite "Waku Store - history query": asyncTest "handle query with pubsub topic filter - no match": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -250,19 +349,19 @@ procSuite "Waku Store - history query": asyncTest "handle query with pubsub topic filter - match the entire stored messages": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) ## Given let pubsubTopic = "queried-topic" - + let msg1 = fakeWakuMessage(payload="TEST-1") msg2 = fakeWakuMessage(payload="TEST-2") @@ -271,9 +370,9 @@ procSuite "Waku Store - history query": server.handleMessage(pubsubTopic, msg1) server.handleMessage(pubsubTopic, msg2) server.handleMessage(pubsubTopic, msg3) - + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() - + ## When let req = HistoryQuery(pubsubTopic: some(pubsubTopic)) let res = await client.query(req, peer=serverPeerInfo) @@ -294,13 +393,13 @@ procSuite "Waku Store - history query": asyncTest "handle query with forward pagination": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -315,19 +414,19 @@ procSuite "Waku Store - history query": WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4), WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3), WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2), - WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1), WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime) ] for msg in msgList: require server.store.put(DefaultPubsubTopic, msg).isOk() - + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When var req = HistoryQuery( contentTopics: @[DefaultContentTopic], - pageSize: 2, + pageSize: 2, ascending: true ) var res = await client.query(req, peer=serverPeerInfo) @@ -342,11 +441,11 @@ procSuite "Waku Store - history query": require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pageSize == 2 + response.pageSize == 2 response.ascending == true req.cursor = response.cursor - + # Continue querying res = await client.query(req, peer=serverPeerInfo) require res.isOk() @@ -364,13 +463,13 @@ procSuite "Waku Store - history query": asyncTest "handle query with backward pagination": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -385,7 +484,7 @@ procSuite "Waku Store - history query": WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4), WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3), WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2), - WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1), WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime) ] @@ -397,7 +496,7 @@ procSuite "Waku Store - history query": ## When var req = HistoryQuery( contentTopics: @[DefaultContentTopic], - pageSize: 2, + pageSize: 2, ascending: false ) var res = await client.query(req, peer=serverPeerInfo) @@ -412,11 +511,11 @@ procSuite "Waku Store - history query": require: totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever response.messages.len() == 2 - response.pageSize == 2 + response.pageSize == 2 response.ascending == false req.cursor = response.cursor - + # Continue querying res = await client.query(req, peer=serverPeerInfo) require res.isOk() @@ -434,13 +533,13 @@ procSuite "Waku Store - history query": asyncTest "handle query with no paging info - auto-pagination": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - - let + + let server = await newTestWakuStore(serverSwitch) client = newTestWakuStoreClient(clientSwitch) @@ -454,7 +553,7 @@ procSuite "Waku Store - history query": WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), + WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2")) ] @@ -483,23 +582,23 @@ procSuite "Waku Store - history query": asyncTest "handle temporal history query with a valid time window": ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - let + let server = newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) ## Given let req = HistoryQuery( - contentTopics: @[ContentTopic("1")], - startTime: some(Timestamp(2)), + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(2)), endTime: some(Timestamp(5)) ) - + let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() ## When @@ -520,20 +619,20 @@ procSuite "Waku Store - history query": asyncTest "handle temporal history query with a zero-size time window": # a zero-size window results in an empty list of history messages ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - let + let server = await newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) ## Given let req = HistoryQuery( - contentTopics: @[ContentTopic("1")], - startTime: some(Timestamp(2)), + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(2)), endTime: some(Timestamp(2)) ) @@ -555,20 +654,20 @@ procSuite "Waku Store - history query": asyncTest "handle temporal history query with an invalid time window": # A history query with an invalid time range results in an empty list of history messages ## Setup - let + let serverSwitch = newTestSwitch() clientSwitch = newTestSwitch() - + await allFutures(serverSwitch.start(), clientSwitch.start()) - let + let server = await newTestWakuStore(serverSwitch, store=storeA) client = newTestWakuStoreClient(clientSwitch) ## Given let req = HistoryQuery( - contentTopics: @[ContentTopic("1")], - startTime: some(Timestamp(5)), + contentTopics: @[ContentTopic("1")], + startTime: some(Timestamp(5)), endTime: some(Timestamp(2)) ) @@ -599,10 +698,10 @@ suite "Message Store - message handling": ## Given let validSenderTime = now() let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime) - + ## When proto.handleMessage(DefaultPubSubTopic, message) - + ## Then check: store.getMessagesCount().tryGet() == 1 @@ -615,9 +714,9 @@ suite "Message Store - message handling": let store = StoreQueueRef.new(10) let switch = newTestSwitch() let proto = await newTestWakuStore(switch, store) - + ## Given - let msgList = @[ + let msgList = @[ fakeWakuMessage(ephemeral = false, payload = "1"), fakeWakuMessage(ephemeral = true, payload = "2"), fakeWakuMessage(ephemeral = true, payload = "3"), @@ -630,7 +729,7 @@ suite "Message Store - message handling": proto.handleMessage(DefaultPubsubTopic, msg) ## Then - check: + check: store.len == 2 ## Cleanup @@ -645,10 +744,10 @@ suite "Message Store - message handling": ## Given let invalidSenderTime = 0 let message = fakeWakuMessage(ts=invalidSenderTime) - + ## When proto.handleMessage(DefaultPubSubTopic, message) - + ## Then check: store.getMessagesCount().tryGet() == 1 @@ -666,19 +765,19 @@ suite "Message Store - message handling": let now = now() invalidSenderTime = now + MaxMessageTimestampVariance + 1 - + let message = fakeWakuMessage(ts=invalidSenderTime) - + ## When proto.handleMessage(DefaultPubSubTopic, message) - + ## Then check: store.getMessagesCount().tryGet() == 0 - + ## Cleanup await switch.stop() - + asyncTest "it should not store a message with a sender time variance greater than max time variance (past)": ## Setup let store = StoreQueueRef.new(5) @@ -689,12 +788,12 @@ suite "Message Store - message handling": let now = now() invalidSenderTime = now - MaxMessageTimestampVariance - 1 - + let message = fakeWakuMessage(ts=invalidSenderTime) - + ## When proto.handleMessage(DefaultPubSubTopic, message) - + ## Then check: store.getMessagesCount().tryGet() == 0 diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 4227fb3d7..f0410f9a6 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -37,10 +37,13 @@ const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" +type HistoryQueryHandler* = proc(req: HistoryQuery): HistoryResult {.gcsafe.} + type WakuStore* = ref object of LPProtocol - peerManager*: PeerManager - rng*: ref rand.HmacDrbgContext + peerManager: PeerManager + rng: ref rand.HmacDrbgContext + queryHandler: HistoryQueryHandler store*: MessageStore retentionPolicy: Option[MessageRetentionPolicy] @@ -226,40 +229,29 @@ proc initProtocolHandler(ws: WakuStore) = # TODO: Return (BAD_REQUEST, cause: "empty query") return + let + requestId = reqRpc.requestId + request = reqRpc.query.get().toAPI() - info "received history query", peerId=conn.peerId, requestId=reqRpc.requestId, query=reqRpc.query + info "received history query", peerId=conn.peerId, requestId=requestId, query=request waku_store_queries.inc() + let responseRes = ws.queryHandler(request) - if ws.store.isNil(): - let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE) + if responseRes.isErr(): + error "history query failed", peerId=conn.peerId, requestId=requestId, error=responseRes.error - error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr - - let resp = HistoryResponseRPC(error: respErr.toRPC()) - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) + let response = responseRes.toRPC() + let rpc = HistoryRPC(requestId: requestId, response: some(response)) await conn.writeLp(rpc.encode().buffer) return - let query = reqRpc.query.get().toAPI() + let response = responseRes.toRPC() - let respRes = ws.findMessages(query) + info "sending history response", peerId=conn.peerId, requestId=requestId, messages=response.messages.len - if respRes.isErr(): - error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error - - let resp = respRes.toRPC() - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) - await conn.writeLp(rpc.encode().buffer) - return - - - let resp = respRes.toRPC() - - info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len - - let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp)) + let rpc = HistoryRPC(requestId: requestId, response: some(response)) await conn.writeLp(rpc.encode().buffer) ws.handler = handler @@ -276,5 +268,23 @@ proc new*(T: type WakuStore, store: store, retentionPolicy: retentionPolicy ) + ws.queryHandler = proc(request: HistoryQuery): HistoryResult = ws.findMessages(request) + ws.initProtocolHandler() + ws + +proc new*(T: type WakuStore, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + queryHandler: HistoryQueryHandler): T = + + # Raise a defect if history query handler is nil + if queryHandler.isNil(): + raise newException(NilAccessDefect, "history query handler is nil") + + let ws = WakuStore( + rng: rng, + peerManager: peerManager, + queryHandler: queryHandler + ) ws.initProtocolHandler() ws