mirror of https://github.com/waku-org/nwaku.git
feat(store): add waku store client module
This commit is contained in:
parent
89c75f414f
commit
e5c3aa560e
|
@ -9,7 +9,7 @@ import
|
||||||
metrics/chronos_httpserver
|
metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
../../waku/v2/protocol/waku_filter,
|
../../waku/v2/protocol/waku_filter,
|
||||||
../../waku/v2/protocol/waku_store,
|
../../waku/v2/protocol/waku_store/protocol_metrics,
|
||||||
../../waku/v2/protocol/waku_lightpush,
|
../../waku/v2/protocol/waku_lightpush,
|
||||||
../../waku/v2/protocol/waku_swap/waku_swap,
|
../../waku/v2/protocol/waku_swap/waku_swap,
|
||||||
../../waku/v2/protocol/waku_peer_exchange,
|
../../waku/v2/protocol/waku_peer_exchange,
|
||||||
|
|
|
@ -13,6 +13,7 @@ import
|
||||||
./v2/test_message_store_sqlite,
|
./v2/test_message_store_sqlite,
|
||||||
./v2/test_waku_store_rpc_codec,
|
./v2/test_waku_store_rpc_codec,
|
||||||
./v2/test_waku_store,
|
./v2/test_waku_store,
|
||||||
|
./v2/test_waku_store_client,
|
||||||
# TODO: Re-enable store resume test cases (#1282)
|
# TODO: Re-enable store resume test cases (#1282)
|
||||||
# ./v2/test_waku_store_resume,
|
# ./v2/test_waku_store_resume,
|
||||||
./v2/test_wakunode_store,
|
./v2/test_wakunode_store,
|
||||||
|
|
|
@ -11,6 +11,7 @@ import
|
||||||
import
|
import
|
||||||
../../waku/v2/protocol/waku_message,
|
../../waku/v2/protocol/waku_message,
|
||||||
../../waku/v2/protocol/waku_store,
|
../../waku/v2/protocol/waku_store,
|
||||||
|
../../waku/v2/protocol/waku_store/client,
|
||||||
../../waku/v2/node/storage/sqlite,
|
../../waku/v2/node/storage/sqlite,
|
||||||
../../waku/v2/node/storage/message/waku_store_queue,
|
../../waku/v2/node/storage/message/waku_store_queue,
|
||||||
../../waku/v2/node/storage/message/sqlite_store,
|
../../waku/v2/node/storage/message/sqlite_store,
|
||||||
|
@ -53,17 +54,24 @@ proc newTestMessageStore(): MessageStore =
|
||||||
let database = newTestDatabase()
|
let database = newTestDatabase()
|
||||||
SqliteStore.init(database).tryGet()
|
SqliteStore.init(database).tryGet()
|
||||||
|
|
||||||
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore =
|
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} =
|
||||||
let
|
let
|
||||||
peerManager = PeerManager.new(switch)
|
peerManager = PeerManager.new(switch)
|
||||||
rng = crypto.newRng()
|
rng = crypto.newRng()
|
||||||
proto = WakuStore.init(peerManager, rng, store)
|
proto = WakuStore.init(peerManager, rng, store)
|
||||||
|
|
||||||
waitFor proto.start()
|
await proto.start()
|
||||||
switch.mount(proto)
|
switch.mount(proto)
|
||||||
|
|
||||||
return proto
|
return proto
|
||||||
|
|
||||||
|
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
|
||||||
|
let
|
||||||
|
peerManager = PeerManager.new(switch)
|
||||||
|
rng = crypto.newRng()
|
||||||
|
WakuStoreClient.new(peerManager, rng, store)
|
||||||
|
|
||||||
|
|
||||||
procSuite "Waku Store - history query":
|
procSuite "Waku Store - history query":
|
||||||
## Fixtures
|
## Fixtures
|
||||||
let storeA = block:
|
let storeA = block:
|
||||||
|
@ -96,10 +104,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
|
@ -108,12 +116,12 @@ procSuite "Waku Store - history query":
|
||||||
msg1 = fakeWakuMessage(contentTopic=topic)
|
msg1 = fakeWakuMessage(contentTopic=topic)
|
||||||
msg2 = fakeWakuMessage()
|
msg2 = fakeWakuMessage()
|
||||||
|
|
||||||
serverProto.handleMessage("foo", msg1)
|
server.handleMessage("foo", msg1)
|
||||||
serverProto.handleMessage("foo", msg2)
|
server.handleMessage("foo", msg2)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
|
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
|
||||||
let resQuery = await clientProto.query(rpc)
|
let resQuery = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -122,7 +130,7 @@ procSuite "Waku Store - history query":
|
||||||
let response = resQuery.tryGet()
|
let response = resQuery.tryGet()
|
||||||
check:
|
check:
|
||||||
response.messages.len == 1
|
response.messages.len == 1
|
||||||
response.messages[0] == msg1
|
response.messages == @[msg1]
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
||||||
|
@ -136,10 +144,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
@ -152,16 +160,16 @@ procSuite "Waku Store - history query":
|
||||||
msg2 = fakeWakuMessage(contentTopic=topic2)
|
msg2 = fakeWakuMessage(contentTopic=topic2)
|
||||||
msg3 = fakeWakuMessage(contentTopic=topic3)
|
msg3 = fakeWakuMessage(contentTopic=topic3)
|
||||||
|
|
||||||
serverProto.handleMessage("foo", msg1)
|
server.handleMessage("foo", msg1)
|
||||||
serverProto.handleMessage("foo", msg2)
|
server.handleMessage("foo", msg2)
|
||||||
serverProto.handleMessage("foo", msg3)
|
server.handleMessage("foo", msg3)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(contentFilters: @[
|
let rpc = HistoryQuery(contentFilters: @[
|
||||||
HistoryContentFilter(contentTopic: topic1),
|
HistoryContentFilter(contentTopic: topic1),
|
||||||
HistoryContentFilter(contentTopic: topic3)
|
HistoryContentFilter(contentTopic: topic3)
|
||||||
])
|
])
|
||||||
let resQuery = await clientProto.query(rpc)
|
let resQuery = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -185,10 +193,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
@ -205,9 +213,9 @@ procSuite "Waku Store - history query":
|
||||||
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
|
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
|
||||||
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
|
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
|
||||||
|
|
||||||
serverProto.handleMessage(pubsubtopic1, msg1)
|
server.handleMessage(pubsubtopic1, msg1)
|
||||||
serverProto.handleMessage(pubsubtopic2, msg2)
|
server.handleMessage(pubsubtopic2, msg2)
|
||||||
serverProto.handleMessage(pubsubtopic2, msg3)
|
server.handleMessage(pubsubtopic2, msg3)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
|
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
|
||||||
|
@ -216,7 +224,7 @@ procSuite "Waku Store - history query":
|
||||||
HistoryContentFilter(contentTopic: contentTopic3)],
|
HistoryContentFilter(contentTopic: contentTopic3)],
|
||||||
pubsubTopic: pubsubTopic1
|
pubsubTopic: pubsubTopic1
|
||||||
)
|
)
|
||||||
let resQuery = await clientProto.query(rpc)
|
let resQuery = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -239,10 +247,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
@ -254,13 +262,13 @@ procSuite "Waku Store - history query":
|
||||||
msg2 = fakeWakuMessage()
|
msg2 = fakeWakuMessage()
|
||||||
msg3 = fakeWakuMessage()
|
msg3 = fakeWakuMessage()
|
||||||
|
|
||||||
serverProto.handleMessage(pubsubtopic2, msg1)
|
server.handleMessage(pubsubtopic2, msg1)
|
||||||
serverProto.handleMessage(pubsubtopic2, msg2)
|
server.handleMessage(pubsubtopic2, msg2)
|
||||||
serverProto.handleMessage(pubsubtopic2, msg3)
|
server.handleMessage(pubsubtopic2, msg3)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
|
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
|
||||||
let res = await clientProto.query(rpc)
|
let res = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -282,10 +290,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let pubsubTopic = "queried-topic"
|
let pubsubTopic = "queried-topic"
|
||||||
|
@ -295,13 +303,13 @@ procSuite "Waku Store - history query":
|
||||||
msg2 = fakeWakuMessage(payload="TEST-2")
|
msg2 = fakeWakuMessage(payload="TEST-2")
|
||||||
msg3 = fakeWakuMessage(payload="TEST-3")
|
msg3 = fakeWakuMessage(payload="TEST-3")
|
||||||
|
|
||||||
serverProto.handleMessage(pubsubTopic, msg1)
|
server.handleMessage(pubsubTopic, msg1)
|
||||||
serverProto.handleMessage(pubsubTopic, msg2)
|
server.handleMessage(pubsubTopic, msg2)
|
||||||
serverProto.handleMessage(pubsubTopic, msg3)
|
server.handleMessage(pubsubTopic, msg3)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(pubsubTopic: pubsubTopic)
|
let rpc = HistoryQuery(pubsubTopic: pubsubTopic)
|
||||||
let res = await clientProto.query(rpc)
|
let res = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -326,10 +334,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
@ -347,14 +355,14 @@ procSuite "Waku Store - history query":
|
||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
require server.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
var rpc = HistoryQuery(
|
var rpc = HistoryQuery(
|
||||||
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
||||||
)
|
)
|
||||||
var res = await clientProto.query(rpc)
|
var res = await client.query(rpc)
|
||||||
require res.isOk()
|
require res.isOk()
|
||||||
|
|
||||||
var
|
var
|
||||||
|
@ -372,7 +380,7 @@ procSuite "Waku Store - history query":
|
||||||
rpc.pagingInfo = response.pagingInfo
|
rpc.pagingInfo = response.pagingInfo
|
||||||
|
|
||||||
# Continue querying
|
# Continue querying
|
||||||
res = await clientProto.query(rpc)
|
res = await client.query(rpc)
|
||||||
require res.isOk()
|
require res.isOk()
|
||||||
response = res.tryGet()
|
response = res.tryGet()
|
||||||
totalMessages += response.messages.len()
|
totalMessages += response.messages.len()
|
||||||
|
@ -395,10 +403,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
@ -416,14 +424,14 @@ procSuite "Waku Store - history query":
|
||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
require server.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
var rpc = HistoryQuery(
|
var rpc = HistoryQuery(
|
||||||
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
||||||
)
|
)
|
||||||
var res = await clientProto.query(rpc)
|
var res = await client.query(rpc)
|
||||||
require res.isOk()
|
require res.isOk()
|
||||||
|
|
||||||
var
|
var
|
||||||
|
@ -441,7 +449,7 @@ procSuite "Waku Store - history query":
|
||||||
rpc.pagingInfo = response.pagingInfo
|
rpc.pagingInfo = response.pagingInfo
|
||||||
|
|
||||||
# Continue querying
|
# Continue querying
|
||||||
res = await clientProto.query(rpc)
|
res = await client.query(rpc)
|
||||||
require res.isOk()
|
require res.isOk()
|
||||||
response = res.tryGet()
|
response = res.tryGet()
|
||||||
totalMessages += response.messages.len()
|
totalMessages += response.messages.len()
|
||||||
|
@ -464,10 +472,10 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
serverProto = newTestWakuStore(serverSwitch)
|
server = await newTestWakuStore(serverSwitch)
|
||||||
clientProto = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let msgList = @[
|
let msgList = @[
|
||||||
|
@ -484,11 +492,11 @@ procSuite "Waku Store - history query":
|
||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
require server.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
||||||
let res = await clientProto.query(rpc)
|
let res = await client.query(rpc)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -514,7 +522,7 @@ procSuite "Waku Store - history query":
|
||||||
|
|
||||||
let
|
let
|
||||||
server = newTestWakuStore(serverSwitch, store=storeA)
|
server = newTestWakuStore(serverSwitch, store=storeA)
|
||||||
client = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
@ -550,8 +558,8 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
server = newTestWakuStore(serverSwitch, store=storeA)
|
server = await newTestWakuStore(serverSwitch, store=storeA)
|
||||||
client = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
@ -585,8 +593,8 @@ procSuite "Waku Store - history query":
|
||||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
let
|
let
|
||||||
server = newTestWakuStore(serverSwitch, store=storeA)
|
server = await newTestWakuStore(serverSwitch, store=storeA)
|
||||||
client = newTestWakuStore(clientSwitch)
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
@ -617,7 +625,7 @@ suite "Waku Store - message handling":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(5)
|
let store = StoreQueueRef.new(5)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = await newTestWakuStore(switch, store)
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let validSenderTime = now()
|
let validSenderTime = now()
|
||||||
|
@ -637,7 +645,7 @@ suite "Waku Store - message handling":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(10)
|
let store = StoreQueueRef.new(10)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = await newTestWakuStore(switch, store)
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let msgList = @[
|
let msgList = @[
|
||||||
|
@ -663,7 +671,7 @@ suite "Waku Store - message handling":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(5)
|
let store = StoreQueueRef.new(5)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = await newTestWakuStore(switch, store)
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let invalidSenderTime = 0
|
let invalidSenderTime = 0
|
||||||
|
@ -683,7 +691,7 @@ suite "Waku Store - message handling":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(5)
|
let store = StoreQueueRef.new(5)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = await newTestWakuStore(switch, store)
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
@ -706,7 +714,7 @@ suite "Waku Store - message handling":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(5)
|
let store = StoreQueueRef.new(5)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = await newTestWakuStore(switch, store)
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
|
|
@ -0,0 +1,273 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options, tables, sets, times],
|
||||||
|
stew/byteutils,
|
||||||
|
testutils/unittests,
|
||||||
|
chronos,
|
||||||
|
chronicles,
|
||||||
|
libp2p/switch,
|
||||||
|
libp2p/crypto/crypto
|
||||||
|
import
|
||||||
|
../../waku/v2/protocol/waku_message,
|
||||||
|
../../waku/v2/protocol/waku_store,
|
||||||
|
../../waku/v2/protocol/waku_store/client,
|
||||||
|
../../waku/v2/protocol/waku_store/protocol_metrics,
|
||||||
|
../../waku/v2/node/storage/sqlite,
|
||||||
|
../../waku/v2/node/storage/message/sqlite_store,
|
||||||
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
|
../../waku/v2/utils/time,
|
||||||
|
../test_helpers
|
||||||
|
|
||||||
|
|
||||||
|
const
|
||||||
|
DefaultPubsubTopic = "/waku/2/default-waku/proto"
|
||||||
|
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
|
|
||||||
|
proc now(): Timestamp =
|
||||||
|
getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
|
proc newTestDatabase(): SqliteDatabase =
|
||||||
|
SqliteDatabase.init("", inMemory = true).tryGet()
|
||||||
|
|
||||||
|
proc fakeWakuMessage(
|
||||||
|
payload = toBytes("TEST-PAYLOAD"),
|
||||||
|
contentTopic = DefaultContentTopic,
|
||||||
|
ts = now(),
|
||||||
|
ephemeral = false,
|
||||||
|
): WakuMessage =
|
||||||
|
WakuMessage(
|
||||||
|
payload: payload,
|
||||||
|
contentTopic: contentTopic,
|
||||||
|
version: 1,
|
||||||
|
timestamp: ts,
|
||||||
|
ephemeral: ephemeral,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
|
||||||
|
let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
|
||||||
|
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
|
||||||
|
return newStandardSwitch(some(peerKey), addrs=peerAddr)
|
||||||
|
|
||||||
|
proc newTestStore(): MessageStore =
|
||||||
|
let database = newTestDatabase()
|
||||||
|
SqliteStore.init(database).tryGet()
|
||||||
|
|
||||||
|
proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore =
|
||||||
|
let
|
||||||
|
peerManager = PeerManager.new(switch)
|
||||||
|
rng = crypto.newRng()
|
||||||
|
proto = WakuStore.init(peerManager, rng, store)
|
||||||
|
|
||||||
|
waitFor proto.start()
|
||||||
|
switch.mount(proto)
|
||||||
|
|
||||||
|
return proto
|
||||||
|
|
||||||
|
proc newTestWakuStoreClient(switch: Switch, store: MessageStore = nil): WakuStoreClient =
|
||||||
|
let
|
||||||
|
peerManager = PeerManager.new(switch)
|
||||||
|
rng = crypto.newRng()
|
||||||
|
WakuStoreClient.new(peerManager, rng, store)
|
||||||
|
|
||||||
|
|
||||||
|
procSuite "Waku Store Client":
|
||||||
|
|
||||||
|
## Fixtures
|
||||||
|
let testStore = block:
|
||||||
|
let store = newTestStore()
|
||||||
|
let msgList = @[
|
||||||
|
fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("0")),
|
||||||
|
fakeWakuMessage(payload= @[byte 1], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 2], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 3], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 4], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 5], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 6], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 7], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 8], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("9")),
|
||||||
|
fakeWakuMessage(payload= @[byte 10], contentTopic=DefaultContentTopic),
|
||||||
|
fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("11")),
|
||||||
|
fakeWakuMessage(payload= @[byte 12], contentTopic=DefaultContentTopic),
|
||||||
|
]
|
||||||
|
|
||||||
|
for msg in msgList:
|
||||||
|
assert store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
|
store
|
||||||
|
|
||||||
|
asyncTest "single query to peer":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
let
|
||||||
|
server = newTestWakuStore(serverSwitch, store=testStore)
|
||||||
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
let rpc = HistoryQuery(
|
||||||
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
|
pagingInfo: PagingInfo(pageSize: 8)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await client.query(rpc, peer)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk()
|
||||||
|
|
||||||
|
let response = res.tryGet()
|
||||||
|
check:
|
||||||
|
## No pagination specified. Response will be auto-paginated with
|
||||||
|
## up to MaxPageSize messages per page.
|
||||||
|
response.messages.len() == 8
|
||||||
|
response.pagingInfo != PagingInfo()
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "multiple query to peer with pagination":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
let
|
||||||
|
server = newTestWakuStore(serverSwitch, store=testStore)
|
||||||
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
let rpc = HistoryQuery(
|
||||||
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
|
pagingInfo: PagingInfo(pageSize: 5)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await client.queryWithPaging(rpc, peer)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk()
|
||||||
|
|
||||||
|
let response = res.tryGet()
|
||||||
|
check:
|
||||||
|
response.len == 10
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "multiple query to multiple peers with pagination":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitchA = newTestSwitch()
|
||||||
|
serverSwitchB = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
let
|
||||||
|
serverA = newTestWakuStore(serverSwitchA, store=testStore)
|
||||||
|
serverB = newTestWakuStore(serverSwitchB, store=testStore)
|
||||||
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let peers = @[
|
||||||
|
serverSwitchA.peerInfo.toRemotePeerInfo(),
|
||||||
|
serverSwitchB.peerInfo.toRemotePeerInfo()
|
||||||
|
]
|
||||||
|
let rpc = HistoryQuery(
|
||||||
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
|
pagingInfo: PagingInfo(pageSize: 5)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await client.queryLoop(rpc, peers)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk()
|
||||||
|
|
||||||
|
let response = res.tryGet()
|
||||||
|
check:
|
||||||
|
response.len == 10
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())
|
||||||
|
|
||||||
|
asyncTest "single query with no pre-configured store peer should fail":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
let
|
||||||
|
server = newTestWakuStore(serverSwitch, store=testStore)
|
||||||
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let rpc = HistoryQuery(
|
||||||
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
|
pagingInfo: PagingInfo(pageSize: 8)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await client.query(rpc)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isErr()
|
||||||
|
res.error == peerNotFoundFailure
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "single query to pre-configured store peer":
|
||||||
|
## Setup
|
||||||
|
let
|
||||||
|
serverSwitch = newTestSwitch()
|
||||||
|
clientSwitch = newTestSwitch()
|
||||||
|
|
||||||
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||||
|
|
||||||
|
let
|
||||||
|
server = newTestWakuStore(serverSwitch, store=testStore)
|
||||||
|
client = newTestWakuStoreClient(clientSwitch)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||||
|
let rpc = HistoryQuery(
|
||||||
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
||||||
|
pagingInfo: PagingInfo(pageSize: 8)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
client.setPeer(peer)
|
||||||
|
|
||||||
|
let res = await client.query(rpc)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res.isOk()
|
||||||
|
|
||||||
|
let response = res.tryGet()
|
||||||
|
check:
|
||||||
|
## No pagination specified. Response will be auto-paginated with
|
||||||
|
## up to MaxPageSize messages per page.
|
||||||
|
response.messages.len() == 8
|
||||||
|
response.pagingInfo != PagingInfo()
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
@ -0,0 +1,238 @@
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[options, sequtils, times],
|
||||||
|
stew/results,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
metrics,
|
||||||
|
bearssl/rand
|
||||||
|
import
|
||||||
|
../../node/peer_manager/peer_manager,
|
||||||
|
../../utils/requests,
|
||||||
|
../../utils/time,
|
||||||
|
../waku_message,
|
||||||
|
../waku_swap/waku_swap,
|
||||||
|
./protocol,
|
||||||
|
./protocol_metrics,
|
||||||
|
./pagination,
|
||||||
|
./rpc,
|
||||||
|
./rpc_codec,
|
||||||
|
./message_store
|
||||||
|
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "wakustore.client"
|
||||||
|
|
||||||
|
|
||||||
|
type WakuStoreClient* = ref object
|
||||||
|
peerManager: PeerManager
|
||||||
|
rng: ref rand.HmacDrbgContext
|
||||||
|
store: MessageStore
|
||||||
|
wakuSwap: WakuSwap
|
||||||
|
|
||||||
|
proc new*(T: type WakuStoreClient,
|
||||||
|
peerManager: PeerManager,
|
||||||
|
rng: ref rand.HmacDrbgContext,
|
||||||
|
store: MessageStore,
|
||||||
|
wakuSwap: WakuSwap = nil): T =
|
||||||
|
WakuStoreClient(peerManager: peerManager, rng: rng, store: store, wakuSwap: wakuSwap)
|
||||||
|
|
||||||
|
|
||||||
|
proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||||
|
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
|
||||||
|
if connOpt.isNone():
|
||||||
|
waku_store_errors.inc(labelValues = [dialFailure])
|
||||||
|
return err(dialFailure)
|
||||||
|
let connection = connOpt.get()
|
||||||
|
|
||||||
|
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
|
||||||
|
await connection.writeLP(rpc.encode().buffer)
|
||||||
|
|
||||||
|
var message = await connOpt.get().readLp(MaxRpcSize.int)
|
||||||
|
let response = HistoryRPC.init(message)
|
||||||
|
|
||||||
|
if response.isErr():
|
||||||
|
error "failed to decode response"
|
||||||
|
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
|
return err(decodeRpcFailure)
|
||||||
|
|
||||||
|
return ok(response.value.response)
|
||||||
|
|
||||||
|
proc queryWithPaging*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||||
|
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||||||
|
## it retrieves the historical messages in pages.
|
||||||
|
## Returns all the fetched messages, if error occurs, returns an error string
|
||||||
|
|
||||||
|
# Make a copy of the query
|
||||||
|
var req = query
|
||||||
|
|
||||||
|
var messageList: seq[WakuMessage] = @[]
|
||||||
|
|
||||||
|
while true:
|
||||||
|
let res = await w.query(req, peer)
|
||||||
|
if res.isErr():
|
||||||
|
return err(res.error)
|
||||||
|
|
||||||
|
let response = res.get()
|
||||||
|
|
||||||
|
messageList.add(response.messages)
|
||||||
|
|
||||||
|
# Check whether it is the last page
|
||||||
|
if response.pagingInfo == PagingInfo():
|
||||||
|
break
|
||||||
|
|
||||||
|
# Update paging cursor
|
||||||
|
req.pagingInfo.cursor = response.pagingInfo.cursor
|
||||||
|
|
||||||
|
return ok(messageList)
|
||||||
|
|
||||||
|
proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||||||
|
## Loops through the peers candidate list in order and sends the query to each
|
||||||
|
##
|
||||||
|
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
|
||||||
|
## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq.
|
||||||
|
let queryFuturesList = peers.mapIt(w.queryWithPaging(req, it))
|
||||||
|
|
||||||
|
await allFutures(queryFuturesList)
|
||||||
|
|
||||||
|
let messagesList = queryFuturesList
|
||||||
|
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
|
||||||
|
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
|
||||||
|
if not fut.completed() or fut.read().isErr():
|
||||||
|
return @[]
|
||||||
|
|
||||||
|
fut.read().value
|
||||||
|
)
|
||||||
|
.concat()
|
||||||
|
.deduplicate()
|
||||||
|
|
||||||
|
return ok(messagesList)
|
||||||
|
|
||||||
|
|
||||||
|
### Set store peer and query for messages
|
||||||
|
|
||||||
|
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
|
||||||
|
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||||
|
|
||||||
|
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||||
|
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||||
|
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||||
|
# This will require us to check for various factors such as:
|
||||||
|
# - which topics they track
|
||||||
|
# - latency?
|
||||||
|
# - default store peer?
|
||||||
|
|
||||||
|
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
error "no suitable remote peers"
|
||||||
|
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err(peerNotFoundFailure)
|
||||||
|
|
||||||
|
return await w.query(req, peerOpt.get())
|
||||||
|
|
||||||
|
|
||||||
|
## Resume store
|
||||||
|
|
||||||
|
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
|
||||||
|
|
||||||
|
proc resume*(w: WakuStoreClient,
|
||||||
|
peerList = none(seq[RemotePeerInfo]),
|
||||||
|
pageSize = DefaultPageSize,
|
||||||
|
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||||
|
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||||
|
## messages are stored in the store node's messages field and in the message db
|
||||||
|
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||||
|
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||||
|
## peerList indicates the list of peers to query from.
|
||||||
|
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
|
||||||
|
## Such candidates should be found through a discovery method (to be developed).
|
||||||
|
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||||
|
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||||
|
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||||
|
|
||||||
|
# If store has not been provided, don't even try
|
||||||
|
if w.store.isNil():
|
||||||
|
return err("store not provided (nil)")
|
||||||
|
|
||||||
|
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
|
||||||
|
# of writing, the sqlite store implementation returns the last message's receiver
|
||||||
|
# timestamp.
|
||||||
|
# lastSeenTime = lastSeenItem.get().msg.timestamp
|
||||||
|
let
|
||||||
|
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
|
||||||
|
now = getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
|
debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now
|
||||||
|
|
||||||
|
let
|
||||||
|
queryEndTime = now + StoreResumeTimeWindowOffset
|
||||||
|
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
|
||||||
|
|
||||||
|
let req = HistoryQuery(
|
||||||
|
pubsubTopic: pubsubTopic,
|
||||||
|
startTime: queryStartTime,
|
||||||
|
endTime: queryEndTime,
|
||||||
|
pagingInfo: PagingInfo(
|
||||||
|
direction:PagingDirection.FORWARD,
|
||||||
|
pageSize: uint64(pageSize)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
var res: WakuStoreResult[seq[WakuMessage]]
|
||||||
|
if peerList.isSome():
|
||||||
|
debug "trying the candidate list to fetch the history"
|
||||||
|
res = await w.queryLoop(req, peerList.get())
|
||||||
|
|
||||||
|
else:
|
||||||
|
debug "no candidate list is provided, selecting a random peer"
|
||||||
|
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||||
|
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
warn "no suitable remote peers"
|
||||||
|
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err("no suitable remote peers")
|
||||||
|
|
||||||
|
debug "a peer is selected from peer manager"
|
||||||
|
res = await w.queryWithPaging(req, peerOpt.get())
|
||||||
|
|
||||||
|
if res.isErr():
|
||||||
|
debug "failed to resume the history"
|
||||||
|
return err("failed to resume the history")
|
||||||
|
|
||||||
|
|
||||||
|
# Save the retrieved messages in the store
|
||||||
|
var added: uint = 0
|
||||||
|
for msg in res.get():
|
||||||
|
let putStoreRes = w.store.put(pubsubTopic, msg)
|
||||||
|
if putStoreRes.isErr():
|
||||||
|
continue
|
||||||
|
|
||||||
|
added.inc()
|
||||||
|
|
||||||
|
return ok(added)
|
||||||
|
|
||||||
|
|
||||||
|
## EXPERIMENTAL
|
||||||
|
|
||||||
|
# NOTE: Experimental, maybe incorporate as part of query call
|
||||||
|
proc queryWithAccounting*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
||||||
|
if w.wakuSwap.isNil():
|
||||||
|
return err("waku swap not fount (nil)")
|
||||||
|
|
||||||
|
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||||||
|
if peerOpt.isNone():
|
||||||
|
error "no suitable remote peers"
|
||||||
|
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||||||
|
return err(peerNotFoundFailure)
|
||||||
|
|
||||||
|
let queryRes = await w.query(req, peerOpt.get())
|
||||||
|
if queryRes.isErr():
|
||||||
|
return err(queryRes.error)
|
||||||
|
|
||||||
|
let response = queryRes.get()
|
||||||
|
|
||||||
|
# Perform accounting operation. Assumes wakuSwap protocol is mounted
|
||||||
|
w.wakuSwap.debit(peerOpt.get().peerId, response.messages.len)
|
||||||
|
|
||||||
|
return ok(response)
|
|
@ -25,16 +25,10 @@ import
|
||||||
./rpc,
|
./rpc,
|
||||||
./rpc_codec,
|
./rpc_codec,
|
||||||
./pagination,
|
./pagination,
|
||||||
./message_store
|
./message_store,
|
||||||
|
./protocol_metrics
|
||||||
|
|
||||||
|
|
||||||
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
|
|
||||||
declarePublicGauge waku_store_peers, "number of store peers"
|
|
||||||
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
|
||||||
declarePublicGauge waku_store_queries, "number of store queries received"
|
|
||||||
declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration"
|
|
||||||
declarePublicHistogram waku_store_query_duration_seconds, "history query duration"
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakustore"
|
topics = "wakustore"
|
||||||
|
|
||||||
|
@ -47,16 +41,6 @@ const
|
||||||
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
|
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
|
||||||
|
|
||||||
|
|
||||||
# Error types (metric label values)
|
|
||||||
const
|
|
||||||
invalidMessage = "invalid_message"
|
|
||||||
insertFailure = "insert_failure"
|
|
||||||
retPolicyFailure = "retpolicy_failure"
|
|
||||||
dialFailure = "dial_failure"
|
|
||||||
decodeRpcFailure = "decode_rpc_failure"
|
|
||||||
peerNotFoundFailure = "peer_not_found_failure"
|
|
||||||
|
|
||||||
|
|
||||||
type
|
type
|
||||||
WakuStoreResult*[T] = Result[T, string]
|
WakuStoreResult*[T] = Result[T, string]
|
||||||
|
|
||||||
|
@ -291,11 +275,13 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
|
||||||
## CLIENT
|
## CLIENT
|
||||||
|
|
||||||
# TODO: This should probably be an add function and append the peer to an array
|
# TODO: This should probably be an add function and append the peer to an array
|
||||||
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
|
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) {.
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
ws.peerManager.addPeer(peer, WakuStoreCodec)
|
||||||
waku_store_peers.inc()
|
waku_store_peers.inc()
|
||||||
|
|
||||||
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
|
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
|
||||||
if connOpt.isNone():
|
if connOpt.isNone():
|
||||||
waku_store_errors.inc(labelValues = [dialFailure])
|
waku_store_errors.inc(labelValues = [dialFailure])
|
||||||
|
@ -316,7 +302,8 @@ proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuSt
|
||||||
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
||||||
return ok(response.value.response)
|
return ok(response.value.response)
|
||||||
|
|
||||||
proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
|
||||||
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
|
||||||
# This will require us to check for various factors such as:
|
# This will require us to check for various factors such as:
|
||||||
|
@ -337,7 +324,8 @@ proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResp
|
||||||
|
|
||||||
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
|
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
|
||||||
|
|
||||||
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||||||
## it retrieves the historical messages in pages.
|
## it retrieves the historical messages in pages.
|
||||||
## Returns all the fetched messages, if error occurs, returns an error string
|
## Returns all the fetched messages, if error occurs, returns an error string
|
||||||
|
@ -366,7 +354,8 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
|
||||||
|
|
||||||
return ok(messageList)
|
return ok(messageList)
|
||||||
|
|
||||||
proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
## Loops through the peers candidate list in order and sends the query to each
|
## Loops through the peers candidate list in order and sends the query to each
|
||||||
##
|
##
|
||||||
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
|
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
|
||||||
|
@ -394,7 +383,8 @@ proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInf
|
||||||
proc resume*(w: WakuStore,
|
proc resume*(w: WakuStore,
|
||||||
peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]),
|
peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]),
|
||||||
pageSize: uint64 = DefaultPageSize,
|
pageSize: uint64 = DefaultPageSize,
|
||||||
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||||
## messages are stored in the store node's messages field and in the message db
|
## messages are stored in the store node's messages field and in the message db
|
||||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||||
|
@ -472,7 +462,8 @@ proc resume*(w: WakuStore,
|
||||||
## EXPERIMENTAL
|
## EXPERIMENTAL
|
||||||
|
|
||||||
# NOTE: Experimental, maybe incorporate as part of query call
|
# NOTE: Experimental, maybe incorporate as part of query call
|
||||||
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
|
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
|
||||||
|
deprecated: "use waku_store/client methods instead".} =
|
||||||
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
|
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
|
||||||
if peerOpt.isNone():
|
if peerOpt.isNone():
|
||||||
error "no suitable remote peers"
|
error "no suitable remote peers"
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import metrics
|
||||||
|
|
||||||
|
|
||||||
|
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
|
||||||
|
declarePublicGauge waku_store_peers, "number of store peers"
|
||||||
|
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
||||||
|
declarePublicGauge waku_store_queries, "number of store queries received"
|
||||||
|
declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration"
|
||||||
|
declarePublicHistogram waku_store_query_duration_seconds, "history query duration"
|
||||||
|
|
||||||
|
|
||||||
|
# Error types (metric label values)
|
||||||
|
const
|
||||||
|
invalidMessage* = "invalid_message"
|
||||||
|
insertFailure* = "insert_failure"
|
||||||
|
retPolicyFailure* = "retpolicy_failure"
|
||||||
|
dialFailure* = "dial_failure"
|
||||||
|
decodeRpcFailure* = "decode_rpc_failure"
|
||||||
|
peerNotFoundFailure* = "peer_not_found_failure"
|
Loading…
Reference in New Issue