refactor(node): use store client in waku_node

This commit is contained in:
Lorenzo Delgado 2022-10-28 20:11:28 +02:00 committed by GitHub
parent ed9b2b59b2
commit ee86d190d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 237 additions and 518 deletions

View File

@ -474,7 +474,8 @@ proc processInput(rfd: AsyncFD) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())
node.wakuStore.setPeer(storenode.get())
node.mountStoreClient()
node.setStorePeer(storenode.get())
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:

View File

@ -393,11 +393,15 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
if conf.storenode != "":
try:
setStorePeer(node, conf.storenode)
except:
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
if conf.storenode != "":
try:
# TODO: Use option instead of nil in store client
let mStorage = if mStore.isNone(): nil
else: mStore.get()
mountStoreClient(node, store=mStorage)
setStorePeer(node, conf.storenode)
except:
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
# NOTE Must be mounted after relay
if conf.lightpush:

View File

@ -28,17 +28,14 @@ import
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time
../../waku/v2/utils/time,
./testlib/common
template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
const sigPath = sourceDir / ParDir / ParDir / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
createRpcSigs(RpcHttpClient, sigPath)
procSuite "Waku v2 JSON-RPC API":
const defaultTopic = "/waku/2/default-waku/proto"
const defaultContentTopic = ContentTopic("/waku/2/default-content/proto")
const testCodec = "/waku/2/default-waku/codec"
let
rng = crypto.newRng()
privkey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
@ -48,7 +45,7 @@ procSuite "Waku v2 JSON-RPC API":
node = WakuNode.new(privkey, bindIp, port, some(extIp), some(port))
asyncTest "Debug API: get node info":
waitFor node.start()
await node.start()
await node.mountRelay()
@ -72,10 +69,10 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Relay API: publish and subscribe/unsubscribe":
waitFor node.start()
await node.start()
await node.mountRelay()
@ -105,7 +102,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true
# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
response = await client.post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
check:
# @TODO poll topic to verify message has been published
@ -122,18 +119,18 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Relay API: get latest messages":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
node1 = WakuNode.new(nodeKey1, bindIp, Port(60300))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002))
node2 = WakuNode.new(nodeKey2, bindIp, Port(60302))
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
node3 = WakuNode.new(nodeKey3, bindIp, Port(60303), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = defaultContentTopic
contentTopic = DefaultContentTopic
payload1 = @[byte 9]
message1 = WakuMessage(payload: payload1, contentTopic: contentTopic)
payload2 = @[byte 8]
@ -165,11 +162,11 @@ procSuite "Waku v2 JSON-RPC API":
await client.connect("127.0.0.1", rpcPort, false)
# First see if we can retrieve messages published on the default topic (node is already subscribed)
await node2.publish(defaultTopic, message1)
await node2.publish(DefaultPubsubTopic, message1)
await sleepAsync(2000.millis)
var messages = await client.get_waku_v2_relay_v1_messages(defaultTopic)
var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic)
check:
messages.len == 1
@ -215,8 +212,8 @@ procSuite "Waku v2 JSON-RPC API":
await node2.stop()
await node3.stop()
asyncTest "Store API: retrieve historical messages":
waitFor node.start()
asyncTest "Store API: retrieve historical messages":
await node.start()
await node.mountRelay()
@ -234,12 +231,14 @@ procSuite "Waku v2 JSON-RPC API":
key = crypto.PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
await node.mountStore(store=StoreQueueRef.new())
let store = StoreQueueRef.new()
await node.mountStore(store=store)
node.mountStoreClient(store=store)
var listenSwitch = newStandardSwitch(some(key))
waitFor listenSwitch.start()
await listenSwitch.start()
node.wakuStore.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
node.setStorePeer(listenSwitch.peerInfo.toRemotePeerInfo())
listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore)
@ -247,23 +246,23 @@ procSuite "Waku v2 JSON-RPC API":
# Now prime it with some history before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0),
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic, timestamp: 1),
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic, timestamp: 2),
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic, timestamp: 3),
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic, timestamp: 4),
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic, timestamp: 5),
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic, timestamp: 6),
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic, timestamp: 7),
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic, timestamp: 8),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: 1),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: 2),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: 3),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: 4),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: 5),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: 6),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: 7),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: 8),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
for wakuMsg in msgList:
require node.wakuStore.store.put(defaultTopic, wakuMsg).isOk()
require node.wakuStore.store.put(DefaultPubsubTopic, wakuMsg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(DefaultPubsubTopic), some(@[HistoryContentFilter(contentTopic: DefaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isNone()
@ -271,10 +270,10 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Filter API: subscribe/unsubscribe":
waitFor node.start()
await node.start()
await node.mountRelay()
@ -296,19 +295,19 @@ procSuite "Waku v2 JSON-RPC API":
# Light node has not yet subscribed to any filters
node.filters.len() == 0
let contentFilters = @[ContentFilter(contentTopic: defaultContentTopic),
let contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic),
ContentFilter(contentTopic: ContentTopic("2")),
ContentFilter(contentTopic: ContentTopic("3")),
ContentFilter(contentTopic: ContentTopic("4")),
]
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic))
check:
# Light node has successfully subscribed to a single filter
node.filters.len() == 1
response == true
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic))
check:
# Light node has successfully unsubscribed from all filters
@ -318,10 +317,10 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Filter API: get latest messages":
waitFor node.start()
await node.start()
# RPC server setup
let
@ -339,21 +338,21 @@ procSuite "Waku v2 JSON-RPC API":
# First ensure subscription exists
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: defaultContentTopic)], topic = some(defaultTopic))
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic)], topic = some(DefaultPubsubTopic))
check:
sub
# Now prime the node with some messages before tests
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]
let
@ -363,13 +362,13 @@ procSuite "Waku v2 JSON-RPC API":
for wakuMsg in msgList:
filters.notify(wakuMsg, requestId)
var response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
var response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
response.len() == 8
response.allIt(it.contentTopic == defaultContentTopic)
response.allIt(it.contentTopic == DefaultContentTopic)
# No new messages
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
response.len() == 0
@ -380,15 +379,15 @@ procSuite "Waku v2 JSON-RPC API":
for x in 1..(maxSize + 1):
# Try to cache 1 more than maximum allowed
filters.notify(WakuMessage(payload: @[byte x], contentTopic: defaultContentTopic), requestId)
filters.notify(WakuMessage(payload: @[byte x], contentTopic: DefaultContentTopic), requestId)
await sleepAsync(2000.millis)
response = await client.get_waku_v2_filter_v1_messages(defaultContentTopic)
response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic)
check:
# Max messages has not been exceeded
response.len == maxSize
response.allIt(it.contentTopic == defaultContentTopic)
response.allIt(it.contentTopic == DefaultContentTopic)
# Check that oldest item has been removed
response[0].payload == @[byte 2]
response[maxSize - 1].payload == @[byte (maxSize + 1)]
@ -396,21 +395,18 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Admin API: connect to ad-hoc peers":
# Create a couple of nodes
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60600))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60602))
peerInfo2 = node2.switch.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004))
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60604))
peerInfo3 = node3.switch.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
@ -459,15 +455,12 @@ procSuite "Waku v2 JSON-RPC API":
# Create a couple of nodes
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60220))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60222))
peerInfo2 = node2.switch.peerInfo
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"),
Port(60004))
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60224))
peerInfo3 = node3.switch.peerInfo
await allFutures([node1.start(), node2.start(), node3.start()])
@ -511,10 +504,9 @@ procSuite "Waku v2 JSON-RPC API":
asyncTest "Admin API: get unmanaged peer information":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"),
Port(60000))
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60523))
waitFor node.start()
await node.start()
# RPC server setup
let
@ -530,7 +522,9 @@ procSuite "Waku v2 JSON-RPC API":
await node.mountFilter()
await node.mountSwap()
await node.mountStore(store=StoreQueueRef.new())
let store = StoreQueueRef.new()
await node.mountStore(store=store)
node.mountStoreClient(store=store)
# Create and set some peers
let
@ -547,7 +541,8 @@ procSuite "Waku v2 JSON-RPC API":
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.wakuStore.setPeer(storePeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
let response = await client.get_waku_v2_admin_v1_peers()
@ -563,7 +558,7 @@ procSuite "Waku v2 JSON-RPC API":
await server.stop()
await server.closeWait()
waitfor node.stop()
await node.stop()
asyncTest "Private API: generate asymmetric keys and encrypt/decrypt communication":
let
@ -574,7 +569,7 @@ procSuite "Waku v2 JSON-RPC API":
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = defaultContentTopic
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()
@ -665,7 +660,7 @@ procSuite "Waku v2 JSON-RPC API":
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), some(extIp), some(port))
pubSubTopic = "polling"
contentTopic = defaultContentTopic
contentTopic = DefaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

