refactor(store): decouple waku store public api types from rpc types

This commit is contained in:
Lorenzo Delgado 2022-11-09 18:50:18 +01:00 committed by GitHub
parent 054dc61763
commit b07cdb1841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 831 additions and 479 deletions

View File

@ -484,7 +484,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
echo &"{chatLine}" echo &"{chatLine}"
info "Hit store handler" info "Hit store handler"
let queryRes = await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)])) let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
if queryRes.isOk(): if queryRes.isOk():
storeHandler(queryRes.value) storeHandler(queryRes.value)

View File

@ -25,6 +25,7 @@ import
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/rpc,
../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_filter/client, ../../waku/v2/protocol/waku_filter/client,
@ -263,7 +264,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient() let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false) await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilter(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check: check:
response.messages.len() == 8 response.messages.len() == 8
response.pagingOptions.isNone() response.pagingOptions.isNone()

View File

@ -151,7 +151,14 @@ procSuite "Queue store - pagination":
test "Forward pagination - invalid cursor": test "Forward pagination - invalid cursor":
## Given ## Given
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex() let msg = fakeWakuMessage(payload= @[byte 10])
let index = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
).toIndex()
let let
pageSize: uint64 = 10 pageSize: uint64 = 10
cursor: Option[Index] = some(index) cursor: Option[Index] = some(index)
@ -325,7 +332,14 @@ procSuite "Queue store - pagination":
test "Backward pagination - invalid cursor": test "Backward pagination - invalid cursor":
## Given ## Given
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex() let msg = fakeWakuMessage(payload= @[byte 10])
let index = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
).toIndex()
let let
pageSize: uint64 = 2 pageSize: uint64 = 2
cursor: Option[Index] = some(index) cursor: Option[Index] = some(index)

View File

@ -10,7 +10,7 @@ import
../../waku/v2/node/message_store/message_retention_policy, ../../waku/v2/node/message_store/message_retention_policy,
../../waku/v2/node/message_store/message_retention_policy_capacity, ../../waku/v2/node/message_store/message_retention_policy_capacity,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/pagination, ../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
./utils, ./utils,
./testlib/common ./testlib/common
@ -137,12 +137,17 @@ suite "Message Store":
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3), WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3),
] ]
var indexes: seq[PagingIndex] = @[] var indexes: seq[HistoryCursor] = @[]
for msg in msgs: for msg in msgs:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let index = PagingIndex.compute(msg, msg.timestamp, DefaultPubsubTopic) let cursor = HistoryCursor(
indexes.add(index) pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg)
)
indexes.add(cursor)
## When ## When
let res = store.getAllMessages() let res = store.getAllMessages()
@ -167,9 +172,9 @@ suite "Message Store":
pubsubTopic == DefaultPubsubTopic pubsubTopic == DefaultPubsubTopic
# check correct retrieval of receiver timestamps # check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true if receiverTimestamp == indexes[0].storeTime: rt1Flag = true
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true if receiverTimestamp == indexes[1].storeTime: rt2Flag = true
if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true if receiverTimestamp == indexes[2].storeTime: rt3Flag = true
check: check:
msg in msgs msg in msgs

View File

@ -8,7 +8,7 @@ import
../../waku/common/sqlite, ../../waku/common/sqlite,
../../waku/v2/node/message_store/sqlite_store, ../../waku/v2/node/message_store/sqlite_store,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/pagination, ../../waku/v2/protocol/waku_store,
./utils, ./utils,
./testlib/common ./testlib/common
@ -231,7 +231,12 @@ suite "message store - history query":
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = PagingIndex.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[4].timestamp,
storeTime: messages[4].timestamp,
digest: computeDigest(messages[4])
)
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -279,7 +284,12 @@ suite "message store - history query":
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = PagingIndex.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[6].timestamp,
storeTime: messages[6].timestamp,
digest: computeDigest(messages[6])
)
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -330,7 +340,12 @@ suite "message store - history query":
for msg in messages2: for msg in messages2:
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = PagingIndex.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages2[0].timestamp,
storeTime: messages2[0].timestamp,
digest: computeDigest(messages2[0])
)
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(
@ -597,7 +612,12 @@ suite "message store - history query":
for msg in messages: for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) let cursor = HistoryCursor(
pubsubTopic: DefaultPubsubTopic,
senderTime: messages[3].timestamp,
storeTime: messages[3].timestamp,
digest: computeDigest(messages[3])
)
## When ## When
let res = store.getMessagesByHistoryQuery( let res = store.getMessagesByHistoryQuery(

View File

@ -3,7 +3,8 @@
import import
std/[options, sequtils], std/[options, sequtils],
testutils/unittests, testutils/unittests,
chronos, chronos,
chronicles,
libp2p/crypto/crypto libp2p/crypto/crypto
import import
../../waku/common/sqlite, ../../waku/common/sqlite,
@ -90,14 +91,14 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) let req = HistoryQuery(contentTopics: @[topic])
let resQuery = await client.query(rpc, peer=serverPeerInfo) let queryRes = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check: check:
resQuery.isOk() queryRes.isOk()
let response = resQuery.tryGet() let response = queryRes.tryGet()
check: check:
response.messages.len == 1 response.messages.len == 1
response.messages == @[msg1] response.messages == @[msg1]
@ -135,17 +136,14 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let rpc = HistoryQuery(contentFilters: @[ let req = HistoryQuery(contentTopics: @[topic1, topic3])
HistoryContentFilter(contentTopic: topic1), let queryRes = await client.query(req, peer=serverPeerInfo)
HistoryContentFilter(contentTopic: topic3)
])
let resQuery = await client.query(rpc, peer=serverPeerInfo)
## Then ## Then
check: check:
resQuery.isOk() queryRes.isOk()
let response = resQuery.tryGet() let response = queryRes.tryGet()
check: check:
response.messages.len() == 2 response.messages.len() == 2
response.messages.anyIt(it == msg1) response.messages.anyIt(it == msg1)
@ -189,18 +187,17 @@ procSuite "Waku Store - history query":
## When ## When
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
let rpc = HistoryQuery( let req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), pubsubTopic: some(pubsubTopic1),
HistoryContentFilter(contentTopic: contentTopic3)], contentTopics: @[contentTopic1, contentTopic3]
pubsubTopic: pubsubTopic1
) )
let resQuery = await client.query(rpc, peer=serverPeerInfo) let queryRes = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check: check:
resQuery.isOk() queryRes.isOk()
let response = resQuery.tryGet() let response = queryRes.tryGet()
check: check:
response.messages.len() == 1 response.messages.len() == 1
response.messages.anyIt(it == msg1) response.messages.anyIt(it == msg1)
@ -237,8 +234,8 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1) let req = HistoryQuery(pubsubTopic: some(pubsubTopic1))
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check: check:
@ -278,8 +275,8 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic) let req = HistoryQuery(pubsubTopic: some(pubsubTopic))
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check: check:
@ -328,11 +325,12 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
var rpc = HistoryQuery( var req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], contentTopics: @[DefaultContentTopic],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) pageSize: 2,
ascending: true
) )
var res = await client.query(rpc, peer=serverPeerInfo) var res = await client.query(req, peer=serverPeerInfo)
require res.isOk() require res.isOk()
var var
@ -340,17 +338,17 @@ procSuite "Waku Store - history query":
totalMessages = response.messages.len() totalMessages = response.messages.len()
totalQueries = 1 totalQueries = 1
while response.pagingInfo.cursor != PagingIndex(): while response.cursor.isSome():
require: require:
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2 response.messages.len() == 2
response.pagingInfo.pageSize == 2 response.pageSize == 2
response.pagingInfo.direction == PagingDirection.FORWARD response.ascending == true
rpc.pagingInfo = response.pagingInfo req.cursor = response.cursor
# Continue querying # Continue querying
res = await client.query(rpc, peer=serverPeerInfo) res = await client.query(req, peer=serverPeerInfo)
require res.isOk() require res.isOk()
response = res.tryGet() response = res.tryGet()
totalMessages += response.messages.len() totalMessages += response.messages.len()
@ -397,11 +395,12 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
var rpc = HistoryQuery( var req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)], contentTopics: @[DefaultContentTopic],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD) pageSize: 2,
ascending: false
) )
var res = await client.query(rpc, peer=serverPeerInfo) var res = await client.query(req, peer=serverPeerInfo)
require res.isOk() require res.isOk()
var var
@ -409,17 +408,17 @@ procSuite "Waku Store - history query":
totalMessages = response.messages.len() totalMessages = response.messages.len()
totalQueries = 1 totalQueries = 1
while response.pagingInfo.cursor != PagingIndex(): while response.cursor.isSome():
require: require:
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
response.messages.len() == 2 response.messages.len() == 2
response.pagingInfo.pageSize == 2 response.pageSize == 2
response.pagingInfo.direction == PagingDirection.BACKWARD response.ascending == false
rpc.pagingInfo = response.pagingInfo req.cursor = response.cursor
# Continue querying # Continue querying
res = await client.query(rpc, peer=serverPeerInfo) res = await client.query(req, peer=serverPeerInfo)
require res.isOk() require res.isOk()
response = res.tryGet() response = res.tryGet()
totalMessages += response.messages.len() totalMessages += response.messages.len()
@ -465,8 +464,8 @@ procSuite "Waku Store - history query":
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check: check:
@ -477,7 +476,7 @@ procSuite "Waku Store - history query":
## No pagination specified. Response will be auto-paginated with ## No pagination specified. Response will be auto-paginated with
## up to MaxPageSize messages per page. ## up to MaxPageSize messages per page.
response.messages.len() == 8 response.messages.len() == 8
response.pagingInfo == PagingInfo() response.cursor.isNone()
## Cleanup ## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop()) await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -495,16 +494,16 @@ procSuite "Waku Store - history query":
client = newTestWakuStoreClient(clientSwitch) client = newTestWakuStoreClient(clientSwitch)
## Given ## Given
let rpc = HistoryQuery( let req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], contentTopics: @[ContentTopic("1")],
startTime: Timestamp(2), startTime: some(Timestamp(2)),
endTime: Timestamp(5) endTime: some(Timestamp(5))
) )
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check res.isOk() check res.isOk()
@ -532,16 +531,16 @@ procSuite "Waku Store - history query":
client = newTestWakuStoreClient(clientSwitch) client = newTestWakuStoreClient(clientSwitch)
## Given ## Given
let rpc = HistoryQuery( let req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], contentTopics: @[ContentTopic("1")],
startTime: Timestamp(2), startTime: some(Timestamp(2)),
endTime: Timestamp(2) endTime: some(Timestamp(2))
) )
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check res.isOk() check res.isOk()
@ -567,16 +566,16 @@ procSuite "Waku Store - history query":
client = newTestWakuStoreClient(clientSwitch) client = newTestWakuStoreClient(clientSwitch)
## Given ## Given
let rpc = HistoryQuery( let req = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], contentTopics: @[ContentTopic("1")],
startTime: Timestamp(5), startTime: some(Timestamp(5)),
endTime: Timestamp(2) endTime: some(Timestamp(2))
) )
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo() let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
## When ## When
let res = await client.query(rpc, peer=serverPeerInfo) let res = await client.query(req, peer=serverPeerInfo)
## Then ## Then
check res.isOk() check res.isOk()

