Mirror of https://github.com/waku-org/nwaku.git (synced 2025-02-04 19:14:47 +00:00)
fix(store): fix waku store resume tests

commit 2c975597e1 (parent fbd9490df0)
@@ -13,6 +13,7 @@ import
   ./v2/test_message_store_sqlite,
   ./v2/test_waku_store_rpc_codec,
   ./v2/test_waku_store,
+  ./v2/test_waku_store_resume,
   ./v2/test_wakunode_store,
   # Waku Filter
   ./v2/test_waku_filter,
@@ -49,11 +49,11 @@ proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
   let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
   return newStandardSwitch(some(peerKey), addrs=peerAddr)
 
-proc newTestStore(): MessageStore =
+proc newTestMessageStore(): MessageStore =
   let database = newTestDatabase()
   SqliteStore.init(database).tryGet()
 
-proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore =
+proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore =
   let
     peerManager = PeerManager.new(switch)
     rng = crypto.newRng()
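An aside on the helper defaults above: Nim evaluates a default-parameter expression at each call, so store=newTestMessageStore() hands every test its own fresh in-memory store unless a shared fixture (such as storeA below) is passed in explicitly. A minimal illustration, with a counter standing in for the store factory:

var created = 0

proc newFakeStore(): int =
  created.inc()
  created

proc newFakeWakuStore(store = newFakeStore()): int =
  store

discard newFakeWakuStore()
discard newFakeWakuStore()
assert created == 2   # the default expression ran once per call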
@@ -64,8 +64,29 @@ proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore =
 
   return proto
 
 
-suite "Waku Store - history query":
+procSuite "Waku Store - history query":
+  ## Fixtures
+  let storeA = block:
+    let store = newTestMessageStore()
+
+    let msgList = @[
+      fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(0)),
+      fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(1)),
+      fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(2)),
+      fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(3)),
+      fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(4)),
+      fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(5)),
+      fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(6)),
+      fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(7)),
+      fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(8)),
+      fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(9))
+    ]
+
+    for msg in msgList:
+      require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
+
+    store
+
   asyncTest "handle query":
     ## Setup
     let
@@ -449,6 +470,113 @@ suite "Waku Store - history query":
     ## Cleanup
     await allFutures(clientSwitch.stop(), serverSwitch.stop())
 
+  asyncTest "handle temporal history query with a valid time window":
+    ## Setup
+    let
+      serverSwitch = newTestSwitch()
+      clientSwitch = newTestSwitch()
+
+    await allFutures(serverSwitch.start(), clientSwitch.start())
+
+    let
+      server = newTestWakuStore(serverSwitch, store=storeA)
+      client = newTestWakuStore(clientSwitch)
+
+    client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
+
+    ## Given
+    let rpc = HistoryQuery(
+      contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
+      startTime: Timestamp(2),
+      endTime: Timestamp(5)
+    )
+
+    ## When
+    let res = await client.query(rpc)
+
+    ## Then
+    check res.isOk()
+
+    let response = res.tryGet()
+    check:
+      response.messages.len() == 2
+      response.messages.anyIt(it.timestamp == Timestamp(3))
+      response.messages.anyIt(it.timestamp == Timestamp(5))
+
+    ## Cleanup
+    await allFutures(clientSwitch.stop(), serverSwitch.stop())
+
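An illustrative aside (not part of the patch): the storeA fixture holds ContentTopic("1") messages at timestamps 1, 3, 5, 7, 9, so a query over the window [startTime, endTime] = [2, 5], taken as inclusive here, should match exactly the two timestamps checked above:

import std/sequtils

let topicOneTimestamps = @[1, 3, 5, 7, 9]   # ContentTopic("1") entries in storeA
let inWindow = topicOneTimestamps.filterIt(it >= 2 and it <= 5)
assert inWindow == @[3, 5]                  # the two checked timestamps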
+  asyncTest "handle temporal history query with a zero-size time window":
+    # a zero-size window results in an empty list of history messages
+    ## Setup
+    let
+      serverSwitch = newTestSwitch()
+      clientSwitch = newTestSwitch()
+
+    await allFutures(serverSwitch.start(), clientSwitch.start())
+
+    let
+      server = newTestWakuStore(serverSwitch, store=storeA)
+      client = newTestWakuStore(clientSwitch)
+
+    client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
+
+    ## Given
+    let rpc = HistoryQuery(
+      contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
+      startTime: Timestamp(2),
+      endTime: Timestamp(2)
+    )
+
+    ## When
+    let res = await client.query(rpc)
+
+    ## Then
+    check res.isOk()
+
+    let response = res.tryGet()
+    check:
+      response.messages.len == 0
+
+    ## Cleanup
+    await allFutures(clientSwitch.stop(), serverSwitch.stop())
+
+  asyncTest "handle temporal history query with an invalid time window":
+    # A history query with an invalid time range results in an empty list of history messages
+    ## Setup
+    let
+      serverSwitch = newTestSwitch()
+      clientSwitch = newTestSwitch()
+
+    await allFutures(serverSwitch.start(), clientSwitch.start())
+
+    let
+      server = newTestWakuStore(serverSwitch, store=storeA)
+      client = newTestWakuStore(clientSwitch)
+
+    client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
+
+    ## Given
+    let rpc = HistoryQuery(
+      contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
+      startTime: Timestamp(5),
+      endTime: Timestamp(2)
+    )
+
+    ## When
+    let res = await client.query(rpc)
+
+    ## Then
+    check res.isOk()
+
+    let response = res.tryGet()
+    check:
+      response.messages.len == 0
+
+    ## Cleanup
+    await allFutures(clientSwitch.stop(), serverSwitch.stop())
+
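By the same reasoning as above (an illustrative aside): the zero-size window [2, 2] matches no ContentTopic("1") timestamp, and the inverted window [5, 2] can never be satisfied, so both queries are expected to return an empty message list:

import std/sequtils

let topicOneTimestamps = @[1, 3, 5, 7, 9]
assert topicOneTimestamps.filterIt(it >= 2 and it <= 2).len == 0  # zero-size window
assert topicOneTimestamps.filterIt(it >= 5 and it <= 2).len == 0  # inverted window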
 
 suite "Waku Store - message handling":
 
   asyncTest "it should store a valid and non-ephemeral message":
@@ -562,209 +690,3 @@ suite "Waku Store - message handling":
 
     ## Cleanup
     await switch.stop()
 
-
-# TODO: Review this test suite test cases
-procSuite "Waku Store - fault tolerant store":
-
-  proc newTestWakuStore(peer=none(RemotePeerInfo)): Future[(Switch, Switch, WakuStore)] {.async.} =
-    let
-      key = PrivateKey.random(ECDSA, rng[]).get()
-      listenSwitch = newStandardSwitch(some(key))
-    await listenSwitch.start()
-
-    let dialSwitch = newStandardSwitch()
-    await dialSwitch.start()
-
-    let
-      peerManager = PeerManager.new(dialSwitch)
-      rng = crypto.newRng()
-      database = newTestDatabase()
-      store = SqliteStore.init(database).tryGet()
-      proto = WakuStore.init(peerManager, rng, store)
-
-    let storePeer = peer.get(listenSwitch.peerInfo.toRemotePeerInfo())
-    proto.setPeer(storePeer)
-
-    await proto.start()
-    listenSwitch.mount(proto)
-
-    return (listenSwitch, dialSwitch, proto)
-
-
-  asyncTest "temporal history queries":
-    ## Setup
-    let (listenSwitch, dialSwitch, proto) = await newTestWakuStore()
-    let msgList = @[
-      WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
-      WakuMessage(payload: @[byte 1], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
-      WakuMessage(payload: @[byte 2], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
-      WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
-      WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
-      WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
-      WakuMessage(payload: @[byte 6], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
-      WakuMessage(payload: @[byte 7], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
-      WakuMessage(payload: @[byte 8], contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
-      WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("1"), timestamp: Timestamp(9))
-    ]
-
-    for msg in msgList:
-      require proto.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
-
-    let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
-    let msgList2 = @[
-      WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
-      WakuMessage(payload: @[byte 11], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
-      WakuMessage(payload: @[byte 12], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
-      WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
-      WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
-      WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
-      WakuMessage(payload: @[byte 13], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
-      WakuMessage(payload: @[byte 14], contentTopic: ContentTopic("1"), timestamp: Timestamp(7))
-    ]
-
-    for msg in msgList2:
-      require proto2.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
-
asyncTest "handle temporal history query with a valid time window":
|
||||
## Given
|
||||
let rpc = HistoryQuery(
|
||||
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
||||
startTime: Timestamp(2),
|
||||
endTime: Timestamp(5)
|
||||
)
|
||||
|
||||
## When
|
||||
let res = await proto.query(rpc)
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response.messages.len() == 2
|
||||
response.messages.anyIt(it.timestamp == Timestamp(3))
|
||||
response.messages.anyIt(it.timestamp == Timestamp(5))
|
||||
|
||||
asyncTest "handle temporal history query with a zero-size time window":
|
||||
# a zero-size window results in an empty list of history messages
|
||||
## Given
|
||||
let rpc = HistoryQuery(
|
||||
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
||||
startTime: Timestamp(2),
|
||||
endTime: Timestamp(2)
|
||||
)
|
||||
|
||||
## When
|
||||
let res = await proto.query(rpc)
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response.messages.len == 0
|
||||
|
||||
asyncTest "handle temporal history query with an invalid time window":
|
||||
# A history query with an invalid time range results in an empty list of history messages
|
||||
## Given
|
||||
let rpc = HistoryQuery(
|
||||
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
||||
startTime: Timestamp(5),
|
||||
endTime: Timestamp(2)
|
||||
)
|
||||
|
||||
## When
|
||||
let res = await proto.query(rpc)
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response.messages.len == 0
|
||||
|
||||
asyncTest "resume message history":
|
||||
## Given
|
||||
# Start a new node
|
||||
let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore(peer=some(listenSwitch.peerInfo.toRemotePeerInfo()))
|
||||
|
||||
## When
|
||||
let successResult = await proto3.resume()
|
||||
|
||||
## Then
|
||||
check:
|
||||
successResult.isOk()
|
||||
successResult.value == 10
|
||||
proto3.store.getMessagesCount().tryGet() == 10
|
||||
|
||||
## Cleanup
|
||||
await allFutures(dialSwitch3.stop(), listenSwitch3.stop())
|
||||
|
||||
asyncTest "queryFromWithPaging - no pagingInfo":
|
||||
## Given
|
||||
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
|
||||
|
||||
## When
|
||||
let res = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
## Then
|
||||
check res.isOk()
|
||||
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response.len == 4
|
||||
|
||||
asyncTest "queryFromWithPaging - with pagination":
|
||||
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
|
||||
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)
|
||||
|
||||
let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
check:
|
||||
messagesResult.isOk
|
||||
messagesResult.value.len == 4
|
||||
|
||||
asyncTest "resume history from a list of offline peers":
|
||||
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
|
||||
var dialSwitch3 = newStandardSwitch()
|
||||
await dialSwitch3.start()
|
||||
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
|
||||
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()]))
|
||||
check:
|
||||
successResult.isErr
|
||||
|
||||
#free resources
|
||||
await allFutures(dialSwitch3.stop(),
|
||||
offListenSwitch.stop())
|
||||
|
||||
asyncTest "resume history from a list of candidate peers":
|
||||
|
||||
let offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
|
||||
let (listenSwitch3, dialSwitch3, proto3) = await newTestWakuStore()
|
||||
|
||||
## When
|
||||
let res = await proto3.resume(some(@[
|
||||
offListenSwitch.peerInfo.toRemotePeerInfo(),
|
||||
listenSwitch.peerInfo.toRemotePeerInfo(),
|
||||
listenSwitch2.peerInfo.toRemotePeerInfo()
|
||||
]))
|
||||
|
||||
## Then
|
||||
# `proto3` is expected to retrieve 14 messages because:
|
||||
# - the store mounted on `listenSwitch` holds 10 messages (`msgList`)
|
||||
# - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`)
|
||||
# - both stores share 3 messages, resulting in 14 unique messages in total
|
||||
check res.isOk()
|
||||
|
||||
let response = res.tryGet()
|
||||
check:
|
||||
response == 14
|
||||
proto3.store.getMessagesCount().tryGet() == 14
|
||||
|
||||
## Cleanup
|
||||
await allFutures(listenSwitch3.stop(), dialSwitch3.stop(), offListenSwitch.stop())
|
||||
|
||||
## Cleanup
|
||||
await allFutures(dialSwitch.stop(), dialSwitch2.stop(), listenSwitch.stop(), listenSwitch2.stop())
|
||||
|
tests/v2/test_waku_store_resume.nim (new file, 203 lines)
@@ -0,0 +1,203 @@
{.used.}

import
  std/[options, tables, sets, times],
  stew/byteutils,
  testutils/unittests,
  chronos,
  chronicles,
  libp2p/switch,
  libp2p/crypto/crypto
import
  ../../waku/v2/protocol/waku_message,
  ../../waku/v2/protocol/waku_store,
  ../../waku/v2/node/storage/sqlite,
  ../../waku/v2/node/storage/message/sqlite_store,
  ../../waku/v2/node/peer_manager/peer_manager,
  ../../waku/v2/utils/time,
  ../test_helpers


const
  DefaultPubsubTopic = "/waku/2/default-waku/proto"
  DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")


proc now(): Timestamp =
  getNanosecondTime(getTime().toUnixFloat())

proc ts(offset=0, origin=now()): Timestamp =
  origin + getNanosecondTime(offset)

proc newTestDatabase(): SqliteDatabase =
  SqliteDatabase.init("", inMemory = true).tryGet()

proc fakeWakuMessage(
  payload = toBytes("TEST-PAYLOAD"),
  contentTopic = DefaultContentTopic,
  ts = now(),
  ephemeral = false,
): WakuMessage =
  WakuMessage(
    payload: payload,
    contentTopic: contentTopic,
    version: 1,
    timestamp: ts,
    ephemeral: ephemeral,
  )

proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
  let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
  let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
  return newStandardSwitch(some(peerKey), addrs=peerAddr)

proc newTestMessageStore(): MessageStore =
  let database = newTestDatabase()
  SqliteStore.init(database).tryGet()

proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore =
  let
    peerManager = PeerManager.new(switch)
    rng = crypto.newRng()
    proto = WakuStore.init(peerManager, rng, store)

  waitFor proto.start()
  switch.mount(proto)

  return proto


procSuite "Waku Store - resume store":
  ## Fixtures
  let storeA = block:
    let store = newTestMessageStore()

    let msgList = @[
      fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
      fakeWakuMessage(payload= @[byte 1], contentTopic=ContentTopic("1"), ts=ts(1)),
      fakeWakuMessage(payload= @[byte 2], contentTopic=ContentTopic("2"), ts=ts(2)),
      fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
      fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
      fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
      fakeWakuMessage(payload= @[byte 6], contentTopic=ContentTopic("2"), ts=ts(6)),
      fakeWakuMessage(payload= @[byte 7], contentTopic=ContentTopic("1"), ts=ts(7)),
      fakeWakuMessage(payload= @[byte 8], contentTopic=ContentTopic("2"), ts=ts(8)),
      fakeWakuMessage(payload= @[byte 9], contentTopic=ContentTopic("1"), ts=ts(9))
    ]

    for msg in msgList:
      require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    store

  let storeB = block:
    let store = newTestMessageStore()
    let msgList2 = @[
      fakeWakuMessage(payload= @[byte 0], contentTopic=ContentTopic("2"), ts=ts(0)),
      fakeWakuMessage(payload= @[byte 11], contentTopic=ContentTopic("1"), ts=ts(1)),
      fakeWakuMessage(payload= @[byte 12], contentTopic=ContentTopic("2"), ts=ts(2)),
      fakeWakuMessage(payload= @[byte 3], contentTopic=ContentTopic("1"), ts=ts(3)),
      fakeWakuMessage(payload= @[byte 4], contentTopic=ContentTopic("2"), ts=ts(4)),
      fakeWakuMessage(payload= @[byte 5], contentTopic=ContentTopic("1"), ts=ts(5)),
      fakeWakuMessage(payload= @[byte 13], contentTopic=ContentTopic("2"), ts=ts(6)),
      fakeWakuMessage(payload= @[byte 14], contentTopic=ContentTopic("1"), ts=ts(7))
    ]

    for msg in msgList2:
      require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    store


  asyncTest "resume message history":
    ## Setup
    let
      serverSwitch = newTestSwitch()
      clientSwitch = newTestSwitch()

    await allFutures(serverSwitch.start(), clientSwitch.start())

    let
      _ = newTestWakuStore(serverSwitch, store=storeA)
      client = newTestWakuStore(clientSwitch)

    client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())

    ## When
    let res = await client.resume()

    ## Then
    check res.isOk()

    let resumedMessagesCount = res.tryGet()
    let storedMessagesCount = client.store.getMessagesCount().tryGet()
    check:
      resumedMessagesCount == 10
      storedMessagesCount == 10

    ## Cleanup
    await allFutures(clientSwitch.stop(), serverSwitch.stop())

  asyncTest "resume history from a list of candidates - offline peer":
    ## Setup
    let
      clientSwitch = newTestSwitch()
      offlineSwitch = newTestSwitch()

    await clientSwitch.start()

    let client = newTestWakuStore(clientSwitch)

    ## Given
    let peers = @[offlineSwitch.peerInfo.toRemotePeerInfo()]

    ## When
    let res = await client.resume(some(peers))

    ## Then
    check res.isErr()

    ## Cleanup
    await clientSwitch.stop()

  asyncTest "resume history from a list of candidates - online and offline peers":
    ## Setup
    let
      offlineSwitch = newTestSwitch()
      serverASwitch = newTestSwitch()
      serverBSwitch = newTestSwitch()
      clientSwitch = newTestSwitch()

    await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start())

    let
      serverA = newTestWakuStore(serverASwitch, store=storeA)
      serverB = newTestWakuStore(serverBSwitch, store=storeB)
      client = newTestWakuStore(clientSwitch)

    ## Given
    let peers = @[
      offlineSwitch.peerInfo.toRemotePeerInfo(),
      serverASwitch.peerInfo.toRemotePeerInfo(),
      serverBSwitch.peerInfo.toRemotePeerInfo()
    ]

    ## When
    let res = await client.resume(some(peers))

    ## Then
    # `client` is expected to retrieve 14 messages:
    # - the store mounted on `serverA` holds 10 messages (see `storeA` fixture)
    # - the store mounted on `serverB` holds 8 messages (see `storeB` fixture)
    # - the stores share 4 messages, resulting in 14 unique messages in total
    check res.isOk()

    let restoredMessagesCount = res.tryGet()
    let storedMessagesCount = client.store.getMessagesCount().tryGet()
    check:
      restoredMessagesCount == 14
      storedMessagesCount == 14

    ## Cleanup
    await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())
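Where the expected count of 14 in the test above comes from, sketched with payload bytes standing in for the fixture messages: storeA holds 10 messages, storeB holds 8, and 4 of them (payloads 0, 3, 4, 5) occur in both fixtures, so resuming from both servers should leave 10 + 8 - 4 = 14 unique messages. Illustrative aside only:

import std/sets

let
  storeAPayloads = toHashSet([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])   # storeA fixture
  storeBPayloads = toHashSet([0, 11, 12, 3, 4, 5, 13, 14])     # storeB fixture

assert card(storeAPayloads * storeBPayloads) == 4   # messages shared by both stores
assert card(storeAPayloads + storeBPayloads) == 14  # unique messages after resume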
@@ -26,7 +26,8 @@ method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest:
 
 method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
   let
     digest = computeDigest(message)
-    receivedTime = getNanosecondTime(getTime().toUnixFloat())
+    receivedTime = if message.timestamp > 0: message.timestamp
+                   else: getNanosecondTime(getTime().toUnixFloat())
 
   ms.put(pubsubTopic, message, digest, receivedTime)
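A minimal, dependency-free sketch of the receivedTime fallback introduced above: a positive sender timestamp is kept, otherwise the local receipt time is used. Timestamp and getNanosecondTime are re-declared here purely for illustration; the real definitions live in the Waku utils/time module:

import std/times

type Timestamp = int64

proc getNanosecondTime(t: float): Timestamp =
  Timestamp(t * 1_000_000_000.0)

proc resolveReceivedTime(messageTimestamp: Timestamp): Timestamp =
  if messageTimestamp > 0: messageTimestamp
  else: getNanosecondTime(getTime().toUnixFloat())

assert resolveReceivedTime(Timestamp(42)) == 42   # sender timestamp kept
assert resolveReceivedTime(Timestamp(0)) > 0      # falls back to "now"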
@@ -459,6 +459,7 @@ proc resume*(w: WakuStore,
  for msg in res.get():
    let putStoreRes = w.store.put(pubsubTopic, msg)
    if putStoreRes.isErr():
      warn "failed to insert resumed message into store", error=putStoreRes.error
      continue

    added.inc()