View File

@ -104,11 +104,12 @@ procSuite "Peer Manager":
await node.mountFilter()
await node.mountSwap()
await node.mountStore()
node.mountStoreClient()
node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo())
node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo())
node.wakuStore.setPeer(storePeer.toRemotePeerInfo())
node.setStorePeer(storePeer.toRemotePeerInfo())
# Check peers were successfully added to peer manager
check:
@ -217,7 +218,7 @@ procSuite "Peer Manager":
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
asyncTest "Peer manager support multiple protocol IDs when reconnecting to peers":
let
database = SqliteDatabase.init("2", inMemory = true)[]
storage = WakuPeerStorage.new(database)[]

View File

@ -31,7 +31,7 @@ proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuS
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuStore.init(peerManager, rng, store)
proto = WakuStore.new(peerManager, rng, store)
await proto.start()
switch.mount(proto)
@ -80,9 +80,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let topic = ContentTopic("1")
let
@ -92,9 +89,11 @@ procSuite "Waku Store - history query":
server.handleMessage("foo", msg1)
server.handleMessage("foo", msg2)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
let resQuery = await client.query(rpc)
let resQuery = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -120,8 +119,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let
topic1 = ContentTopic("1")
@ -137,12 +134,14 @@ procSuite "Waku Store - history query":
server.handleMessage("foo", msg2)
server.handleMessage("foo", msg3)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let rpc = HistoryQuery(contentFilters: @[
HistoryContentFilter(contentTopic: topic1),
HistoryContentFilter(contentTopic: topic3)
])
let resQuery = await client.query(rpc)
let resQuery = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -169,8 +168,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let
pubsubTopic1 = "queried-topic"
@ -190,6 +187,8 @@ procSuite "Waku Store - history query":
server.handleMessage(pubsubtopic2, msg2)
server.handleMessage(pubsubtopic2, msg3)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
let rpc = HistoryQuery(
@ -197,7 +196,7 @@ procSuite "Waku Store - history query":
HistoryContentFilter(contentTopic: contentTopic3)],
pubsubTopic: pubsubTopic1
)
let resQuery = await client.query(rpc)
let resQuery = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -223,8 +222,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let
pubsubtopic1 = "queried-topic"
@ -239,9 +236,11 @@ procSuite "Waku Store - history query":
server.handleMessage(pubsubtopic2, msg2)
server.handleMessage(pubsubtopic2, msg3)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -266,8 +265,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let pubsubTopic = "queried-topic"
@ -280,9 +277,11 @@ procSuite "Waku Store - history query":
server.handleMessage(pubsubTopic, msg2)
server.handleMessage(pubsubTopic, msg3)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let rpc = HistoryQuery(pubsubTopic: pubsubTopic)
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -310,8 +309,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let currentTime = now()
let msgList = @[
@ -329,13 +326,15 @@ procSuite "Waku Store - history query":
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
var rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
)
var res = await client.query(rpc)
var res = await client.query(rpc, peer=serverPeerInfo)
require res.isOk()
var
@ -353,7 +352,7 @@ procSuite "Waku Store - history query":
rpc.pagingInfo = response.pagingInfo
# Continue querying
res = await client.query(rpc)
res = await client.query(rpc, peer=serverPeerInfo)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
@ -379,8 +378,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let currentTime = now()
let msgList = @[
@ -399,12 +396,14 @@ procSuite "Waku Store - history query":
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
var rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
)
var res = await client.query(rpc)
var res = await client.query(rpc, peer=serverPeerInfo)
require res.isOk()
var
@ -422,7 +421,7 @@ procSuite "Waku Store - history query":
rpc.pagingInfo = response.pagingInfo
# Continue querying
res = await client.query(rpc)
res = await client.query(rpc, peer=serverPeerInfo)
require res.isOk()
response = res.tryGet()
totalMessages += response.messages.len()
@ -448,8 +447,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let msgList = @[
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
@ -467,9 +464,11 @@ procSuite "Waku Store - history query":
for msg in msgList:
require server.store.put(DefaultPubsubTopic, msg).isOk()
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check:
@ -497,8 +496,6 @@ procSuite "Waku Store - history query":
server = newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
@ -506,8 +503,10 @@ procSuite "Waku Store - history query":
endTime: Timestamp(5)
)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check res.isOk()
@ -534,8 +533,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
@ -543,8 +540,10 @@ procSuite "Waku Store - history query":
endTime: Timestamp(2)
)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check res.isOk()
@ -569,8 +568,6 @@ procSuite "Waku Store - history query":
server = await newTestWakuStore(serverSwitch, store=storeA)
client = newTestWakuStoreClient(clientSwitch)
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## Given
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
@ -578,8 +575,10 @@ procSuite "Waku Store - history query":
endTime: Timestamp(2)
)
let serverPeerInfo =serverSwitch.peerInfo.toRemotePeerInfo()
## When
let res = await client.query(rpc)
let res = await client.query(rpc, peer=serverPeerInfo)
## Then
check res.isOk()
@ -592,7 +591,7 @@ procSuite "Waku Store - history query":
await allFutures(clientSwitch.stop(), serverSwitch.stop())
suite "Waku Store - message handling":
suite "Message Store - message handling":
asyncTest "it should store a valid and non-ephemeral message":
## Setup