View File

@ -11,7 +11,7 @@ import
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_store/client, ../../waku/v2/protocol/waku_store/client {.all.},
./testlib/common, ./testlib/common,
./testlib/switch ./testlib/switch
@ -81,13 +81,10 @@ procSuite "Waku Store Client":
## Given ## Given
let peer = serverSwitch.peerInfo.toRemotePeerInfo() let peer = serverSwitch.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery( let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 8)
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 8)
)
## When ## When
let res = await client.query(rpc, peer) let res = await client.query(req, peer)
## Then ## Then
check: check:
@ -98,7 +95,7 @@ procSuite "Waku Store Client":
## No pagination specified. Response will be auto-paginated with ## No pagination specified. Response will be auto-paginated with
## up to MaxPageSize messages per page. ## up to MaxPageSize messages per page.
response.messages.len() == 8 response.messages.len() == 8
response.pagingInfo != PagingInfo() response.cursor.isSome()
## Cleanup ## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop()) await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -117,13 +114,10 @@ procSuite "Waku Store Client":
## Given ## Given
let peer = serverSwitch.peerInfo.toRemotePeerInfo() let peer = serverSwitch.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery( let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 5)
)
## When ## When
let res = await client.queryWithPaging(rpc, peer) let res = await client.queryAll(req, peer)
## Then ## Then
check: check:
@ -136,6 +130,8 @@ procSuite "Waku Store Client":
## Cleanup ## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop()) await allFutures(clientSwitch.stop(), serverSwitch.stop())
# TODO: Move to resume test suite
asyncTest "multiple query to multiple peers with pagination": asyncTest "multiple query to multiple peers with pagination":
## Setup ## Setup
let let
@ -155,13 +151,10 @@ procSuite "Waku Store Client":
serverSwitchA.peerInfo.toRemotePeerInfo(), serverSwitchA.peerInfo.toRemotePeerInfo(),
serverSwitchB.peerInfo.toRemotePeerInfo() serverSwitchB.peerInfo.toRemotePeerInfo()
] ]
let rpc = HistoryQuery( let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 5)
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 5)
)
## When ## When
let res = await client.queryLoop(rpc, peers) let res = await client.queryLoop(req, peers)
## Then ## Then
check: check:

View File

