chore(store): remove waku swap integration from store protocol

Lorenzo Delgado 2022-11-21 09:36:41 +01:00 committed by GitHub
parent b5e5b8f90b
commit aba02d10d3
4 changed files with 114 additions and 243 deletions
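
In summary: the store protocol loses its optional WakuSwap dependency, the node-level query path no longer debits peers, and the two swap-accounting tests that exercised store queries are deleted. A minimal before/after sketch of the affected mount call site (illustrative only, using the names that appear in the waku_node hunks below):

# Before this commit: the store was mounted with an optional swap instance
# and logged whether swap support was enabled.
node.wakuStore = WakuStore.new(node.peerManager, node.rng, store,
                               wakuSwap=node.wakuSwap,
                               retentionPolicy=retentionPolicy)

# After this commit: swap is no longer wired into the store protocol.
node.wakuStore = WakuStore.new(node.peerManager, node.rng, store,
                               retentionPolicy=retentionPolicy)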

View File

@@ -2,10 +2,10 @@
import
  std/tables,
  stew/shims/net as stewNet,
  testutils/unittests,
  chronos,
  chronicles,
  libp2p/switch,
  libp2p/protobuf/minprotobuf,
  libp2p/stream/bufferstream,
@@ -15,11 +15,9 @@ import
  eth/keys
import
  ../../waku/v2/node/waku_node,
-  ../../waku/v2/node/message_store/queue_store,
-  ../../waku/v2/protocol/waku_store,
  ../../waku/v2/protocol/waku_swap/waku_swap,
  ../../waku/v2/utils/peers,
  ../test_helpers,
  ./utils,
  ./testlib/common
@@ -50,106 +48,3 @@ procSuite "Waku SWAP Accounting":
    check:
      decodedCheque.isErr == false
      decodedCheque.get() == cheque
-
-  # TODO: To do this reliably we need access to contract node
-  # With current logic state isn't updated because of bad cheque
-  # Consider moving this test to e2e test, and/or move swap module to be on by default
-  asyncTest "Update accounting state after store operations":
-    ## Setup
-    let
-      serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
-      server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
-      clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
-      client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))
-
-    await allFutures(client.start(), server.start())
-
-    await server.mountSwap()
-    await server.mountStore(store=StoreQueueRef.new())
-    await client.mountSwap()
-    await client.mountStore()
-    client.mountStoreClient()
-
-    client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
-    server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
-    client.setStorePeer(server.peerInfo.toRemotePeerInfo())
-    server.setStorePeer(client.peerInfo.toRemotePeerInfo())
-
-    ## Given
-    let message = fakeWakuMessage()
-    require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
-
-    let serverPeer = server.peerInfo.toRemotePeerInfo()
-    let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
-
-    ## When
-    let queryRes = await client.query(req, peer=serverPeer)
-
-    ## Then
-    check queryRes.isOk()
-
-    let response = queryRes.get()
-    check:
-      response.messages == @[message]
-
-    check:
-      client.wakuSwap.accounting[server.peerInfo.peerId] == 1
-      server.wakuSwap.accounting[client.peerInfo.peerId] == -1
-
-    ## Cleanup
-    await allFutures(client.stop(), server.stop())
-
-  # This test will only be checked if in Mock mode
-  # TODO: Add cheque here
-  asyncTest "Update accounting state after sending cheque":
-    ## Setup
-    let
-      serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
-      server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60202))
-      clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
-      client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60200))
-
-    # Define the waku swap config for this test
-    let swapConfig = SwapConfig(mode: SwapMode.Mock, paymentThreshold: 1, disconnectThreshold: -1)
-
-    # Start nodes and mount protocols
-    await allFutures(client.start(), server.start())
-
-    await server.mountSwap(swapConfig)
-    await server.mountStore(store=StoreQueueRef.new())
-    await client.mountSwap(swapConfig)
-    await client.mountStore()
-    client.mountStoreClient()
-
-    client.wakuSwap.setPeer(server.peerInfo.toRemotePeerInfo())
-    server.wakuSwap.setPeer(client.peerInfo.toRemotePeerInfo())
-    client.setStorePeer(server.peerInfo.toRemotePeerInfo())
-    server.setStorePeer(client.peerInfo.toRemotePeerInfo())
-
-    ## Given
-    let message = fakeWakuMessage()
-    require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
-
-    let serverPeer = server.peerInfo.toRemotePeerInfo()
-    let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
-
-    ## When
-    # TODO: Handshakes - for now we assume implicit, e2e still works for PoC
-    let res1 = await client.query(req, peer=serverPeer)
-    let res2 = await client.query(req, peer=serverPeer)
-
-    require:
-      res1.isOk()
-      res2.isOk()
-
-    ## Then
-    check:
-      # Accounting table updated with credit and debit, respectively
-      # After sending a cheque the balance is partially adjusted
-      client.wakuSwap.accounting[server.peerInfo.peerId] == 1
-      server.wakuSwap.accounting[client.peerInfo.peerId] == -1
-
-    ## Cleanup
-    await allFutures(client.stop(), server.stop())
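
The two deleted tests exercised swap accounting through store queries. A hedged sketch of the store-only flow that remains testable after this change, reusing only helpers visible in the diff above (it would also need the queue_store and waku_store imports removed above; the test name is hypothetical):

asyncTest "Store query without swap accounting":
  ## Setup
  let
    serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60102))
    clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
    client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60100))

  await allFutures(client.start(), server.start())
  await server.mountStore(store=StoreQueueRef.new())
  client.mountStoreClient()

  ## Given
  let message = fakeWakuMessage()
  require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()

  ## When
  let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
  let res = await client.query(req, peer=server.peerInfo.toRemotePeerInfo())

  ## Then
  check:
    res.isOk()
    res.get().messages == @[message]

  ## Cleanup
  await allFutures(client.stop(), server.stop())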

View File

@@ -32,7 +32,7 @@ import
  ../protocol/waku_lightpush,
  ../protocol/waku_lightpush/client as lightpush_client,
  ../protocol/waku_peer_exchange,
  ../utils/peers,
  ../utils/wakuenr,
  ./peer_manager/peer_manager,
  ./message_store/message_retention_policy,
@@ -128,11 +128,11 @@ template wsFlag(wssEnabled: bool): MultiAddress =
  if wssEnabled: MultiAddress.init("/wss").tryGet()
  else: MultiAddress.init("/ws").tryGet()

proc new*(T: type WakuNode,
          nodeKey: crypto.PrivateKey,
          bindIp: ValidIpAddress,
          bindPort: Port,
          extIp = none(ValidIpAddress),
          extPort = none(Port),
          peerStorage: PeerStorage = nil,
          maxConnections = builders.MaxConnections,
@@ -160,7 +160,7 @@ proc new*(T: type WakuNode,
  # Setup external addresses, if available
  var
    hostExtAddress, wsExtAddress = none(MultiAddress)

  if (dns4DomainName.isSome()):
    # Use dns4 for externally announced addresses
    hostExtAddress = some(dns4TcpEndPoint(dns4DomainName.get(), extPort.get()))
@@ -180,12 +180,12 @@ proc new*(T: type WakuNode,
    announcedAddresses.add(hostExtAddress.get())
  else:
    announcedAddresses.add(hostAddress) # We always have at least a bind address for the host

  if wsExtAddress.isSome():
    announcedAddresses.add(wsExtAddress.get())
  elif wsHostAddress.isSome():
    announcedAddresses.add(wsHostAddress.get())

  ## Initialize peer
  let
    rng = crypto.newRng()
@@ -202,7 +202,7 @@ proc new*(T: type WakuNode,
      discv5UdpPort,
      wakuFlags,
      enrMultiaddrs)

  info "Initializing networking", addrs=announcedAddresses

  let switch = newWakuSwitch(
@@ -210,7 +210,7 @@ proc new*(T: type WakuNode,
    hostAddress,
    wsHostAddress,
    transportFlags = {ServerFlags.ReuseAddr},
    rng = rng,
    maxConnections = maxConnections,
    wssEnabled = wssEnabled,
    secureKeyPath = secureKey,
@@ -219,7 +219,7 @@ proc new*(T: type WakuNode,
    sendSignedPeerRecord = sendSignedPeerRecord,
    agentString = agentString
  )

  let wakuNode = WakuNode(
    peerManager: PeerManager.new(switch, peerStorage),
    switch: switch,
@@ -231,7 +231,7 @@ proc new*(T: type WakuNode,
  return wakuNode

proc peerInfo*(node: WakuNode): PeerInfo =
  node.switch.peerInfo

# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
@@ -239,7 +239,7 @@ proc info*(node: WakuNode): WakuInfo =
  ## Returns information about the Node, such as what multiaddress it can be reached at.
  let peerInfo = node.switch.peerInfo

  var listenStr : seq[string]
  for address in node.announcedAddresses:
    var fulladdr = $address & "/p2p/" & $peerInfo.peerId
@@ -277,7 +277,7 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
    # Notify mounted protocols of new message
    if not node.wakuFilter.isNil():
      await node.wakuFilter.handleMessage(topic, msg.value)

    if not node.wakuStore.isNil():
      node.wakuStore.handleMessage(topic, msg.value)
@@ -308,7 +308,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
    error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
    # TODO: improved error handling
    return

  info "unsubscribe", topic=topic

  let wakuRelay = node.wakuRelay
@@ -316,22 +316,22 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
  ## Unsubscribes all handlers registered on a specific PubSub topic.
  if node.wakuRelay.isNil():
    error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
    # TODO: improved error handling
    return

  info "unsubscribeAll", topic=topic

  let wakuRelay = node.wakuRelay
  wakuRelay.unsubscribeAll(topic)

proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
  ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
  ## `contentTopic` field for light node functionality. This field may also
  ## be omitted.
  if node.wakuRelay.isNil():
    error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead."
    # TODO: Improve error handling
@@ -351,7 +351,7 @@ proc startRelay*(node: WakuNode) {.async.} =
    return

  ## Setup relay protocol

  # Subscribe to the default PubSub topics
  for topic in node.wakuRelay.defaultPubsubTopics:
    node.subscribe(topic, none(TopicHandler))
@@ -359,14 +359,14 @@ proc startRelay*(node: WakuNode) {.async.} =
  # Resume previous relay connections
  if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
    info "Found previous WakuRelay peers. Reconnecting."

    # Reconnect to previous relay peers. This will respect a backoff period, if necessary
    let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
    await node.peerManager.reconnectPeers(WakuRelayCodec,
                                          protocolMatcher(WakuRelayCodec),
                                          backoffPeriod)

  # Start the WakuRelay protocol
  await node.wakuRelay.start()
@@ -378,7 +378,7 @@ proc mountRelay*(node: WakuNode,
                 peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} =
  ## The default relay topics is the union of all configured topics plus default PubsubTopic(s)
  info "mounting relay protocol"

  let initRes = WakuRelay.new(
    node.peerManager,
    defaultPubsubTopics = concat(@[DefaultPubsubTopic], topics),
@@ -432,14 +432,14 @@ proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
  node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec))

proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
                      handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
  ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
  if node.wakuFilterClient.isNil():
    error "cannot register filter subscription to topic", error="waku filter client is nil"
    return

  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
                   else: peer

  info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer
@@ -466,11 +466,11 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
    error "cannot unregister filter subscription to content", error="waku filter client is nil"
    return

  let remotePeer = when peer is string: parseRemotePeerInfo(peer)
                   else: peer

  info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer

  let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
  if unsubRes.isOk():
    info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
@@ -501,12 +501,12 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
  if node.wakuFilterClient.isNil():
    error "cannot register filter subscription to topic", error="waku filter client is nil"
    return

  let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
  if peerOpt.isNone():
    error "cannot register filter subscription to topic", error="no suitable remote peers"
    return

  await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get())

# TODO: Move to application module (e.g., wakunode2.nim)
@@ -516,7 +516,7 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
  if node.wakuFilterClient.isNil():
    error "cannot unregister filter subscription to content", error="waku filter client is nil"
    return

  let peerOpt = node.peerManager.selectPeer(WakuFilterCodec)
  if peerOpt.isNone():
    error "cannot register filter subscription to topic", error="no suitable remote peers"
@@ -565,23 +565,19 @@ proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration
  # https://github.com/nim-lang/Nim/issues/17369
  var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
  executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
    executeMessageRetentionPolicy(node)
    discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)

  discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)

proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none(MessageRetentionPolicy) ) {.async, raises: [Defect, LPError].} =
-  if node.wakuSwap.isNil():
-    info "mounting waku store protocol (no waku swap)"
-  else:
-    info "mounting waku store protocol with waku swap support"
+  info "mounting waku store protocol"

  node.wakuStore = WakuStore.new(
    node.peerManager,
    node.rng,
    store,
-    wakuSwap=node.wakuSwap,
    retentionPolicy=retentionPolicy
  )
@@ -605,12 +601,8 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
  let queryRes = await node.wakuStoreClient.query(query, peer)
  if queryRes.isErr():
    return err($queryRes.error)

  let response = queryRes.get()
-
-  if not node.wakuSwap.isNil():
-    # Perform accounting operation
-    node.wakuSwap.debit(peer.peerId, response.messages.len)

  return ok(response)
@@ -645,13 +637,13 @@ proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[History
# TODO: Move to application module (e.g., wakunode2.nim)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
  ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
  ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
  ## messages are stored in the wakuStore's messages field and in the message db
  ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
  ## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
  ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
  ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
  ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
  if node.wakuStoreClient.isNil():
    return
@@ -660,7 +652,7 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re
  if retrievedMessages.isErr():
    error "failed to resume store", error=retrievedMessages.error
    return

  info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
@@ -672,10 +664,10 @@ proc mountLightPush*(node: WakuNode) {.async.} =
  var pushHandler: PushMessageHandler
  if node.wakuRelay.isNil():
    debug "mounting lightpush without relay (nil)"
    pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
      return err("no waku relay found")
  else:
    pushHandler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
      discard await node.wakuRelay.publish(pubsubTopic, message.encode().buffer)
      return ok()
@@ -723,7 +715,7 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe
  if node.wakuLightpushClient.isNil():
    error "failed to publish message", error="waku lightpush client is nil"
    return

  let peerOpt = node.peerManager.selectPeer(WakuLightPushCodec)
  if peerOpt.isNone():
    error "failed to publish message", error="no suitable remote peers"
@@ -777,11 +769,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
    # This is necessary as `Ping.new*` does not have explicit `raises` requirement
    # @TODO: remove exception handling once explicit `raises` in ping module
    raise newException(LPError, "Failed to initialize ping protocol")

  if node.started:
    # Node has started already. Let's start ping too.
    await node.libp2pPing.start()

  node.switch.mount(node.libp2pPing)

proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
@@ -805,7 +797,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
        return

      discard await node.libp2pPing.ping(connOpt.get()) # Ping connection

    await sleepAsync(keepalive)

proc startKeepalive*(node: WakuNode) =
@@ -832,7 +824,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
    if discoveredPeers.isOk():
      ## Let's attempt to connect to peers we
      ## have not encountered before

      trace "Discovered peers", count=discoveredPeers.get().len()

      let newPeers = discoveredPeers.get().filterIt(
@@ -850,9 +842,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Start Discovery v5 service
  info "Starting discovery v5 service"

  if not node.wakuDiscv5.isNil():
    ## First start listening on configured port
    try:
@@ -861,12 +853,12 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
    except CatchableError:
      error "Failed to start discovery service. UDP port may be already in use"
      return false

    ## Start Discovery v5
    trace "Start discv5 service"
    node.wakuDiscv5.start()
    trace "Start discovering new peers using discv5"

    asyncSpawn node.runDiscv5Loop()

    debug "Successfully started discovery v5 service"
@@ -877,10 +869,10 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
  ## Stop Discovery v5 service
  if not node.wakuDiscv5.isNil():
    info "Stopping discovery v5 service"

    ## Stop Discovery v5 process and close listening port
    if node.wakuDiscv5.listening:
      trace "Stop listening on discv5 port"
@@ -892,17 +884,17 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
proc start*(node: WakuNode) {.async.} =
  ## Starts a created Waku Node and
  ## all its mounted protocols.

  waku_version.set(1, labelValues=[git_version])
  info "Starting Waku node", version=git_version

  let peerInfo = node.switch.peerInfo
  info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
  var listenStr = ""
  for address in node.announcedAddresses:
    var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
    listenStr &= fulladdr

  ## XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr
  info "DNS: discoverable ENR ", enr = node.enr.toUri()
@@ -910,7 +902,7 @@ proc start*(node: WakuNode) {.async.} =
  # Perform relay-specific startup tasks TODO: this should be rethought
  if not node.wakuRelay.isNil():
    await node.startRelay()

  ## The switch uses this mapper to update peer info addrs
  ## with announced addrs after start
  let addressMapper =
@@ -922,16 +914,16 @@ proc start*(node: WakuNode) {.async.} =
  await node.switch.start()

  node.started = true

  info "Node started successfully"

proc stop*(node: WakuNode) {.async.} =
  if not node.wakuRelay.isNil():
    await node.wakuRelay.stop()

  if not node.wakuDiscv5.isNil():
    discard await node.stopDiscv5()

  await node.switch.stop()

  node.started = false
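
Net effect on the node API: mountStore() and query() keep their signatures, but query() now simply forwards to the store client and returns the response; the swap debit on the peer is gone. A hypothetical call-site sketch (storePeer is an assumed RemotePeerInfo, not part of the diff):

await node.mountStore(store=StoreQueueRef.new())
let res = await node.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=storePeer)
if res.isOk():
  info "store query succeeded", messages=res.get().messages.len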

View File

@@ -15,7 +15,6 @@ import
  ../../utils/requests,
  ../../utils/time,
  ../waku_message,
-  ../waku_swap/waku_swap,
  ./protocol_metrics,
  ./common,
  ./rpc,
@@ -27,7 +26,7 @@ logScope:
  topics = "waku store client"

const
  DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
@@ -35,12 +34,11 @@ type WakuStoreClient* = ref object
    peerManager: PeerManager
    rng: ref rand.HmacDrbgContext
    store: MessageStore
-    wakuSwap: WakuSwap

proc new*(T: type WakuStoreClient,
          peerManager: PeerManager,
          rng: ref rand.HmacDrbgContext,
          store: MessageStore): T =
  WakuStoreClient(peerManager: peerManager, rng: rng, store: store)

proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future[HistoryResult] {.async, gcsafe.} =
@@ -49,7 +47,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
  if connOpt.isNone():
    waku_store_errors.inc(labelValues = [dialFailure])
    return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))

  let connection = connOpt.get()
@@ -82,7 +80,7 @@ proc query*(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo): Future
  return await w.sendHistoryQueryRPC(req, peer)

proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## A thin wrapper for query. Sends the query to the given peer. When the query has a valid pagingInfo,
  ## it retrieves the historical messages in pages.
  ## Returns all the fetched messages; if an error occurs, returns an error string
@@ -93,7 +91,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
  while true:
    let queryRes = await w.query(req, peer)
    if queryRes.isErr():
      return err($queryRes.error)

    let response = queryRes.get()
@@ -114,7 +112,7 @@ proc queryAll*(w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo): F
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust the time window with an offset of 20 seconds

proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## Loops through the peers candidate list in order and sends the query to each
  ##
  ## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
@@ -127,8 +125,8 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
    .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
      try:
        # fut.read() can raise a CatchableError
        # These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
        if not fut.completed() or fut.read().isErr():
          return @[]

        fut.read().value
@@ -140,30 +138,30 @@ proc queryLoop(w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
  return ok(messagesList)

proc resume*(w: WakuStoreClient,
             peerList = none(seq[RemotePeerInfo]),
             pageSize = DefaultPageSize,
             pubsubTopic = DefaultPubsubTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
  ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
  ## messages are stored in the store node's messages field and in the message db
  ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
  ## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
  ## peerList indicates the list of peers to query from.
  ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
  ## Such candidates should be found through a discovery method (to be developed).
  ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
  ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
  ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string

  # If store has not been provided, don't even try
  if w.store.isNil():
    return err("store not provided (nil)")

  # NOTE: Original implementation is based on the message's sender timestamp. At the moment
  #       of writing, the sqlite store implementation returns the last message's receiver
  #       timestamp.
  #         lastSeenTime = lastSeenItem.get().msg.timestamp
  let
    lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
    now = getNanosecondTime(getTime().toUnixFloat())
@@ -175,7 +173,7 @@ proc resume*(w: WakuStoreClient,
  let req = HistoryQuery(
    pubsubTopic: some(pubsubTopic),
    startTime: some(queryStartTime),
    endTime: some(queryEndTime),
    pageSize: uint64(pageSize),
    ascending: true
@@ -188,7 +186,7 @@ proc resume*(w: WakuStoreClient,
  else:
    debug "no candidate list is provided, selecting a random peer"
    # if no peerList is set then query from one of the peers stored in the peer manager
    let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
    if peerOpt.isNone():
      warn "no suitable remote peers"
@@ -198,7 +196,7 @@ proc resume*(w: WakuStoreClient,
    debug "a peer is selected from peer manager"
    res = await w.queryAll(req, peerOpt.get())

  if res.isErr():
    debug "failed to resume the history"
    return err("failed to resume the history")

View File

@@ -10,7 +10,7 @@ import
  std/[tables, times, sequtils, options, algorithm],
  stew/results,
  chronicles,
  chronos,
  bearssl/rand,
  libp2p/crypto/crypto,
  libp2p/protocols/protocol,
@@ -22,7 +22,6 @@ import
  ../../node/peer_manager/peer_manager,
  ../../utils/time,
  ../waku_message,
-  ../waku_swap/waku_swap,
  ./common,
  ./rpc,
  ./rpc_codec,
@@ -34,7 +33,7 @@ logScope:
  topics = "waku store"

const
  MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
@@ -43,7 +42,6 @@ type
    peerManager*: PeerManager
    rng*: ref rand.HmacDrbgContext
    store*: MessageStore
-    wakuSwap*: WakuSwap
    retentionPolicy: Option[MessageRetentionPolicy]
@@ -63,7 +61,7 @@ proc executeMessageRetentionPolicy*(w: WakuStore) =
    debug "failed execution of retention policy", error=retPolicyRes.error

# TODO: Move to a message store wrapper
proc reportStoredMessagesMetric*(w: WakuStore) =
  if w.store.isNil():
    return
@@ -78,7 +76,7 @@ proc isValidMessage(msg: WakuMessage): bool =
  if msg.timestamp == 0:
    return true

  let
    now = getNanosecondTime(getTime().toUnixFloat())
    lowerBound = now - MaxMessageTimestampVariance
    upperBound = now + MaxMessageTimestampVariance
@@ -94,19 +92,19 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
  if msg.ephemeral:
    # The message is ephemeral, should not be stored
    return

  if not isValidMessage(msg):
    waku_store_errors.inc(labelValues = [invalidMessage])
    return

  let insertStartTime = getTime().toUnixFloat()

  block:
    let
      msgDigest = computeDigest(msg)
      msgReceivedTime = if msg.timestamp > 0: msg.timestamp
                        else: getNanosecondTime(getTime().toUnixFloat())

    trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
@@ -123,14 +121,14 @@ proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
# TODO: Move to a message store wrapper
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
  ## Query history to return a single page of messages matching the query

  # Extract query criteria. All query criteria are optional
  let
    qContentTopics = if query.contentTopics.len == 0: none(seq[ContentTopic])
                     else: some(query.contentTopics)
    qPubSubTopic = query.pubsubTopic
    qCursor = query.cursor
    qStartTime = query.startTime
    qEndTime = query.endTime
    qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
                   else: min(query.pageSize, MaxPageSize)
@@ -138,7 +136,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
  let queryStartTime = getTime().toUnixFloat()

  let queryRes = w.store.getMessagesByHistoryQuery(
    contentTopic = qContentTopics,
    pubsubTopic = qPubSubTopic,
@@ -159,15 +157,15 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
    return err(HistoryError(kind: HistoryErrorKind.UNKNOWN))

  let rows = queryRes.get()

  if rows.len <= 0:
    return ok(HistoryResponse(
      messages: @[],
      pageSize: 0,
      ascending: qAscendingOrder,
      cursor: none(HistoryCursor)
    ))

  var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
                 else: rows[0..^2].mapIt(it[1])
@@ -177,7 +175,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
  if not qAscendingOrder:
    messages.reverse()

  if rows.len > int(qMaxPageSize):
    ## Build last message cursor
    ## The cursor is built from the last message INCLUDED in the response
@@ -190,7 +188,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
      messageDigest[i] = digest[i]

    cursor = some(HistoryCursor(
      pubsubTopic: pubsubTopic,
      senderTime: message.timestamp,
      storeTime: storeTimestamp,
      digest: MessageDigest(data: messageDigest)
@@ -198,7 +196,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
  ok(HistoryResponse(
    messages: messages,
    pageSize: uint64(messages.len),
    ascending: qAscendingOrder,
    cursor: cursor
@@ -207,8 +205,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResult {.gcsafe.} =
## Protocol

-proc initProtocolHandler*(ws: WakuStore) =
+proc initProtocolHandler(ws: WakuStore) =
  proc handler(conn: Connection, proto: string) {.async.} =
    let buf = await conn.readLp(MaxRpcSize.int)
@@ -233,9 +231,9 @@ proc initProtocolHandler*(ws: WakuStore) =
    waku_store_queries.inc()

    if ws.store.isNil():
      let respErr = HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)

      error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr

      let resp = HistoryResponseRPC(error: respErr.toRPC())
@@ -245,7 +243,7 @@ proc initProtocolHandler*(ws: WakuStore) =
    let query = reqRpc.query.get().toAPI()

    let respRes = ws.findMessages(query)

    if respRes.isErr():
@@ -259,16 +257,6 @@ proc initProtocolHandler*(ws: WakuStore) =
    let resp = respRes.toRPC()

-    if not ws.wakuSwap.isNil():
-      info "handle store swap", peerId=conn.peerId, requestId=reqRpc.requestId, text=ws.wakuSwap.text
-
-      # Perform accounting operation
-      # TODO: Do accounting here, response is HistoryResponseRPC. How do we get node or swap context?
-      let peerId = conn.peerId
-      let messages = resp.messages
-      ws.wakuSwap.credit(peerId, messages.len)
-
    info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len

    let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
@@ -277,17 +265,15 @@ proc initProtocolHandler*(ws: WakuStore) =
  ws.handler = handler
  ws.codec = WakuStoreCodec

proc new*(T: type WakuStore,
          peerManager: PeerManager,
          rng: ref rand.HmacDrbgContext,
          store: MessageStore,
-          wakuSwap: WakuSwap = nil,
          retentionPolicy=none(MessageRetentionPolicy)): T =
  let ws = WakuStore(
    rng: rng,
    peerManager: peerManager,
    store: store,
-    wakuSwap: wakuSwap,
    retentionPolicy: retentionPolicy
  )
  ws.initProtocolHandler()
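
With the swap branch removed, the request handler reduces to: decode the RPC, run findMessages, and reply. A simplified sketch of the remaining response path, with names taken from the handler above (error handling elided; the final writeLp call is an assumption about how the reply is sent, not shown in this diff):

let respRes = ws.findMessages(reqRpc.query.get().toAPI())
let resp = respRes.toRPC()

info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len

let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer)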