View File

@ -25,11 +25,11 @@ proc newTestStore(): MessageStore =
let database = newTestDatabase()
SqliteStore.init(database).tryGet()
proc newTestWakuStore(switch: Switch, store=newTestStore()): Future[WakuStore] {.async.} =
proc newTestWakuStoreNode(switch: Switch, store=newTestStore()): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
proto = WakuStore.init(peerManager, rng, store)
proto = WakuStore.new(peerManager, rng, store)
await proto.start()
switch.mount(proto)
@ -78,7 +78,7 @@ procSuite "Waku Store Client":
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=testStore)
server = await newTestWakuStoreNode(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
@ -114,7 +114,7 @@ procSuite "Waku Store Client":
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=testStore)
server = await newTestWakuStoreNode(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
@ -148,8 +148,8 @@ procSuite "Waku Store Client":
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
let
serverA = await newTestWakuStore(serverSwitchA, store=testStore)
serverB = await newTestWakuStore(serverSwitchB, store=testStore)
serverA = await newTestWakuStoreNode(serverSwitchA, store=testStore)
serverB = await newTestWakuStoreNode(serverSwitchB, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
@ -175,70 +175,3 @@ procSuite "Waku Store Client":
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitchA.stop(), serverSwitchB.stop())
asyncTest "single query with no pre-configured store peer should fail":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 8)
)
## When
let res = await client.query(rpc)
## Then
check:
res.isErr()
res.error == peerNotFoundFailure
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "single query to pre-configured store peer":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
let
server = await newTestWakuStore(serverSwitch, store=testStore)
client = newTestWakuStoreClient(clientSwitch)
## Given
let peer = serverSwitch.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery(
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
pagingInfo: PagingInfo(pageSize: 8)
)
## When
client.setPeer(peer)
let res = await client.query(rpc)
## Then
check:
res.isOk()
let response = res.tryGet()
check:
## No pagination specified. Response will be auto-paginated with
## up to MaxPageSize messages per page.
response.messages.len() == 8
response.pagingInfo != PagingInfo()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, tables, sets, times],
std/[options, tables, sets],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
@ -20,7 +20,6 @@ import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/waku_node,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../test_helpers,
./utils,
./testlib/common
@ -53,35 +52,40 @@ procSuite "Waku SWAP Accounting":
decodedCheque.isErr == false
decodedCheque.get() == cheque
# TODO To do this reliably we need access to contract node
# TODO: To do this reliably we need access to contract node
# With current logic state isn't updated because of bad cheque
# Consider moving this test to e2e test, and/or move swap module to be on by default
asyncTest "Update accounting state after store operations":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))
# Start nodes and mount protocols
await allFutures(client.start(), server.start())
await server.mountSwap()
await server.mountStore(store=StoreQueueRef.new())
await client.mountSwap()
await client.mountStore()
client.mountStoreClient()
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
let serverPeer = server.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
## When
let queryRes = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
let queryRes = await client.query(rpc, peer=serverPeer)
## Then
check queryRes.isOk()
@ -104,9 +108,9 @@ procSuite "Waku SWAP Accounting":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002))
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200))
# Define the waku swap Config for this test
let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1)
@ -117,26 +121,31 @@ procSuite "Waku SWAP Accounting":
await server.mountStore(store=StoreQueueRef.new())
await client.mountSwap(swapConfig)
await client.mountStore()
client.mountStoreClient()
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
## When
# TODO: Handshakes - for now we assume implicit, e2e still works for PoC
let res1 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
let res2 = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
let res1 = await client.query(rpc, peer=serverPeer)
let res2 = await client.query(rpc, peer=serverPeer)
## Then
check:
require:
res1.isOk()
res2.isOk()
## Then
check:
# Accounting table updated with credit and debit, respectively
# After sending a cheque the balance is partially adjusted