@ -5,20 +5,21 @@ import
testutils/unittests, testutils/unittests,
chronos chronos
import import
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store/rpc,
../../waku/v2/protocol/waku_store/rpc_codec,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
./testlib/common ./testlib/common
procSuite "Waku Store - RPC codec": procSuite "Waku Store - RPC codec":
test "PagingIndex protobuf codec": test "PagingIndexRPC protobuf codec":
## Given ## Given
let index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) let index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
## When ## When
let encodedIndex = index.encode() let encodedIndex = index.encode()
let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) let decodedIndexRes = PagingIndexRPC.decode(encodedIndex.buffer)
## Then ## Then
check: check:
@ -29,12 +30,12 @@ procSuite "Waku Store - RPC codec":
# The fields of decodedIndex must be the same as the original index # The fields of decodedIndex must be the same as the original index
decodedIndex == index decodedIndex == index
test "PagingIndex protobuf codec - empty index": test "PagingIndexRPC protobuf codec - empty index":
## Given ## Given
let emptyIndex = PagingIndex() let emptyIndex = PagingIndexRPC()
let encodedIndex = emptyIndex.encode() let encodedIndex = emptyIndex.encode()
let decodedIndexRes = PagingIndex.decode(encodedIndex.buffer) let decodedIndexRes = PagingIndexRPC.decode(encodedIndex.buffer)
## Then ## Then
check: check:
@ -42,18 +43,18 @@ procSuite "Waku Store - RPC codec":
let decodedIndex = decodedIndexRes.tryGet() let decodedIndex = decodedIndexRes.tryGet()
check: check:
# Check the correctness of init and encode for an empty PagingIndex # Check the correctness of init and encode for an empty PagingIndexRPC
decodedIndex == emptyIndex decodedIndex == emptyIndex
test "PagingInfo protobuf codec": test "PagingInfoRPC protobuf codec":
## Given ## Given
let let
index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD) pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.FORWARD)
## When ## When
let pb = pagingInfo.encode() let pb = pagingInfo.encode()
let decodedPagingInfo = PagingInfo.decode(pb.buffer) let decodedPagingInfo = PagingInfoRPC.decode(pb.buffer)
## Then ## Then
check: check:
@ -64,32 +65,32 @@ procSuite "Waku Store - RPC codec":
decodedPagingInfo.value == pagingInfo decodedPagingInfo.value == pagingInfo
decodedPagingInfo.value.direction == pagingInfo.direction decodedPagingInfo.value.direction == pagingInfo.direction
test "PagingInfo protobuf codec - empty paging info": test "PagingInfoRPC protobuf codec - empty paging info":
## Given ## Given
let emptyPagingInfo = PagingInfo() let emptyPagingInfo = PagingInfoRPC()
## When ## When
let pb = emptyPagingInfo.encode() let pb = emptyPagingInfo.encode()
let decodedEmptyPagingInfo = PagingInfo.decode(pb.buffer) let decodedEmptyPagingInfo = PagingInfoRPC.decode(pb.buffer)
## Then ## Then
check: check:
decodedEmptyPagingInfo.isOk() decodedEmptyPagingInfo.isOk()
check: check:
# check the correctness of init and encode for an empty PagingInfo # check the correctness of init and encode for an empty PagingInfoRPC
decodedEmptyPagingInfo.value == emptyPagingInfo decodedEmptyPagingInfo.value == emptyPagingInfo
test "HistoryQuery protobuf codec": test "HistoryQueryRPC protobuf codec":
## Given ## Given
let let
index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11)) query = HistoryQueryRPC(contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
## When ## When
let pb = query.encode() let pb = query.encode()
let decodedQuery = HistoryQuery.decode(pb.buffer) let decodedQuery = HistoryQueryRPC.decode(pb.buffer)
## Then ## Then
check: check:
@ -99,33 +100,33 @@ procSuite "Waku Store - RPC codec":
# the fields of decoded query decodedQuery must be the same as the original query query # the fields of decoded query decodedQuery must be the same as the original query query
decodedQuery.value == query decodedQuery.value == query
test "HistoryQuery protobuf codec - empty history query": test "HistoryQueryRPC protobuf codec - empty history query":
## Given ## Given
let emptyQuery = HistoryQuery() let emptyQuery = HistoryQueryRPC()
## When ## When
let pb = emptyQuery.encode() let pb = emptyQuery.encode()
let decodedEmptyQuery = HistoryQuery.decode(pb.buffer) let decodedEmptyQuery = HistoryQueryRPC.decode(pb.buffer)
## Then ## Then
check: check:
decodedEmptyQuery.isOk() decodedEmptyQuery.isOk()
check: check:
# check the correctness of init and encode for an empty HistoryQuery # check the correctness of init and encode for an empty HistoryQueryRPC
decodedEmptyQuery.value == emptyQuery decodedEmptyQuery.value == emptyQuery
test "HistoryResponse protobuf codec": test "HistoryResponseRPC protobuf codec":
## Given ## Given
let let
message = fakeWakuMessage() message = fakeWakuMessage()
index = PagingIndex.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic) index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD) pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD)
res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR) res = HistoryResponseRPC(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseErrorRPC.INVALID_CURSOR)
## When ## When
let pb = res.encode() let pb = res.encode()
let decodedRes = HistoryResponse.decode(pb.buffer) let decodedRes = HistoryResponseRPC.decode(pb.buffer)
## Then ## Then
check: check:
@ -135,18 +136,18 @@ procSuite "Waku Store - RPC codec":
# the fields of decoded response decodedRes must be the same as the original response res # the fields of decoded response decodedRes must be the same as the original response res
decodedRes.value == res decodedRes.value == res
test "HistoryResponse protobuf codec - empty history response": test "HistoryResponseRPC protobuf codec - empty history response":
## Given ## Given
let emptyRes = HistoryResponse() let emptyRes = HistoryResponseRPC()
## When ## When
let pb = emptyRes.encode() let pb = emptyRes.encode()
let decodedEmptyRes = HistoryResponse.decode(pb.buffer) let decodedEmptyRes = HistoryResponseRPC.decode(pb.buffer)
## Then ## Then
check: check:
decodedEmptyRes.isOk() decodedEmptyRes.isOk()
check: check:
# check the correctness of init and encode for an empty HistoryResponse # check the correctness of init and encode for an empty HistoryResponseRPC
decodedEmptyRes.value == emptyRes decodedEmptyRes.value == emptyRes

View File

@ -81,10 +81,10 @@ procSuite "Waku SWAP Accounting":
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo() let serverPeer = server.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
## When ## When
let queryRes = await client.query(rpc, peer=serverPeer) let queryRes = await client.query(req, peer=serverPeer)
## Then ## Then
check queryRes.isOk() check queryRes.isOk()
@ -133,12 +133,12 @@ procSuite "Waku SWAP Accounting":
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo() let serverPeer = server.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
## When ## When
# TODO: Handshakes - for now we assume implicit, e2e still works for PoC # TODO: Handshakes - for now we assume implicit, e2e still works for PoC
let res1 = await client.query(rpc, peer=serverPeer) let res1 = await client.query(req, peer=serverPeer)
let res2 = await client.query(rpc, peer=serverPeer) let res2 = await client.query(req, peer=serverPeer)
require: require:
res1.isOk() res1.isOk()

View File

@ -54,7 +54,7 @@ procSuite "WakuNode - Store":
let serverPeer = server.peerInfo.toRemotePeerInfo() let serverPeer = server.peerInfo.toRemotePeerInfo()
## When ## When
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let queryRes = await client.query(req, peer=serverPeer) let queryRes = await client.query(req, peer=serverPeer)
## Then ## Then
@ -106,7 +106,7 @@ procSuite "WakuNode - Store":
# Wait for the server filter to receive the push message # Wait for the server filter to receive the push message
require await filterFut.withTimeout(5.seconds) require await filterFut.withTimeout(5.seconds)
let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer) let res = await client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer)
## Then ## Then
check res.isOk() check res.isOk()

View File

@ -10,7 +10,7 @@ import
../../waku/v2/node/waku_payload, ../../waku/v2/node/waku_payload,
../../waku/v2/node/jsonrpc/jsonrpc_types, ../../waku/v2/node/jsonrpc/jsonrpc_types,
../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store/rpc,
../../waku/v1/node/rpc/hexstrings ../../waku/v1/node/rpc/hexstrings
@ -39,5 +39,5 @@ echo "Content topic is:", input
var node = newRpcHttpClient() var node = newRpcHttpClient()
waitfor node.connect("localhost", rpcPort) waitfor node.connect("localhost", rpcPort)
var res = waitfor node.get_waku_v2_store_v1_messages(some(pubsubTopic), some(@[HistoryContentFilter(contentTopic: ContentTopic(input))]), none(StorePagingOptions)) var res = waitfor node.get_waku_v2_store_v1_messages(some(pubsubTopic), some(@[HistoryContentFilterRPC(contentTopic: ContentTopic(input))]), none(StorePagingOptions))
echo "Waku query response: ", res echo "Waku query response: ", res

View File

@ -7,7 +7,7 @@ import
libp2p/protobuf/minprotobuf libp2p/protobuf/minprotobuf
import import
../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_store/rpc,
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time, ../../waku/v2/utils/time,
../../waku/v2/node/waku_node, ../../waku/v2/node/waku_node,

View File

@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool
# Store API # Store API
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse
# Filter API # Filter API

View File

@ -7,7 +7,7 @@ import
std/[options,tables], std/[options,tables],
eth/keys, eth/keys,
../../protocol/waku_message, ../../protocol/waku_message,
../../protocol/waku_store/pagination, ../../protocol/waku_store/rpc,
../../utils/time ../../utils/time
type type
@ -18,7 +18,7 @@ type
StorePagingOptions* = object StorePagingOptions* = object
## This type holds some options for pagination ## This type holds some options for pagination
pageSize*: uint64 pageSize*: uint64
cursor*: Option[PagingIndex] cursor*: Option[PagingIndexRPC]
forward*: bool forward*: bool
WakuRelayMessage* = object WakuRelayMessage* = object

