mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-25 14:19:31 +00:00
243 lines
8.8 KiB
Nim
243 lines
8.8 KiB
Nim
|
when (NimMajor, NimMinor) < (1, 4):
|
||
|
{.push raises: [Defect].}
|
||
|
else:
|
||
|
{.push raises: [].}
|
||
|
|
||
|
import std/options, stew/results, chronicles, chronos, metrics, bearssl/rand
|
||
|
import
|
||
|
../node/peer_manager,
|
||
|
../utils/requests,
|
||
|
./protocol_metrics,
|
||
|
./common,
|
||
|
./rpc,
|
||
|
./rpc_codec
|
||
|
|
||
|
when defined(waku_exp_store_resume):
|
||
|
import std/[sequtils, times]
|
||
|
import ../waku_archive
|
||
|
import ../waku_core/message/digest
|
||
|
|
||
|
logScope:
|
||
|
topics = "waku legacy store client"
|
||
|
|
||
|
const DefaultPageSize*: uint = 20
|
||
|
# A recommended default number of waku messages per page
|
||
|
|
||
|
type WakuStoreClient* = ref object
|
||
|
peerManager: PeerManager
|
||
|
rng: ref rand.HmacDrbgContext
|
||
|
|
||
|
# TODO: Move outside of the client
|
||
|
when defined(waku_exp_store_resume):
|
||
|
store: ArchiveDriver
|
||
|
|
||
|
proc new*(
|
||
|
T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
|
||
|
): T =
|
||
|
WakuStoreClient(peerManager: peerManager, rng: rng)
|
||
|
|
||
|
proc sendHistoryQueryRPC(
|
||
|
w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo
|
||
|
): Future[HistoryResult] {.async, gcsafe.} =
|
||
|
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
|
||
|
if connOpt.isNone():
|
||
|
waku_legacy_store_errors.inc(labelValues = [dialFailure])
|
||
|
return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))
|
||
|
|
||
|
let connection = connOpt.get()
|
||
|
|
||
|
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
|
||
|
await connection.writeLP(reqRpc.encode().buffer)
|
||
|
|
||
|
#TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail.
|
||
|
# Need to find a workaround for this.
|
||
|
let buf = await connection.readLp(DefaultMaxRpcSize.int)
|
||
|
let respDecodeRes = HistoryRPC.decode(buf)
|
||
|
if respDecodeRes.isErr():
|
||
|
waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure])
|
||
|
return
|
||
|
err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: decodeRpcFailure))
|
||
|
|
||
|
let respRpc = respDecodeRes.get()
|
||
|
|
||
|
# Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0))
|
||
|
# TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK))
|
||
|
# and rework the protobuf parsing to return Option[T] when empty values are received
|
||
|
if respRpc.response.isNone():
|
||
|
waku_legacy_store_errors.inc(labelValues = [emptyRpcResponseFailure])
|
||
|
return err(
|
||
|
HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure)
|
||
|
)
|
||
|
|
||
|
let resp = respRpc.response.get()
|
||
|
|
||
|
return resp.toAPI()
|
||
|
|
||
|
proc query*(
|
||
|
w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo
|
||
|
): Future[HistoryResult] {.async, gcsafe.} =
|
||
|
return await w.sendHistoryQueryRPC(req, peer)
|
||
|
|
||
|
# TODO: Move outside of the client
|
||
|
when defined(waku_exp_store_resume):
|
||
|
## Resume store
|
||
|
|
||
|
const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20)
|
||
|
## Adjust the time window with an offset of 20 seconds
|
||
|
|
||
|
proc new*(
|
||
|
T: type WakuStoreClient,
|
||
|
peerManager: PeerManager,
|
||
|
rng: ref rand.HmacDrbgContext,
|
||
|
store: ArchiveDriver,
|
||
|
): T =
|
||
|
WakuStoreClient(peerManager: peerManager, rng: rng, store: store)
|
||
|
|
||
|
proc queryAll(
|
||
|
w: WakuStoreClient, query: HistoryQuery, peer: RemotePeerInfo
|
||
|
): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||
|
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
|
||
|
## it retrieves the historical messages in pages.
|
||
|
## Returns all the fetched messages, if error occurs, returns an error string
|
||
|
|
||
|
# Make a copy of the query
|
||
|
var req = query
|
||
|
|
||
|
var messageList: seq[WakuMessage] = @[]
|
||
|
|
||
|
while true:
|
||
|
let queryRes = await w.query(req, peer)
|
||
|
if queryRes.isErr():
|
||
|
return err($queryRes.error)
|
||
|
|
||
|
let response = queryRes.get()
|
||
|
|
||
|
messageList.add(response.messages)
|
||
|
|
||
|
# Check whether it is the last page
|
||
|
if response.cursor.isNone():
|
||
|
break
|
||
|
|
||
|
# Update paging cursor
|
||
|
req.cursor = response.cursor
|
||
|
|
||
|
return ok(messageList)
|
||
|
|
||
|
proc queryLoop(
|
||
|
w: WakuStoreClient, req: HistoryQuery, peers: seq[RemotePeerInfo]
|
||
|
): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
|
||
|
## Loops through the peers candidate list in order and sends the query to each
|
||
|
##
|
||
|
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
|
||
|
## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq.
|
||
|
let queryFuturesList = peers.mapIt(w.queryAll(req, it))
|
||
|
|
||
|
await allFutures(queryFuturesList)
|
||
|
|
||
|
let messagesList = queryFuturesList
|
||
|
.map(
|
||
|
proc(fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
|
||
|
try:
|
||
|
# fut.read() can raise a CatchableError
|
||
|
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
|
||
|
if not fut.completed() or fut.read().isErr():
|
||
|
return @[]
|
||
|
|
||
|
fut.read().value
|
||
|
except CatchableError:
|
||
|
return @[]
|
||
|
)
|
||
|
.concat()
|
||
|
.deduplicate()
|
||
|
|
||
|
return ok(messagesList)
|
||
|
|
||
|
proc put(
|
||
|
store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage
|
||
|
): Result[void, string] =
|
||
|
let
|
||
|
digest = waku_archive.computeDigest(message)
|
||
|
messageHash = computeMessageHash(pubsubTopic, message)
|
||
|
receivedTime =
|
||
|
if message.timestamp > 0:
|
||
|
message.timestamp
|
||
|
else:
|
||
|
getNanosecondTime(getTime().toUnixFloat())
|
||
|
|
||
|
store.put(pubsubTopic, message, digest, messageHash, receivedTime)
|
||
|
|
||
|
proc resume*(
|
||
|
w: WakuStoreClient,
|
||
|
peerList = none(seq[RemotePeerInfo]),
|
||
|
pageSize = DefaultPageSize,
|
||
|
pubsubTopic = DefaultPubsubTopic,
|
||
|
): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||
|
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||
|
## messages are stored in the store node's messages field and in the message db
|
||
|
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||
|
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||
|
## peerList indicates the list of peers to query from.
|
||
|
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
|
||
|
## Such candidates should be found through a discovery method (to be developed).
|
||
|
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||
|
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||
|
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||
|
|
||
|
# If store has not been provided, don't even try
|
||
|
if w.store.isNil():
|
||
|
return err("store not provided (nil)")
|
||
|
|
||
|
# NOTE: Original implementation is based on the message's sender timestamp. At the moment
|
||
|
# of writing, the sqlite store implementation returns the last message's receiver
|
||
|
# timestamp.
|
||
|
# lastSeenTime = lastSeenItem.get().msg.timestamp
|
||
|
let
|
||
|
lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
|
||
|
now = getNanosecondTime(getTime().toUnixFloat())
|
||
|
|
||
|
debug "resuming with offline time window",
|
||
|
lastSeenTime = lastSeenTime, currentTime = now
|
||
|
|
||
|
let
|
||
|
queryEndTime = now + StoreResumeTimeWindowOffset
|
||
|
queryStartTime = max(lastSeenTime - StoreResumeTimeWindowOffset, 0)
|
||
|
|
||
|
let req = HistoryQuery(
|
||
|
pubsubTopic: some(pubsubTopic),
|
||
|
startTime: some(queryStartTime),
|
||
|
endTime: some(queryEndTime),
|
||
|
pageSize: uint64(pageSize),
|
||
|
direction: default(),
|
||
|
)
|
||
|
|
||
|
var res: WakuStoreResult[seq[WakuMessage]]
|
||
|
if peerList.isSome():
|
||
|
debug "trying the candidate list to fetch the history"
|
||
|
res = await w.queryLoop(req, peerList.get())
|
||
|
else:
|
||
|
debug "no candidate list is provided, selecting a random peer"
|
||
|
# if no peerList is set then query from one of the peers stored in the peer manager
|
||
|
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
|
||
|
if peerOpt.isNone():
|
||
|
warn "no suitable remote peers"
|
||
|
waku_legacy_store_errors.inc(labelValues = [peerNotFoundFailure])
|
||
|
return err("no suitable remote peers")
|
||
|
|
||
|
debug "a peer is selected from peer manager"
|
||
|
res = await w.queryAll(req, peerOpt.get())
|
||
|
|
||
|
if res.isErr():
|
||
|
debug "failed to resume the history"
|
||
|
return err("failed to resume the history")
|
||
|
|
||
|
# Save the retrieved messages in the store
|
||
|
var added: uint = 0
|
||
|
for msg in res.get():
|
||
|
let putStoreRes = w.store.put(pubsubTopic, msg)
|
||
|
if putStoreRes.isErr():
|
||
|
continue
|
||
|
|
||
|
added.inc()
|
||
|
|
||
|
return ok(added)
|