View File

@ -46,16 +46,17 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
await client.mountStore()
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
client.mountStoreClient()
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
let queryRes = await client.query(req)
let queryRes = await client.query(req, peer=serverPeer)
## Then
check queryRes.isOk()
@ -84,13 +85,13 @@ procSuite "WakuNode - Store":
await server.mountStore(store=newTestMessageStore())
await server.mountFilter()
await client.mountStore()
client.mountStoreClient()
server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo())
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## Then
let filterFut = newFuture[bool]()
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
@ -109,7 +110,7 @@ procSuite "WakuNode - Store":
# Wait for the server filter to receive the push message
require (await filterFut.withTimeout(5.seconds))
let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]))
let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer)
## Then
check res.isOk()
@ -134,16 +135,19 @@ procSuite "WakuNode - Store":
await allFutures(client.start(), server.start())
await server.mountStore(store=newTestMessageStore())
await client.mountStore(store=StoreQueueRef.new())
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume()
await client.resume(some(@[serverPeer]))
# Then
check:
@ -162,10 +166,11 @@ procSuite "WakuNode - Store":
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000))
await allFutures(server.start(), client.start())
await client.mountStore(store=StoreQueueRef.new())
await server.mountStore(store=StoreQueueRef.new())
client.wakuStore.setPeer(server.peerInfo.toRemotePeerInfo())
let clientStore = StoreQueueRef.new()
await client.mountStore(store=clientStore)
client.mountStoreClient(store=clientStore)
## Given
let timeOrigin = now()
@ -184,8 +189,10 @@ procSuite "WakuNode - Store":
require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
## When
await client.resume()
await client.resume(some(@[serverPeer]))
## Then
check:

View File

@ -5,6 +5,7 @@ import
chronicles,
json_rpc/rpcserver
import
../peer_manager/peer_manager,
../waku_node,
../../protocol/waku_store,
../../utils/time,
@ -21,24 +22,27 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
## Store API version 1 definitions
rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
rpcsrv.rpc("get_waku_v2_store_v1_messages") do (pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse:
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")
let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "",
contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[],
startTime: if startTime.isSome: startTime.get() else: Timestamp(0),
endTime: if endTime.isSome: endTime.get() else: Timestamp(0),
pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo())
let req = node.query(historyQuery)
let queryFut = node.query(historyQuery, peerOpt.get())
if not (await req.withTimeout(futTimeout)):
# Future failed to complete
if not await queryFut.withTimeout(futTimeout):
raise newException(ValueError, "No history response received (timeout)")
let res = req.read()
let res = queryFut.read()
if res.isErr():
raise newException(ValueError, $res.error())
raise newException(ValueError, $res.error)
debug "get_waku_v2_store_v1_messages response"
return res.value.toStoreResponse()

View File

@ -18,6 +18,7 @@ import
import
../protocol/[waku_relay, waku_message],
../protocol/waku_store,
../protocol/waku_store/client,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter,
../protocol/waku_lightpush,
@ -39,6 +40,7 @@ declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_store_peers, "number of store peers"
logScope:
@ -75,6 +77,7 @@ type
switch*: Switch
wakuRelay*: WakuRelay
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuFilter*: WakuFilter
wakuSwap*: WakuSwap
wakuRlnRelay*: WakuRLNRelay
@ -499,6 +502,11 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
## Waku store
proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) =
info "mounting store client"
node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store)
const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes
proc executeMessageRetentionPolicy*(node: WakuNode) =
@ -534,7 +542,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none
else:
info "mounting waku store protocol with waku swap support"
node.wakuStore = WakuStore.init(
node.wakuStore = WakuStore.new(
node.peerManager,
node.rng,
store,
@ -548,29 +556,53 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuStore.isNil():
error "could not set peer, waku store is nil"
proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
let queryRes = await node.wakuStoreClient.query(query, peer)
if queryRes.isErr():
return err(queryRes.error)
let response = queryRes.get()
if not node.wakuSwap.isNil():
# Perform accounting operation
node.wakuSwap.debit(peer.peerId, response.messages.len)
return ok(response)
# TODO: Move to application module (e.g., wakunode2.nim)
proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError],
deprecated: "Use 'node.query()' with peer destination instead".} =
if node.wakuStoreClient.isNil():
error "could not set peer, waku store client is nil"
return
info "Set store peer", peer=peer
info "set store peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuStore.setPeer(remotePeer)
node.peerManager.addPeer(remotePeer, WakuStoreCodec)
waku_store_peers.inc()
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
# TODO: Move to application module (e.g., wakunode2.nim)
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
deprecated: "Use 'node.query()' with peer destination instead".} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
# TODO: Once waku swap is less experimental, this can simplified
if node.wakuSwap.isNil():
debug "Using default query"
return await node.wakuStore.query(query)
else:
debug "Using SWAP accounting query"
# TODO: wakuSwap now part of wakuStore object
return await node.wakuStore.queryWithAccounting(query)
let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")
return await node.query(query, peerOpt.get())
# TODO: Move to application module (e.g., wakunode2.nim)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
@ -580,10 +612,10 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node.wakuStore.isNil():
if node.wakuStoreClient.isNil():
return
let retrievedMessages = await node.wakuStore.resume(peerList)
let retrievedMessages = await node.wakuStoreClient.resume(peerList)
if retrievedMessages.isErr():
error "failed to resume store", error=retrievedMessages.error
return