View File

@ -9,6 +9,7 @@ import
../../../v1/node/rpc/hexstrings, ../../../v1/node/rpc/hexstrings,
../../protocol/waku_message, ../../protocol/waku_message,
../../protocol/waku_store, ../../protocol/waku_store,
../../protocol/waku_store/rpc,
../../utils/time, ../../utils/time,
../waku_payload, ../waku_payload,
./jsonrpc_types ./jsonrpc_types
@ -29,19 +30,26 @@ proc `%`*(value: WakuMessage): JsonNode =
## Since the Waku v2 JSON-RPC API has its own defined types, ## Since the Waku v2 JSON-RPC API has its own defined types,
## we need to convert between these and the types for the Nim API ## we need to convert between these and the types for the Nim API
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo = proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfo(pageSize: pagingOptions.pageSize, PagingInfoRPC(pageSize: pagingOptions.pageSize,
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndex(), cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndexRPC(),
direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD) direction: if pagingOptions.forward: PagingDirectionRPC.FORWARD else: PagingDirectionRPC.BACKWARD)
proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions = proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(pageSize: pagingInfo.pageSize, StorePagingOptions(pageSize: pagingInfo.pageSize,
cursor: some(pagingInfo.cursor), cursor: some(pagingInfo.cursor),
forward: if pagingInfo.direction == PagingDirection.FORWARD: true else: false) forward: if pagingInfo.direction == PagingDirectionRPC.FORWARD: true else: false)
proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
StoreResponse(messages: historyResponse.messages, StoreResponse(
pagingOptions: if historyResponse.pagingInfo != PagingInfo(): some(historyResponse.pagingInfo.toPagingOptions()) else: none(StorePagingOptions)) messages: response.messages,
pagingOptions: if response.cursor.isNone(): none(StorePagingOptions)
else: some(StorePagingOptions(
pageSize: response.pageSize,
forward: response.ascending,
cursor: response.cursor.map(toRPC)
))
)
proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage =
var t: Timestamp var t: Timestamp

View File

@ -4,14 +4,15 @@ else:
{.push raises: [].} {.push raises: [].}
import import
std/options, std/[options, sequtils],
chronicles, chronicles,
json_rpc/rpcserver json_rpc/rpcserver
import import
../peer_manager/peer_manager,
../waku_node,
../../protocol/waku_store, ../../protocol/waku_store,
../../protocol/waku_store/rpc,
../../utils/time, ../../utils/time,
../waku_node,
../peer_manager/peer_manager,
./jsonrpc_types, ./jsonrpc_types,
./jsonrpc_utils ./jsonrpc_utils
@ -25,7 +26,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Store API version 1 definitions ## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilterRPC]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging ## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages" debug "get_waku_v2_store_v1_messages"
@ -33,12 +34,21 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
if peerOpt.isNone(): if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers") raise newException(ValueError, "no suitable remote store peers")
let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", let req = HistoryQuery(
contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], pubsubTopic: pubsubTopicOption,
startTime: if startTime.isSome: startTime.get() else: Timestamp(0), contentTopics: if contentFiltersOption.isNone(): @[]
endTime: if endTime.isSome: endTime.get() else: Timestamp(0), else: contentFiltersOption.get().mapIt(it.contentTopic),
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) startTime: startTime,
let queryFut = node.query(historyQuery, peerOpt.get()) endTime: endTime,
ascending: if pagingOptions.isNone(): true
else: pagingOptions.get().forward,
pageSize: if pagingOptions.isNone(): DefaultPageSize
else: min(pagingOptions.get().pageSize, MaxPageSize),
cursor: if pagingOptions.isNone(): none(HistoryCursor)
else: pagingOptions.get().cursor.map(toAPI)
)
let queryFut = node.query(req, peerOpt.get())
if not await queryFut.withTimeout(futTimeout): if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)") raise newException(ValueError, "No history response received (timeout)")
@ -48,4 +58,4 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
raise newException(ValueError, $res.error) raise newException(ValueError, $res.error)
debug "get_waku_v2_store_v1_messages response" debug "get_waku_v2_store_v1_messages response"
return res.value.toStoreResponse() return res.value.toJsonRPCStoreResponse()

View File

@ -8,7 +8,7 @@ import
nimcrypto/sha2 nimcrypto/sha2
import import
../../../protocol/waku_message, ../../../protocol/waku_message,
../../../protocol/waku_store/pagination, ../../../protocol/waku_store/common,
../../../utils/time ../../../utils/time
@ -33,19 +33,19 @@ proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTo
) )
proc toPagingIndex*(index: Index): PagingIndex = proc tohistoryCursor*(index: Index): HistoryCursor =
PagingIndex( HistoryCursor(
pubsubTopic: index.pubsubTopic, pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime, senderTime: index.senderTime,
receiverTime: index.receiverTime, storeTime: index.receiverTime,
digest: index.digest digest: index.digest
) )
proc toIndex*(index: PagingIndex): Index = proc toIndex*(index: HistoryCursor): Index =
Index( Index(
pubsubTopic: index.pubsubTopic, pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime, senderTime: index.senderTime,
receiverTime: index.receiverTime, receiverTime: index.storeTime,
digest: index.digest digest: index.digest
) )

View File

