chore(store): remove waku swap integration from store protocol

This commit is contained in:
Lorenzo Delgado 2022-11-21 09:36:41 +01:00 committed by GitHub
parent 0c4fcdcc9d
commit e85b5cbae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 114 additions and 243 deletions

View File

@ -2,10 +2,10 @@
import
std/tables,
stew/shims/net as stewNet,
testutils/unittests,
chronos,
chronicles,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/bufferstream,
@ -15,11 +15,9 @@ import
eth/keys
import
../../waku/v2/node/waku_node,
../../waku/v2/node/message_store/queue_store,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/utils/peers,
../test_helpers,
./utils,
./testlib/common
@ -50,106 +48,3 @@ procSuite "Waku SWAP Accounting":
check:
decodedCheque.isErr == false
decodedCheque.get() == cheque
# TODO: To do this reliably we need access to contract node
# With current logic state isn't updated because of bad cheque
# Consider moving this test to e2e test, and/or move swap module to be on by default
asyncTest "Update accounting state after store operations":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))
await allFutures(client.start(), server.start())
await server.mountSwap()
await server.mountStore(store=StoreQueueRef.new())
await client.mountSwap()
await client.mountStore()
client.mountStoreClient()
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
## When
let queryRes = await client.query(req, peer=serverPeer)
## Then
check queryRes.isOk()
let response = queryRes.get()
check:
response.messages == @[message]
check:
client.wakuSwap.accounting[server.peerInfo.peerId] == 1
server.wakuSwap.accounting[client.peerInfo.peerId] == -1
## Cleanup
await allFutures(client.stop(), server.stop())
# This test will only be checked if in Mock mode
# TODO: Add cheque here
asyncTest "Update accounting state after sending cheque":
## Setup
let
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202))
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200))
# Define the waku swap Config for this test
let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1)
# Start nodes and mount protocols
await allFutures(client.start(), server.start())
await server.mountSwap(swapConfig)
await server.mountStore(store=StoreQueueRef.new())
await client.mountSwap(swapConfig)
await client.mountStore()
client.mountStoreClient()
client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
client.setStorePeer(server.peerInfo.toRemotePeerInfo())
server.setStorePeer(client.peerInfo.toRemotePeerInfo())
## Given
let message = fakeWakuMessage()
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
let serverPeer = server.peerInfo.toRemotePeerInfo()
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
## When
# TODO: Handshakes - for now we assume implicit, e2e still works for PoC
let res1 = await client.query(req, peer=serverPeer)
let res2 = await client.query(req, peer=serverPeer)
require:
res1.isOk()
res2.isOk()
## Then
check:
# Accounting table updated with credit and debit, respectively
# After sending a cheque the balance is partially adjusted
client.wakuSwap.accounting[server.peerInfo.peerId] == 1
server.wakuSwap.accounting[client.peerInfo.peerId] == -1
## Cleanup
await allFutures(client.stop(), server.stop())
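For context, the two deleted tests above exercised the full store round trip plus swap accounting. Without swap, the same round trip reduces to the calls below; this is a condensed, illustrative sketch assembled from the surviving API (mountStore, mountStoreClient, query), not a test added by this commit.
# Sketch only: the store round trip without swap, condensed from the deleted
# tests above. Assumes the same imports as this test file (waku_node,
# message_store/queue_store, waku_store, testlib/common).
asyncTest "Store query without swap accounting":
  let
    serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
    clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))

  await allFutures(client.start(), server.start())
  await server.mountStore(store=StoreQueueRef.new())  # server persists messages
  client.mountStoreClient()                           # client only sends queries

  let message = fakeWakuMessage()
  require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()

  let res = await client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]),
                               peer=server.peerInfo.toRemotePeerInfo())
  check res.isOk() and res.get().messages == @[message]

  await allFutures(client.stop(), server.stop())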

View File

@ -32,7 +32,7 @@ import
../protocol/waku_lightpush,
../protocol/waku_lightpush/client as lightpush_client,
../protocol/waku_peer_exchange,
../utils/peers,
../utils/wakuenr,
./peer_manager/peer_manager,
./message_store/message_retention_policy,
@ -128,11 +128,11 @@ template wsFlag(wssEnabled: bool): MultiAddress =
if wssEnabled: MultiAddress.init("/wss").tryGet()
else: MultiAddress.init("/ws").tryGet()
proc new*(T: type WakuNode,
nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress,
bindPort: Port,
extIp = none(ValidIpAddress),
extPort = none(Port),
peerStorage: PeerStorage = nil,
maxConnections = builders.MaxConnections,
@ -160,7 +160,7 @@ proc new*(T: type WakuNode,
# Setup external addresses, if available
var
hostExtAddress, wsExtAddress = none(MultiAddress)
if (dns4DomainName.isSome()):
# Use dns4 for externally announced addresses
hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get()))
@ -180,12 +180,12 @@ proc new*(T: type WakuNode,
announcedAddresses.add(hostExtAddress.get())
else:
announcedAddresses.add(hostAddress) # We always have at least a bind address for the host
if wsExtAddress.isSome():
announcedAddresses.add(wsExtAddress.get())
elif wsHostAddress.isSome():
announcedAddresses.add(wsHostAddress.get())
## Initialize peer
let
rng = crypto.newRng()
@ -202,7 +202,7 @@ proc new*(T: type WakuNode,
discv5UdpPort,
wakuFlags,
enrMultiaddrs)
info "Initializing networking", addrs=announcedAddresses
let switch = newWakuSwitch(
@ -210,7 +210,7 @@ proc new*(T: type WakuNode,
hostAddress,
wsHostAddress,
transportFlags = {ServerFlags.ReuseAddr},
rng = rng,
maxConnections = maxConnections,
wssEnabled = wssEnabled,
secureKeyPath = secureKey,
@ -219,7 +219,7 @@ proc new*(T: type WakuNode,
sendSignedPeerRecord = sendSignedPeerRecord,
agentString = agentString
)
let wakuNode = WakuNode(
peerManager: PeerManager.new(switch, peerStorage),
switch: switch,
@ -231,7 +231,7 @@ proc new*(T: type WakuNode,
return wakuNode
proc peerInfo*(node: WakuNode): PeerInfo =
node.switch.peerInfo
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
@ -239,7 +239,7 @@ proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
let peerInfo = node.switch.peerInfo
var listenStr : seq[string]
for address in node.announcedAddresses:
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
@ -277,7 +277,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
# Notify mounted protocols of new message
if not node.wakuFilter.isNil():
await node.wakuFilter.handleMessage(topic, msg.value)
if not node.wakuStore.isNil():
node.wakuStore.handleMessage(topic, msg.value)
@ -308,7 +308,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
# TODO: improved error handling
return
info "unsubscribe", topic=topic
let wakuRelay = node.wakuRelay
@ -316,22 +316,22 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
# TODO: improved error handling
return
info "unsubscribeAll", topic=topic
let wakuRelay = node.wakuRelay
wakuRelay.unsubscribeAll(topic)
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may also
## be omitted.
if node.wakuRelay.isNil():
error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
# TODO: Improve error handling
@ -351,7 +351,7 @@ proc startRelay*(node: WakuNode) {.async.} =
return
## Setup relay protocol
# Subscribe to the default PubSub topics
for topic in node.wakuRelay.defaultPubsubTopics:
node.subscribe(topic, none(TopicHandler))
@ -359,14 +359,14 @@ proc startRelay*(node: WakuNode) {.async.} =
# Resume previous relay connections
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec,
protocolMatcher(WakuRelayCodec),
backoffPeriod)
# Start the WakuRelay protocol
await node.wakuRelay.start()
@ -378,7 +378,7 @@ proc mountRelay*(node: WakuNode,
peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
## The default relay topics are the union of all configured topics plus the default PubsubTopic(s)
info "mounting relay protocol"
let initRes = WakuRelay.new(
node.peerManager,
defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics),
@ -432,14 +432,14 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec))
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
@ -466,11 +466,11 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
if unsubRes.isOk():
info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
@ -501,12 +501,12 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
if node.wakuFilterClient.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
return
await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())
# TODO: Move to application module (e.g., wakunode2.nim)
@ -516,7 +516,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
if node.wakuFilterClient.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
error "cannot register filter subscription to topic", error="no suitable remote peers"
@ -565,23 +565,19 @@ proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration
# https://github.com/nim-lang/Nim/issues/17369
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
executeMessageRetentionPolicy(node)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
if node.wakuSwap.isNil():
info "mounting waku store protocol (no waku swap)"
else:
info "mounting waku store protocol with waku swap support"
info "mounting waku store protocol"
node.wakuStore = WakuStore.new(
node.peerManager,
node.rng,
store,
wakuSwap=node.wakuSwap,
retentionPolicy=retentionPolicy
)
@ -605,12 +601,8 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
let queryRes = await node.wakuStoreClient.query(query, peer)
if queryRes.isErr():
return err($queryRes.error)
let response = queryRes.get()
if not node.wakuSwap.isNil():
# Perform accounting operation
node.wakuSwap.debit(peer.peerId, response.messages.len)
return ok(response)
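The deleted block here was the client-side half of the swap accounting: one debit per message received, mirrored by a credit in the store protocol handler removed further down. For reference, a std-only sketch of that per-peer bookkeeping, with signs matching the assertions in the deleted tests (Accounting and both procs are illustrative stand-ins, not the waku_swap API):
# Std-only stand-in for the accounting table that waku_swap maintained.
import std/tables

type Accounting = Table[string, int]   # peer id -> balance

proc debit(acc: var Accounting, peerId: string, n: int) =
  # Client side, after receiving n messages from peerId.
  acc[peerId] = acc.getOrDefault(peerId, 0) + n

proc credit(acc: var Accounting, peerId: string, n: int) =
  # Server side, after serving n messages to peerId.
  acc[peerId] = acc.getOrDefault(peerId, 0) - n

when isMainModule:
  var clientAcc, serverAcc: Accounting
  clientAcc.debit("server", 1)    # one message received by the client
  serverAcc.credit("client", 1)   # one message served by the server
  doAssert clientAcc["server"] == 1 and serverAcc["client"] == -1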
@ -645,13 +637,13 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
# TODO: Move to application module (e.g., wakunode2.nim)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the wakuStore's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node.wakuStoreClient.isNil():
return
@ -660,7 +652,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
if retrievedMessages.isErr():
error "failed to resume store", error=retrievedMessages.error
return
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
@ -672,10 +664,10 @@ proc mountLightPush*(node: WakuNode) {.async.} =
var pushHandler: PushMessageHandler
if node.wakuRelay.isNil():
debug "mounting lightpush without relay (nil)"
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
else:
pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
return ok()
@ -723,7 +715,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
if node.wakuLightpushClient.isNil():
error "failed to publish message", error="waku lightpush client is nil"
return
let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
error "failed to publish message", error="no suitable remote peers"
@ -777,11 +769,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
# @TODO: remove exception handling once explicit `raises` in ping module
raise newException(LPError, "Failed to initialize ping protocol")
if node.started:
# Node has started already. Let's start ping too.
await node.libp2pPing.start()
node.switch.mount(node.libp2pPing)
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
@ -805,7 +797,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
return
discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
await sleepAsync(keepalive)
proc startKeepalive*(node: WakuNode) =
@ -832,7 +824,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
if discoveredPeers.isOk():
## Let's attempt to connect to peers we
## have not encountered before
trace "Discovered peers", count=discoveredPeers.get().len()
let newPeers = discoveredPeers.get().filterIt(
@ -850,9 +842,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Start Discovery v5 service
info "Starting discovery v5 service"
if not node.wakuDiscv5.isNil():
## First start listening on configured port
try:
@ -861,12 +853,12 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
except CatchableError:
error "Failed to start discovery service. UDP port may be already in use"
return false
## Start Discovery v5
trace "Start discv5 service"
node.wakuDiscv5.start()
trace "Start discovering new peers using discv5"
asyncSpawn node.runDiscv5Loop()
debug "Successfully started discovery v5 service"
@ -877,10 +869,10 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Stop Discovery v5 service
if not node.wakuDiscv5.isNil():
info "Stopping discovery v5 service"
## Stop Discovery v5 process and close listening port
if node.wakuDiscv5.listening:
trace "Stop listening on discv5 port"
@ -892,17 +884,17 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
waku_version.set(1, labelValues=[git_version])
info "Starting Waku node", version=git_version
let peerInfo = node.switch.peerInfo
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
var listenStr = ""
for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()
@ -910,7 +902,7 @@ proc start*(node: WakuNode) {.async.} =
# Perform relay-specific startup tasks TODO: this should be rethought
if not node.wakuRelay.isNil():
await node.startRelay()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper =
@ -922,16 +914,16 @@ proc start*(node: WakuNode) {.async.} =
await node.switch.start()
node.started = true
info "Node started successfully"
proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil():
await node.wakuRelay.stop()
if not node.wakuDiscv5.isNil():
discard await node.stopDiscv5()
await node.switch.stop()
node.started = false

View File

@ -15,7 +15,6 @@ import
../../utils/requests,
../../utils/time,
../waku_message,
../waku_swap/waku_swap,
./protocol_metrics,
./common,
./rpc,
@ -27,7 +26,7 @@ logScope:
topics = "waku store client"
const
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
@ -35,12 +34,11 @@ type WakuStoreClient* = ref object
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
store: MessageStore
wakuSwap: WakuSwap
proc new*(T: type WakuStoreClient,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore): T =
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
@ -49,7 +47,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
if connOpt.isNone():
waku_store_errors.inc(labelValues = [dialFailure])
return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))
let connection = connOpt.get()
@ -82,7 +80,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
return await w.sendHistoryQueryRPC(req, peer)
proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## A thin wrapper for query. Sends the query to the given peer. When the query has a valid pagingInfo,
## it retrieves the historical messages in pages.
## Returns all the fetched messages; if an error occurs, returns an error string
@ -93,7 +91,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
while true:
let queryRes = await w.query(req, peer)
if queryRes.isErr():
return err($queryRes.error)
let response = queryRes.get()
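The paging contract behind queryAll: keep re-issuing the query with the cursor returned by the previous response until no cursor comes back. A std-only sketch of that loop, where Page, Cursor and fetchPage are hypothetical stand-ins for HistoryResponse, HistoryCursor and query:
# Std-only sketch of the cursor-driven paging loop behind queryAll.
import std/options

type
  Cursor = int                     # stand-in for HistoryCursor
  Page = object
    items: seq[string]             # stand-in for the messages of one response
    cursor: Option[Cursor]         # none() once the last page has been served

proc collectAll(fetchPage: proc (cursor: Option[Cursor]): Page): seq[string] =
  var cursor = none(Cursor)
  while true:
    let page = fetchPage(cursor)   # stand-in for the awaited query call
    result.add(page.items)
    if page.cursor.isNone():
      break                        # no cursor returned: that was the last page
    cursor = page.cursor           # continue where the previous page ended

when isMainModule:
  proc fakeFetch(cursor: Option[Cursor]): Page =
    if cursor.isNone():
      Page(items: @["m1", "m2"], cursor: some(1))
    else:
      Page(items: @["m3"], cursor: none(Cursor))
  doAssert collectAll(fakeFetch) == @["m1", "m2", "m3"]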
@ -114,7 +112,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds
proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## Loops through the peer candidate list in order and sends the query to each
##
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
@ -127,8 +125,8 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
try:
# fut.read() can raise a CatchableError
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
if not fut.completed() or fut.read().isErr():
return @[]
fut.read().value
@ -140,30 +138,30 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
return ok(messagesList)
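The consolidation step in queryLoop amounts to flattening the per-peer result lists and dropping duplicates; a std-only sketch of just that step (the concurrent fan-out with allFutures is omitted):
# Std-only sketch of the consolidation in queryLoop: flatten the per-peer
# message lists and drop duplicates, keeping first-seen order.
import std/sequtils

proc consolidate[T](perPeer: seq[seq[T]]): seq[T] =
  perPeer.concat().deduplicate()

when isMainModule:
  doAssert consolidate(@[@["a", "b"], @["b", "c"]]) == @["a", "b", "c"]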
proc resume*(w: WakuStoreClient,
peerList = none(seq[RemotePeerInfo]),
pageSize = DefaultPageSize,
pubsubTopic = DefaultPubsubTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
## peerList indicates the list of peers to query from.
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
## Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
# If store has not been provided, don't even try
if w.store.isNil():
return err("store not provided (nil)")
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
# of writing, the sqlite store implementation returns the last message's receiver
# timestamp.
# lastSeenTime = lastSeenItem.get().msg.timestamp
let
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
now = getNanosecondTime(getTime().toUnixFloat())
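The resume window is then derived from lastSeenTime and now, widened by StoreResumeTimeWindowOffset as the doc comment describes. A self-contained sketch of that computation; the exact clamping in the real resume proc may differ:
# Self-contained sketch of widening the resume window by the 20 s offset
# described above. Names mirror the diff, but the formula is an assumption.
import std/times

type Timestamp = int64

proc getNanosecondTime(sec: float): Timestamp =
  Timestamp(sec * 1_000_000_000)

const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20)

proc resumeWindow(lastSeenTime: Timestamp): tuple[start, stop: Timestamp] =
  let now = getNanosecondTime(getTime().toUnixFloat())
  # Widen both ends so messages near the offline boundary are not missed.
  (start: max(lastSeenTime - StoreResumeTimeWindowOffset, 0),
   stop: now + StoreResumeTimeWindowOffset)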
@ -175,7 +173,7 @@ proc resume*(w: WakuStoreClient,
let req = HistoryQuery(
pubsubTopic: some(pubsubTopic),
startTime: some(queryStartTime),
endTime: some(queryEndTime),
pageSize: uint64(pageSize),
ascending: true
@ -188,7 +186,7 @@ proc resume*(w: WakuStoreClient,
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
@ -198,7 +196,7 @@ proc resume*(w: WakuStoreClient,
debug "a peer is selected from peer manager"
res = await w.queryAll(req, peerOpt.get())
if res.isErr():
debug "failed to resume the history"
return err("failed to resume the history")

View File

@ -10,7 +10,7 @@ import
std/[tables, times, sequtils, options, algorithm],
stew/results,
chronicles,
chronos,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
@ -22,7 +22,6 @@ import
../../node/peer_manager/peer_manager,
../../utils/time,
../waku_message,
../waku_swap/waku_swap,
./common,
./rpc,
./rpc_codec,
@ -34,7 +33,7 @@ logScope:
topics = "waku store"
const
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
@ -43,7 +42,6 @@ type
peerManager*: PeerManager
rng*: ref rand.HmacDrbgContext
store*: MessageStore
wakuSwap*: WakuSwap
retentionPolicy: Option[MessageRetentionPolicy]
@ -63,7 +61,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) =
debug "failed execution of retention policy", error=retPolicyRes.error
# TODO: Move to a message store wrapper
proc reportStoredMessagesMetric*(w: WakuStore) =
if w.store.isNil():
return
@ -78,7 +76,7 @@ proc isValidMessage(msg: WakuMessage): bool =
if msg.timestamp == 0:
return true
let
now = getNanosecondTime(getTime().toUnixFloat())
lowerBound = now - MaxMessageTimestampVariance
upperBound = now + MaxMessageTimestampVariance
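The rule this hunk sets up: a message with no sender timestamp is accepted, otherwise the timestamp must fall within the 20-second drift allowance around the node clock. A self-contained restatement follows; the final bounds comparison is cut off by the hunk and is assumed here:
# Self-contained restatement of the timestamp check above; the closing
# comparison against the bounds is an assumption, since the hunk ends early.
import std/times

type Timestamp = int64

proc getNanosecondTime(sec: float): Timestamp =
  Timestamp(sec * 1_000_000_000)

const MaxMessageTimestampVariance = getNanosecondTime(20)  # 20 s allowed drift

proc isValidTimestamp(ts: Timestamp): bool =
  if ts == 0:
    return true                       # messages without a timestamp pass
  let
    now = getNanosecondTime(getTime().toUnixFloat())
    lowerBound = now - MaxMessageTimestampVariance
    upperBound = now + MaxMessageTimestampVariance
  lowerBound <= ts and ts <= upperBound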
@ -94,19 +92,19 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
# The message is ephemeral, should not be stored
return
if not isValidMessage(msg):
waku_store_errors.inc(labelValues = [invalidMessage])
return
let insertStartTime = getTime().toUnixFloat()
block:
let
msgDigest = computeDigest(msg)
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
@ -123,14 +121,14 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
# TODO: Move to a message store wrapper
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
## Query history to return a single page of messages matching the query
# Extract query criteria. All query criteria are optional
let
qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic])
else: some(query.contentTopics)
qPubSubTopic = query.pubsubTopic
qCursor = query.cursor
qStartTime = query.startTime
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
@ -138,7 +136,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
let queryStartTime = getTime().toUnixFloat()
let queryRes = w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
@ -159,15 +157,15 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))
let rows = queryRes.get()
if rows.len <= 0:
return ok(HistoryResponse(
messages: @[],
pageSize: 0,
ascending: qAscendingOrder,
cursor: none(HistoryCursor)
))
var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
else: rows[0..^2].mapIt(it[1])
@ -177,7 +175,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
if not qAscendingOrder:
messages.reverse()
if rows.len > int(qMaxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
@ -190,7 +188,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
messageDigest[i] = digest[i]
cursor = some(HistoryCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: storeTimestamp,
digest: MessageDigest(data: messageDigest)
@ -198,7 +196,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
ok(HistoryResponse(
messages: messages,
pageSize: uint64(messages.len),
ascending: qAscendingOrder,
cursor: cursor
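Taken together, the findMessages hunks use the usual fetch-one-extra-row trick: the store is asked for pageSize + 1 rows, the extra row is dropped, and its presence decides whether a cursor pointing at the last included row goes into the response. A std-only sketch of that decision, with Row and PageResult as illustrative stand-ins:
# Std-only sketch of the "fetch pageSize + 1 rows" trick used by findMessages.
type
  Row = object
    id: int                 # stands in for (digest, storeTime, senderTime, topic)
  PageResult = object
    rows: seq[Row]
    cursorId: int           # -1 plays the role of none(HistoryCursor)

proc buildPage(rows: seq[Row], maxPageSize: int): PageResult =
  if rows.len <= maxPageSize:
    # Everything fit on this page: no cursor, the caller reached the end.
    return PageResult(rows: rows, cursorId: -1)
  # The extra row only proves another page exists; drop it and derive the
  # cursor from the last row actually included in the response.
  let kept = rows[0 ..< maxPageSize]
  PageResult(rows: kept, cursorId: kept[^1].id)

when isMainModule:
  let rows = @[Row(id: 1), Row(id: 2), Row(id: 3)]
  doAssert buildPage(rows, 2).cursorId == 2    # page of 2, cursor at row 2
  doAssert buildPage(rows, 5).cursorId == -1   # everything fit, no cursor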
@ -207,8 +205,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
## Protocol
proc initProtocolHandler*(ws: WakuStore) =
proc initProtocolHandler(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(MaxRpcSize.int)
@ -233,9 +231,9 @@ proc initProtocolHandler*(ws: WakuStore) =
waku_store_queries.inc()
if ws.store.isNil():
let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr
let resp = HistoryResponseRPC(error: respErr.toRPC())
@ -245,7 +243,7 @@ proc initProtocolHandler*(ws: WakuStore) =
let query = reqRpc.query.get().toAPI()
let respRes = ws.findMessages(query)
if respRes.isErr():
@ -259,16 +257,6 @@ proc initProtocolHandler*(ws: WakuStore) =
let resp = respRes.toRPC()
if not ws.wakuSwap.isNil():
info "handle store swap", peerId=conn.peerId, requestId=reqRpc.requestId, text=ws.wakuSwap.text
# Perform accounting operation
# TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context?
let peerId = conn.peerId
let messages = resp.messages
ws.wakuSwap.credit(peerId, messages.len)
info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
@ -277,17 +265,15 @@ proc initProtocolHandler*(ws: WakuStore) =
ws.handler = handler
ws.codec = WakuStoreCodec
proc new*(T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
store: MessageStore,
wakuSwap: WakuSwap = nil,
retentionPolicy=none(MessageRetentionPolicy)): T =
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
store: store,
wakuSwap: wakuSwap,
retentionPolicy: retentionPolicy
)
ws.initProtocolHandler()