View File

@ -34,9 +34,8 @@ type WakuStoreClient* = ref object
proc new*(T: type WakuStoreClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
wakuSwap: WakuSwap = nil): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store, wakuSwap: wakuSwap)
store: MessageStore): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
@ -113,30 +112,6 @@ proc queryLoop*(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo
return ok(messagesList)
### Set store peer and query for messages
proc setPeer*(ws: WakuStoreClient, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc query*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
# - which topics they track
# - latency?
# - default store peer?
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await w.query(req, peerOpt.get())
## Resume store
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
@ -216,28 +191,3 @@ proc resume*(w: WakuStoreClient,
added.inc()
return ok(added)
## EXPERIMENTAL
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(w: WakuStoreClient, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
if w.wakuSwap.isNil():
return err("waku swap not fount (nil)")
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
let queryRes = await w.query(req, peerOpt.get())
if queryRes.isErr():
return err(queryRes.error)
let response = queryRes.get()
# Perform accounting operation. Assumes wakuSwap protocol is mounted
w.wakuSwap.debit(peerOpt.get().peerId, response.messages.len)
return ok(response)

View File

@ -19,7 +19,6 @@ import
../../node/storage/message/waku_store_queue,
../../node/peer_manager/peer_manager,
../../utils/time,
../../utils/requests,
../waku_message,
../waku_swap/waku_swap,
./rpc,
@ -201,7 +200,7 @@ proc initProtocolHandler*(ws: WakuStore) =
ws.handler = handler
ws.codec = WakuStoreCodec
proc init*(T: type WakuStore,
proc new*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
@ -270,217 +269,3 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_store_insert_duration_seconds.observe(insertDuration)
## CLIENT
# TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) {.
deprecated: "use waku_store/client methods instead".} =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone():
waku_store_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr():
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
return ok(response.value.response)
proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
# - which topics they track
# - latency?
# - default store peer?
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await w.query(req, peerOpt.get())
## 21/WAKU2-FAULT-TOLERANT-STORE
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
## it retrieves the historical messages in pages.
## Returns all the fetched messages, if error occurs, returns an error string
# Make a copy of the query
var req = query
var messageList: seq[WakuMessage] = @[]
# Fetch the history in pages
while true:
let res = await w.query(req, peer)
if res.isErr():
return err(res.error)
let response = res.get()
messageList.add(response.messages)
# Check whether it is the last page
if response.pagingInfo.pageSize == 0:
break
# Update paging cursor
req.pagingInfo.cursor = response.pagingInfo.cursor
return ok(messageList)
proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
## Loops through the peers candidate list in order and sends the query to each
##
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq.
let queriesList = candidateList.mapIt(w.queryFromWithPaging(req, it))
await allFutures(queriesList)
let messagesList = queriesList
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
try:
# fut.read() can raise a CatchableError
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
if not fut.completed() or fut.read().isErr():
return @[]
fut.read().value
except CatchableError:
return @[]
)
.concat()
.deduplicate()
if messagesList.len == 0:
return err("failed to resolve the query")
return ok(messagesList)
proc resume*(w: WakuStore,
peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]),
pageSize: uint64 = DefaultPageSize,
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony
## peerList indicates the list of peers to query from.
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
## Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
# If store has not been provided, don't even try
if w.store.isNil():
return err("store not provided (nil)")
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
# of writing, the sqlite store implementation returns the last message's receiver
# timestamp.
# lastSeenTime = lastSeenItem.get().msg.timestamp
let
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
now = getNanosecondTime(getTime().toUnixFloat())
debug "resuming with offline time window", lastSeenTime=lastSeenTime, currentTime=now
let
queryEndTime = now + StoreResumeTimeWindowOffset
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
let req = HistoryQuery(
pubsubTopic: pubsubTopic,
startTime: queryStartTime,
endTime: queryEndTime,
pagingInfo: PagingInfo(
direction:PagingDirection.FORWARD,
pageSize: pageSize
)
)
var res: WakuStoreResult[seq[WakuMessage]]
if peerList.isSome():
debug "trying the candidate list to fetch the history"
res = await w.queryLoop(req, peerList.get())
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err("no suitable remote peers")
debug "a peer is selected from peer manager"
res = await w.queryFromWithPaging(req, peerOpt.get())
if res.isErr():
debug "failed to resume the history"
return err("failed to resume the history")
# Save the retrieved messages in the store
var added: uint = 0
for msg in res.get():
let putStoreRes = w.store.put(pubsubTopic, msg)
if putStoreRes.isErr():
warn "failed to insert resumed message into store", error=putStoreRes.error
continue
added.inc()
return ok(added)
## EXPERIMENTAL
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
deprecated: "use waku_store/client methods instead".} =
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
let res = await ws.query(req, peerOpt.get())
if res.isErr():
return err(res.error)
let response = res.get()
# Perform accounting operation. Assumes wakuSwap protocol is mounted
ws.wakuSwap.debit(peerOpt.get().peerId, response.messages.len)
return ok(response)

View File

@ -4,7 +4,6 @@ import metrics
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
declarePublicGauge waku_store_queries, "number of store queries received"
declarePublicHistogram waku_store_insert_duration_seconds, "message insertion duration"