@ -10,7 +10,7 @@ import
chronicles chronicles
import import
../../../protocol/waku_message, ../../../protocol/waku_message,
../../../protocol/waku_store/pagination, ../../../protocol/waku_store/common,
../../../protocol/waku_store/message_store, ../../../protocol/waku_store/message_store,
../../../utils/time, ../../../utils/time,
./index ./index
@ -271,8 +271,8 @@ method getAllMessages*(store: StoreQueueRef): MessageStoreResult[seq[MessageStor
method getMessagesByHistoryQuery*( method getMessagesByHistoryQuery*(
store: StoreQueueRef, store: StoreQueueRef,
contentTopic = none(seq[ContentTopic]), contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string), pubsubTopic = none(PubsubTopic),
cursor = none(PagingIndex), cursor = none(HistoryCursor),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = DefaultPageSize, maxPageSize = DefaultPageSize,

View File

@ -12,7 +12,7 @@ import
import import
../../../../common/sqlite, ../../../../common/sqlite,
../../../protocol/waku_message, ../../../protocol/waku_message,
../../../protocol/waku_store/pagination, ../../../protocol/waku_store/common,
../../../protocol/waku_store/message_store, ../../../protocol/waku_store/message_store,
../../../utils/time, ../../../utils/time,
./queries ./queries
@ -99,13 +99,13 @@ method getMessagesByHistoryQuery*(
s: SqliteStore, s: SqliteStore,
contentTopic = none(seq[ContentTopic]), contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(PubsubTopic), pubsubTopic = none(PubsubTopic),
cursor = none(PagingIndex), cursor = none(HistoryCursor),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = DefaultPageSize, maxPageSize = DefaultPageSize,
ascendingOrder = true ascendingOrder = true
): MessageStoreResult[seq[MessageStoreRow]] = ): MessageStoreResult[seq[MessageStoreRow]] =
let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic)) let cursor = cursor.map(proc(c: HistoryCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic))
return s.db.selectMessagesByHistoryQueryWithLimit( return s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic, contentTopic,

View File

@ -611,7 +611,7 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
let queryRes = await node.wakuStoreClient.query(query, peer) let queryRes = await node.wakuStoreClient.query(query, peer)
if queryRes.isErr(): if queryRes.isErr():
return err(queryRes.error) return err($queryRes.error)
let response = queryRes.get() let response = queryRes.get()

View File

@ -1,13 +1,9 @@
import import
./waku_store/protocol, ./waku_store/common,
./waku_store/rpc, ./waku_store/message_store,
./waku_store/rpc_codec, ./waku_store/protocol
./waku_store/pagination,
./waku_store/message_store
export export
protocol, common,
rpc, message_store,
rpc_codec, protocol
pagination,
message_store

View File

@ -16,9 +16,8 @@ import
../../utils/time, ../../utils/time,
../waku_message, ../waku_message,
../waku_swap/waku_swap, ../waku_swap/waku_swap,
./protocol,
./protocol_metrics, ./protocol_metrics,
./pagination, ./common,
./rpc, ./rpc,
./rpc_codec, ./rpc_codec,
./message_store ./message_store
@ -28,6 +27,10 @@ logScope:
topics = "waku store client" topics = "waku store client"
const
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
type WakuStoreClient* = ref object type WakuStoreClient* = ref object
peerManager: PeerManager peerManager: PeerManager
rng: ref rand.HmacDrbgContext rng: ref rand.HmacDrbgContext
@ -40,28 +43,45 @@ proc new*(T: type WakuStoreClient,
store: MessageStore): T = store: MessageStore): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store) WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone(): if connOpt.isNone():
waku_store_errors.inc(labelValues = [dialFailure]) waku_store_errors.inc(labelValues = [dialFailure])
return err(dialFailure) return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))
let connection = connOpt.get() let connection = connOpt.get()
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connection.readLp(MaxRpcSize.int) let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req.toRPC())
let response = HistoryRPC.decode(message) await connection.writeLP(reqRpc.encode().buffer)
if response.isErr():
error "failed to decode response" let buf = await connection.readLp(MaxRpcSize.int)
let respDecodeRes = HistoryRPC.decode(buf)
if respDecodeRes.isErr():
waku_store_errors.inc(labelValues = [decodeRpcFailure]) waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure) return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: decodeRpcFailure))
return ok(response.value.response) let respRpc = respDecodeRes.get()
proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
# Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0))
# TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK))
# and rework the protobuf parsing to return Option[T] when empty values are received
# if respRpc.response == default(HistoryResponseRPC):
# waku_store_errors.inc(labelValues = [emptyRpcResponseFailure])
# return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure))
let resp = respRpc.response
return resp.toAPI()
proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
return await w.sendHistoryQueryRPC(req, peer)
proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo, ## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
## it retrieves the historical messages in pages. ## it retrieves the historical messages in pages.
## Returns all the fetched messages, if error occurs, returns an error string ## Returns all the fetched messages, if error occurs, returns an error string
@ -72,29 +92,34 @@ proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerI
var messageList: seq[WakuMessage] = @[] var messageList: seq[WakuMessage] = @[]
while true: while true:
let res = await w.query(req, peer) let queryRes = await w.query(req, peer)
if res.isErr(): if queryRes.isErr():
return err(res.error) return err($queryRes.error)
let response = res.get() let response = queryRes.get()
messageList.add(response.messages) messageList.add(response.messages)
# Check whether it is the last page # Check whether it is the last page
if response.pagingInfo == PagingInfo(): if response.cursor.isNone():
break break
# Update paging cursor # Update paging cursor
req.pagingInfo.cursor = response.pagingInfo.cursor req.cursor = response.cursor
return ok(messageList) return ok(messageList)
proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## Resume store
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## Loops through the peers candidate list in order and sends the query to each ## Loops through the peers candidate list in order and sends the query to each
## ##
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list. ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq. ## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq.
let queryFuturesList = peers.mapIt(w.queryWithPaging(req, it)) let queryFuturesList = peers.mapIt(w.queryAll(req, it))
await allFutures(queryFuturesList) await allFutures(queryFuturesList)
@ -115,10 +140,6 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo
return ok(messagesList) return ok(messagesList)
## Resume store
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc resume*(w: WakuStoreClient, proc resume*(w: WakuStoreClient,
peerList = none(seq[RemotePeerInfo]), peerList = none(seq[RemotePeerInfo]),
pageSize = DefaultPageSize, pageSize = DefaultPageSize,
@ -153,13 +174,11 @@ proc resume*(w: WakuStoreClient,
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0) queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
let req = HistoryQuery( let req = HistoryQuery(
pubsubTopic: pubsubTopic, pubsubTopic: some(pubsubTopic),
startTime: queryStartTime, startTime: some(queryStartTime),
endTime: queryEndTime, endTime: some(queryEndTime),
pagingInfo: PagingInfo( pageSize: uint64(pageSize),
direction:PagingDirection.FORWARD, ascending: true
pageSize: uint64(pageSize)
)
) )
var res: WakuStoreResult[seq[WakuMessage]] var res: WakuStoreResult[seq[WakuMessage]]
@ -177,7 +196,7 @@ proc resume*(w: WakuStoreClient,
return err("no suitable remote peers") return err("no suitable remote peers")
debug "a peer is selected from peer manager" debug "a peer is selected from peer manager"
res = await w.queryWithPaging(req, peerOpt.get()) res = await w.queryAll(req, peerOpt.get())
if res.isErr(): if res.isErr():
debug "failed to resume the history" debug "failed to resume the history"

View File

@ -0,0 +1,104 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options,
stew/results,
stew/byteutils,
nimcrypto/sha2
import
../../utils/time,
../waku_message
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultPageSize*: uint64 = 20
MaxPageSize*: uint64 = 100
type WakuStoreResult*[T] = Result[T, string]
## Waku message digest
type MessageDigest* = MDigest[256]
proc computeDigest*(msg: WakuMessage): MessageDigest =
var ctx: sha256
ctx.init()
defer: ctx.clear()
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.payload)
# Computes the hash
return ctx.finish()
## Public API types
type
HistoryCursor* = object
pubsubTopic*: PubsubTopic
senderTime*: Timestamp
storeTime*: Timestamp
digest*: MessageDigest
HistoryQuery* = object
pubsubTopic*: Option[PubsubTopic]
contentTopics*: seq[ContentTopic]
cursor*: Option[HistoryCursor]
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
pageSize*: uint64
ascending*: bool
HistoryResponse* = object
messages*: seq[WakuMessage]
pageSize*: uint64
ascending*: bool
cursor*: Option[HistoryCursor]
HistoryErrorKind* {.pure.} = enum
UNKNOWN = uint32(000)
PEER_DIAL_FAILURE = uint32(200)
BAD_RESPONSE = uint32(300)
BAD_REQUEST = uint32(400)
SERVICE_UNAVAILABLE = uint32(503)
HistoryError* = object
case kind*: HistoryErrorKind
of PEER_DIAL_FAILURE:
address*: string
of BAD_RESPONSE, BAD_REQUEST:
cause*: string
else:
discard
HistoryResult* = Result[HistoryResponse, HistoryError]
proc parse*(T: type HistoryErrorKind, kind: uint32): T =
case kind:
of 000, 200, 300, 400, 503:
HistoryErrorKind(kind)
else:
HistoryErrorKind.UNKNOWN
proc `$`*(err: HistoryError): string =
case err.kind:
of HistoryErrorKind.PEER_DIAL_FAILURE:
"PEER_DIAL_FAILURE: " & err.address
of HistoryErrorKind.BAD_RESPONSE:
"BAD_RESPONSE: " & err.cause
of HistoryErrorKind.BAD_REQUEST:
"BAD_REQUEST: " & err.cause
of HistoryErrorKind.SERVICE_UNAVAILABLE:
"SERVICE_UNAVAILABLE"
of HistoryErrorKind.UNKNOWN:
"UNKNOWN"

View File

@ -10,27 +10,28 @@ import
std/[options, times], std/[options, times],
stew/results stew/results
import import
../../utils/time,
../waku_message, ../waku_message,
./pagination, ./common
../../utils/time
type type
MessageStoreResult*[T] = Result[T, string] MessageStoreResult*[T] = Result[T, string]
MessageStoreRow* = (string, WakuMessage, seq[byte], Timestamp) MessageStoreRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
MessageStore* = ref object of RootObj MessageStore* = ref object of RootObj
# MessageStore interface # MessageStore interface
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] {.base.} = method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] {.base.} =
let let
digest = computeDigest(message) digest = computeDigest(message)
receivedTime = if message.timestamp > 0: message.timestamp receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat()) else: getNanosecondTime(getTime().toUnixFloat())
ms.put(pubsubTopic, message, digest, receivedTime) ms.put(pubsubTopic, message, digest, receivedTime)
@ -40,8 +41,8 @@ method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow
method getMessagesByHistoryQuery*( method getMessagesByHistoryQuery*(
ms: MessageStore, ms: MessageStore,
contentTopic = none(seq[ContentTopic]), contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string), pubsubTopic = none(PubsubTopic),
cursor = none(PagingIndex), cursor = none(HistoryCursor),
startTime = none(Timestamp), startTime = none(Timestamp),
endTime = none(Timestamp), endTime = none(Timestamp),
maxPageSize = DefaultPageSize, maxPageSize = DefaultPageSize,

View File

@ -1,70 +0,0 @@
## Contains types and utilities for pagination.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/byteutils,
nimcrypto/sha2
import
../waku_message,
../../utils/time
const
MaxPageSize*: uint64 = 100
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
type MessageDigest* = MDigest[256]
type PagingIndex* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
proc computeDigest*(msg: WakuMessage): MessageDigest =
var ctx: sha256
ctx.init()
defer: ctx.clear()
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.payload)
# Computes the hash
return ctx.finish()
proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
senderTime = msg.timestamp
PagingIndex(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
)
proc `==`*(x, y: PagingIndex): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
type
PagingDirection* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
PagingInfo* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: PagingIndex
direction*: PagingDirection

View File

@ -23,9 +23,9 @@ import
../../utils/time, ../../utils/time,
../waku_message, ../waku_message,
../waku_swap/waku_swap, ../waku_swap/waku_swap,
./common,
./rpc, ./rpc,
./rpc_codec, ./rpc_codec,
./pagination,
./message_store, ./message_store,
./protocol_metrics ./protocol_metrics
@ -35,14 +35,10 @@ logScope:
const const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
type type
WakuStoreResult*[T] = Result[T, string]
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext rng*: ref rand.HmacDrbgContext
@ -51,6 +47,7 @@ type
retentionPolicy: Option[MessageRetentionPolicy] retentionPolicy: Option[MessageRetentionPolicy]
# TODO: Move to a message store wrapper
proc executeMessageRetentionPolicy*(w: WakuStore) = proc executeMessageRetentionPolicy*(w: WakuStore) =
if w.retentionPolicy.isNone(): if w.retentionPolicy.isNone():
return return
@ -65,7 +62,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) =
waku_store_errors.inc(labelValues = [retPolicyFailure]) waku_store_errors.inc(labelValues = [retPolicyFailure])
debug "failed execution of retention policy", error=retPolicyRes.error debug "failed execution of retention policy", error=retPolicyRes.error
# TODO: Move to a message store wrapper
proc reportStoredMessagesMetric*(w: WakuStore) = proc reportStoredMessagesMetric*(w: WakuStore) =
if w.store.isNil(): if w.store.isNil():
return return
@ -76,147 +73,7 @@ proc reportStoredMessagesMetric*(w: WakuStore) =
waku_store_messages.set(resCount.value, labelValues = ["stored"]) waku_store_messages.set(resCount.value, labelValues = ["stored"])
# TODO: Move to a message store wrapper
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query
# Extract query criteria. All query criteria are optional
let
qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic))
else: none(seq[ContentTopic])
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
else: none(string)
qCursor = if query.pagingInfo.cursor != PagingIndex(): some(query.pagingInfo.cursor)
else: none(PagingIndex)
qStartTime = if query.startTime != Timestamp(0): some(query.startTime)
else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp)
qMaxPageSize = if query.pagingInfo.pageSize <= 0: DefaultPageSize
else: min(query.pagingInfo.pageSize, MaxPageSize)
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = qAscendingOrder
)
let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_query_duration_seconds.observe(queryDuration)
# Build response
# TODO: Improve error reporting
if queryRes.isErr():
return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)
let rows = queryRes.get()
if rows.len <= 0:
return HistoryResponse(messages: @[], error: HistoryResponseError.NONE)
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
else: rows[0..^2].mapIt(it[1])
var pagingInfo = none(PagingInfo)
# The retrieved messages list should always be in chronological order
if not qAscendingOrder:
messages.reverse()
if rows.len > int(qMaxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
var messageDigest: array[32, byte]
for i in 0..<min(digest.len, 32):
messageDigest[i] = digest[i]
let pagingIndex = PagingIndex(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
receiverTime: storeTimestamp,
digest: MessageDigest(data: messageDigest)
)
pagingInfo = some(PagingInfo(
pageSize: uint64(messages.len),
cursor: pagingIndex,
direction: if qAscendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
))
HistoryResponse(
messages: messages,
pagingInfo: pagingInfo.get(PagingInfo()),
error: HistoryResponseError.NONE
)
proc initProtocolHandler*(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
let resReq = HistoryRPC.decode(buf)
if resReq.isErr():
error "failed to decode rpc", peerId=conn.peerId
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
let req = resReq.value
info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query
waku_store_queries.inc()
let resp = if not ws.store.isNil(): ws.findMessages(req.query)
# TODO: Improve error reporting
else: HistoryResponse(error: HistoryResponseError.SERVICE_UNAVAILABLE)
if not ws.wakuSwap.isNil():
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
# Perform accounting operation
# TODO: Do accounting here, response is HistoryResponse. How do we get node or swap context?
let peerId = conn.peerId
let messages = resp.messages
ws.wakuSwap.credit(peerId, messages.len)
info "sending history response", peerId=conn.peerId, requestId=req.requestId, messages=resp.messages.len
let rpc = HistoryRPC(requestId: req.requestId, response: resp)
await conn.writeLp(rpc.encode().buffer)
ws.handler = handler
ws.codec = WakuStoreCodec
proc new*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
wakuSwap: WakuSwap = nil,
retentionPolicy=none(MessageRetentionPolicy)): T =
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
store: store,
wakuSwap: wakuSwap,
retentionPolicy: retentionPolicy
)
ws.initProtocolHandler()
return ws
proc isValidMessage(msg: WakuMessage): bool = proc isValidMessage(msg: WakuMessage): bool =
if msg.timestamp == 0: if msg.timestamp == 0:
return true return true
@ -228,6 +85,7 @@ proc isValidMessage(msg: WakuMessage): bool =
return lowerBound <= msg.timestamp and msg.timestamp <= upperBound return lowerBound <= msg.timestamp and msg.timestamp <= upperBound
# TODO: Move to a message store wrapper
proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) = proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if w.store.isNil(): if w.store.isNil():
# Messages should not be stored # Messages should not be stored
@ -260,3 +118,176 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
let insertDuration = getTime().toUnixFloat() - insertStartTime let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_store_insert_duration_seconds.observe(insertDuration) waku_store_insert_duration_seconds.observe(insertDuration)
# TODO: Move to a message store wrapper
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
## Query history to return a single page of messages matching the query
# Extract query criteria. All query criteria are optional
let
qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic])
else: some(query.contentTopics)
qPubSubTopic = query.pubsubTopic
qCursor = query.cursor
qStartTime = query.startTime
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
qAscendingOrder = query.ascending
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = qAscendingOrder
)
let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_query_duration_seconds.observe(queryDuration)
# Build response
if queryRes.isErr():
# TODO: Improve error reporting
return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
let rows = queryRes.get()
if rows.len <= 0:
return ok(HistoryResponse(
messages: @[],
pageSize: 0,
ascending: qAscendingOrder,
cursor: none(HistoryCursor)
))
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
else: rows[0..^2].mapIt(it[1])
var cursor = none(HistoryCursor)
# The retrieved messages list should always be in chronological order
if not qAscendingOrder:
messages.reverse()
if rows.len > int(qMaxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
var messageDigest: array[32, byte]
for i in 0..<min(digest.len, 32):
messageDigest[i] = digest[i]
cursor = some(HistoryCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: storeTimestamp,
digest: MessageDigest(data: messageDigest)
))
ok(HistoryResponse(
messages: messages,
pageSize: uint64(messages.len),
ascending: qAscendingOrder,
cursor: cursor
))
## Protocol
proc initProtocolHandler*(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
let decodeRes = HistoryRPC.decode(buf)
if decodeRes.isErr():
error "failed to decode rpc", peerId=conn.peerId
waku_store_errors.inc(labelValues = [decodeRpcFailure])
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
return
let reqRpc = decodeRes.value
if reqRpc.query == default(HistoryQueryRPC):
error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query")
return
info "received history query", peerId=conn.peerId, requestId=reqRpc.requestId, query=reqRpc.query
waku_store_queries.inc()
if ws.store.isNil():
let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr
let resp = HistoryResponseRPC(error: respErr.toRPC())
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
await conn.writeLp(rpc.encode().buffer)
return
let query = reqRpc.query.toApi()
let respRes = ws.findMessages(query)
if respRes.isErr():
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error
let resp = respRes.toRPC()
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
await conn.writeLp(rpc.encode().buffer)
return
let resp = respRes.toRPC()
if not ws.wakuSwap.isNil():
info "handle store swap", peerId=conn.peerId, requestId=reqRpc.requestId, text=ws.wakuSwap.text
# Perform accounting operation
# TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context?
let peerId = conn.peerId
let messages = resp.messages
ws.wakuSwap.credit(peerId, messages.len)
info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
await conn.writeLp(rpc.encode().buffer)
ws.handler = handler
ws.codec = WakuStoreCodec
proc new*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
wakuSwap: WakuSwap = nil,
retentionPolicy=none(MessageRetentionPolicy)): T =
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
store: store,
wakuSwap: wakuSwap,
retentionPolicy: retentionPolicy
)
ws.initProtocolHandler()
ws

View File

@ -20,4 +20,6 @@ const
retPolicyFailure* = "retpolicy_failure" retPolicyFailure* = "retpolicy_failure"
dialFailure* = "dial_failure" dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure" decodeRpcFailure* = "decode_rpc_failure"
peerNotFoundFailure* = "peer_not_found_failure" peerNotFoundFailure* = "peer_not_found_failure"
emptyRpcQueryFailure* = "empty_rpc_query_failure"
emptyRpcResponseFailure* = "empty_rpc_response_failure"

View File

@ -3,35 +3,249 @@ when (NimMajor, NimMinor) < (1, 4):
else: else:
{.push raises: [].} {.push raises: [].}
import
std/[options, sequtils],
stew/results
import import
../../utils/time, ../../utils/time,
../waku_message, ../waku_message,
./pagination ./common
## Wire protocol
type PagingIndexRPC* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: PubsubTopic
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
proc `==`*(x, y: PagingIndexRPC): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
senderTime = msg.timestamp
PagingIndexRPC(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
)
type type
HistoryContentFilter* = object PagingDirectionRPC* {.pure.} = enum
## PagingDirection determines the direction of pagination
BACKWARD = uint32(0)
FORWARD = uint32(1)
PagingInfoRPC* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: PagingIndexRPC
direction*: PagingDirectionRPC
type
HistoryContentFilterRPC* = object
contentTopic*: ContentTopic contentTopic*: ContentTopic
HistoryQuery* = object HistoryQueryRPC* = object
contentFilters*: seq[HistoryContentFilter] contentFilters*: seq[HistoryContentFilterRPC]
pubsubTopic*: string pubsubTopic*: PubsubTopic
pagingInfo*: PagingInfo # used for pagination pagingInfo*: PagingInfoRPC # used for pagination
startTime*: Timestamp # used for time-window query startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query endTime*: Timestamp # used for time-window query
HistoryResponseError* {.pure.} = enum HistoryResponseErrorRPC* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request ## HistoryResponseErrorRPC contains error message to inform the querying node about
## the state of its request
NONE = uint32(0) NONE = uint32(0)
INVALID_CURSOR = uint32(1) INVALID_CURSOR = uint32(1)
SERVICE_UNAVAILABLE = uint32(503) SERVICE_UNAVAILABLE = uint32(503)
HistoryResponse* = object HistoryResponseRPC* = object
messages*: seq[WakuMessage] messages*: seq[WakuMessage]
pagingInfo*: PagingInfo # used for pagination pagingInfo*: PagingInfoRPC # used for pagination
error*: HistoryResponseError error*: HistoryResponseErrorRPC
HistoryRPC* = object HistoryRPC* = object
requestId*: string requestId*: string
query*: HistoryQuery query*: HistoryQueryRPC
response*: HistoryResponse response*: HistoryResponseRPC
proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T =
case kind:
of 0, 1, 503:
HistoryResponseErrorRPC(kind)
else:
# TODO: Improve error variants/move to satus codes
HistoryResponseErrorRPC.INVALID_CURSOR
## Wire protocol type mappings
proc toRPC*(cursor: HistoryCursor): PagingIndexRPC {.gcsafe.}=
PagingIndexRPC(
pubsubTopic: cursor.pubsubTopic,
senderTime: cursor.senderTime,
receiverTime: cursor.storeTime,
digest: cursor.digest
)
proc toAPI*(rpc: PagingIndexRPC): HistoryCursor =
HistoryCursor(
pubsubTopic: rpc.pubsubTopic,
senderTime: rpc.senderTime,
storeTime: rpc.receiverTime,
digest: rpc.digest
)
proc toRPC*(query: HistoryQuery): HistoryQueryRPC =
let
contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it))
pubsubTopic = query.pubsubTopic.get(default(string))
pageSize = query.pageSize
cursor = query.cursor.get(default(HistoryCursor)).toRPC()
direction = if query.ascending: PagingDirectionRPC.FORWARD
else: PagingDirectionRPC.BACKWARD
startTime = query.startTime.get(default(Timestamp))
endTime = query.endTime.get(default(Timestamp))
HistoryQueryRPC(
contentFilters: contentFilters,
pubsubTopic: pubsubTopic,
pagingInfo: PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
),
startTime: startTime,
endTime: endTime
)
proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery =
let
pubsubTopic = if rpc.pubsubTopic == default(string): none(PubsubTopic)
else: some(rpc.pubsubTopic)
contentTopics = rpc.contentFilters.mapIt(it.contentTopic)
cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI())
startTime = if rpc.startTime == default(Timestamp): none(Timestamp)
else: some(rpc.startTime)
endTime = if rpc.endTime == default(Timestamp): none(Timestamp)
else: some(rpc.endTime)
pageSize = if rpc.pagingInfo == default(PagingInfoRPC): 0.uint64
else: rpc.pagingInfo.pageSize
ascending = if rpc.pagingInfo == default(PagingInfoRPC): true
else: rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD
HistoryQuery(
pubsubTopic: pubsubTopic,
contentTopics: contentTopics,
cursor: cursor,
startTime: startTime,
endTime: endTime,
pageSize: pageSize,
ascending: ascending
)
proc toRPC*(err: HistoryError): HistoryResponseErrorRPC =
# TODO: Better error mappings/move to error codes
case err.kind:
of HistoryErrorKind.BAD_REQUEST:
# TODO: Respond aksi with the reason
HistoryResponseErrorRPC.INVALID_CURSOR
of HistoryErrorKind.SERVICE_UNAVAILABLE:
HistoryResponseErrorRPC.SERVICE_UNAVAILABLE
else:
HistoryResponseErrorRPC.INVALID_CURSOR
proc toAPI*(err: HistoryResponseErrorRPC): HistoryError =
# TODO: Better error mappings/move to error codes
case err:
of HistoryResponseErrorRPC.INVALID_CURSOR:
HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: "invalid cursor")
of HistoryResponseErrorRPC.SERVICE_UNAVAILABLE:
HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)
else:
HistoryError(kind: HistoryErrorKind.UNKNOWN)
proc toRPC*(res: HistoryResult): HistoryResponseRPC =
if res.isErr():
let error = res.error.toRPC()
HistoryResponseRPC(error: error)
else:
let resp = res.get()
let
messages = resp.messages
pagingInfo = block:
if resp.cursor.isNone():
default(PagingInfoRPC)
else:
let
pageSize = resp.pageSize
cursor = resp.cursor.get(default(HistoryCursor)).toRPC()
direction = if resp.ascending: PagingDirectionRPC.FORWARD
else: PagingDirectionRPC.BACKWARD
PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
)
error = HistoryResponseErrorRPC.NONE
HistoryResponseRPC(
messages: messages,
pagingInfo: pagingInfo,
error: error
)
proc toAPI*(rpc: HistoryResponseRPC): HistoryResult =
if rpc.error != HistoryResponseErrorRPC.NONE:
err(rpc.error.toAPI())
else:
let
messages = rpc.messages
pageSize = rpc.pagingInfo.pageSize
ascending = rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD
cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI())
ok(HistoryResponse(
messages: messages,
pageSize: pageSize,
ascending: ascending,
cursor: cursor
))

View File

@ -11,14 +11,16 @@ import
../waku_message, ../waku_message,
../../utils/protobuf, ../../utils/protobuf,
../../utils/time, ../../utils/time,
./rpc, ./common,
./pagination ./rpc
const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
proc encode*(index: PagingIndex): ProtoBuffer = ## Pagination
proc encode*(index: PagingIndexRPC): ProtoBuffer =
## Encode an Index object into a ProtoBuffer ## Encode an Index object into a ProtoBuffer
## returns the resultant ProtoBuffer ## returns the resultant ProtoBuffer
var pb = initProtoBuffer() var pb = initProtoBuffer()
@ -31,16 +33,16 @@ proc encode*(index: PagingIndex): ProtoBuffer =
pb pb
proc decode*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtoResult[T] =
## creates and returns an Index object out of buffer ## creates and returns an Index object out of buffer
var index = PagingIndex() var index = PagingIndexRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var data: seq[byte] var data: seq[byte]
discard ?pb.getField(1, data) discard ?pb.getField(1, data)
# create digest from data # create digest from data
index.digest = MDigest[256]() index.digest = MessageDigest()
for count, b in data: for count, b in data:
index.digest.data[count] = b index.digest.data[count] = b
@ -60,7 +62,7 @@ proc decode*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] =
ok(index) ok(index)
proc encode*(pinfo: PagingInfo): ProtoBuffer = proc encode*(pinfo: PagingInfoRPC): ProtoBuffer =
## Encodes a PagingInfo object into a ProtoBuffer ## Encodes a PagingInfo object into a ProtoBuffer
## returns the resultant ProtoBuffer ## returns the resultant ProtoBuffer
var pb = initProtoBuffer() var pb = initProtoBuffer()
@ -72,9 +74,9 @@ proc encode*(pinfo: PagingInfo): ProtoBuffer =
pb pb
proc decode*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type PagingInfoRPC, buffer: seq[byte]): ProtoResult[T] =
## creates and returns a PagingInfo object out of buffer ## creates and returns a PagingInfo object out of buffer
var pagingInfo = PagingInfo() var pagingInfo = PagingInfoRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var pageSize: uint64 var pageSize: uint64
@ -83,16 +85,18 @@ proc decode*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
var cursorBuffer: seq[byte] var cursorBuffer: seq[byte]
discard ?pb.getField(2, cursorBuffer) discard ?pb.getField(2, cursorBuffer)
pagingInfo.cursor = ?PagingIndex.decode(cursorBuffer) pagingInfo.cursor = ?PagingIndexRPC.decode(cursorBuffer)
var direction: uint32 var direction: uint32
discard ?pb.getField(3, direction) discard ?pb.getField(3, direction)
pagingInfo.direction = PagingDirection(direction) pagingInfo.direction = PagingDirectionRPC(direction)
ok(pagingInfo) ok(pagingInfo)
proc encode*(filter: HistoryContentFilter): ProtoBuffer = ## Wire protocol
proc encode*(filter: HistoryContentFilterRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(1, filter.contentTopic) pb.write3(1, filter.contentTopic)
@ -100,15 +104,15 @@ proc encode*(filter: HistoryContentFilter): ProtoBuffer =
pb pb
proc decode*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryContentFilterRPC, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var contentTopic: ContentTopic var contentTopic: ContentTopic
discard ?pb.getField(1, contentTopic) discard ?pb.getField(1, contentTopic)
ok(HistoryContentFilter(contentTopic: contentTopic)) ok(HistoryContentFilterRPC(contentTopic: contentTopic))
proc encode*(query: HistoryQuery): ProtoBuffer = proc encode*(query: HistoryQueryRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write3(2, query.pubsubTopic) pb.write3(2, query.pubsubTopic)
@ -122,8 +126,8 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
pb pb
proc decode*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryQueryRPC, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryQuery() var msg = HistoryQueryRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
discard ?pb.getField(2, msg.pubsubTopic) discard ?pb.getField(2, msg.pubsubTopic)
@ -132,12 +136,12 @@ proc decode*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
discard ?pb.getRepeatedField(3, buffs) discard ?pb.getRepeatedField(3, buffs)
for pb in buffs: for pb in buffs:
msg.contentFilters.add(? HistoryContentFilter.decode(pb)) msg.contentFilters.add(? HistoryContentFilterRPC.decode(pb))
var pagingInfoBuffer: seq[byte] var pagingInfoBuffer: seq[byte]
discard ?pb.getField(4, pagingInfoBuffer) discard ?pb.getField(4, pagingInfoBuffer)
msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer) msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer)
var startTime: zint64 var startTime: zint64
discard ?pb.getField(5, startTime) discard ?pb.getField(5, startTime)
@ -150,7 +154,7 @@ proc decode*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
ok(msg) ok(msg)
proc encode*(response: HistoryResponse): ProtoBuffer = proc encode*(response: HistoryResponseRPC): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
for msg in response.messages: for msg in response.messages:
@ -162,8 +166,8 @@ proc encode*(response: HistoryResponse): ProtoBuffer =
pb pb
proc decode*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = proc decode*(T: type HistoryResponseRPC, buffer: seq[byte]): ProtoResult[T] =
var msg = HistoryResponse() var msg = HistoryResponseRPC()
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]] var messages: seq[seq[byte]]
@ -175,11 +179,11 @@ proc decode*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
var pagingInfoBuffer: seq[byte] var pagingInfoBuffer: seq[byte]
discard ?pb.getField(3, pagingInfoBuffer) discard ?pb.getField(3, pagingInfoBuffer)
msg.pagingInfo = ?PagingInfo.decode(pagingInfoBuffer) msg.pagingInfo = ?PagingInfoRPC.decode(pagingInfoBuffer)
var error: uint32 var error: uint32
discard ?pb.getField(4, error) discard ?pb.getField(4, error)
msg.error = HistoryResponseError(error) msg.error = HistoryResponseErrorRPC.parse(error)
ok(msg) ok(msg)
@ -201,10 +205,10 @@ proc decode*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
var queryBuffer: seq[byte] var queryBuffer: seq[byte]
discard ?pb.getField(2, queryBuffer) discard ?pb.getField(2, queryBuffer)
rpc.query = ?HistoryQuery.decode(queryBuffer) rpc.query = ?HistoryQueryRPC.decode(queryBuffer)
var responseBuffer: seq[byte] var responseBuffer: seq[byte]
discard ?pb.getField(3, responseBuffer) discard ?pb.getField(3, responseBuffer)
rpc.response = ?HistoryResponse.decode(responseBuffer) rpc.response = ?HistoryResponseRPC.decode(responseBuffer)
ok(rpc) ok(rpc)