refactor(waku-store): deprecate waku store callback methods

This commit is contained in:
Lorenzo Delgado 2022-08-02 13:14:13 +02:00 committed by Lorenzo Delgado
parent 2c4730c850
commit 7a30e485ec
2 changed files with 712 additions and 821 deletions

File diff suppressed because it is too large Load Diff

View File

@ -54,6 +54,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64
# Error types (metric label values)
const
storeFailure = "store_failure"
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure"
@ -202,8 +203,9 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
if (not w.persistMessages):
if not w.persistMessages:
# Store is mounted but new messages should not be stored
return
@ -220,10 +222,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr:
if addRes.isErr():
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return # Do not attempt to store in persistent DB
return
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
@ -231,16 +233,37 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
return
let res = w.store.put(index, msg, topic)
if res.isErr:
trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
if res.isErr():
trace "failed to store messages", err=res.error()
waku_store_errors.inc(labelValues = [storeFailure])
# TODO: Remove after converting the query method into a non-callback method
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone():
waku_store_errors.inc(labelValues = [dialFailure])
return err(dialFailure)
let connection = connOpt.get()
let rpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
await connection.writeLP(rpc.encode().buffer)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr():
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err(decodeRpcFailure)
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
return ok(response.value.response)
proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
# - which topics they track
@ -248,123 +271,98 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
# - default store peer?
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await w.query(req, peerOpt.get())
proc query*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe,
deprecated: "Use the no-callback version of this method".} =
let response = await w.query(req)
if response.isErr():
error "history query failed", error=response.error()
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
handler(response.get())
## 21/WAKU2-FAULT-TOLERANT-STORE
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
## sends the query to the given peer
## returns the number of retrieved messages if no error occurs, otherwise returns the error string
# TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
proc queryFrom*(w: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe,
deprecated: "Use the query() no-callback procedure instead".} =
## Sends the query to the given peer. Returns the number of retrieved messages if no error occurs, otherwise returns the error string
# TODO: dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
let res = await w.query(req, peer)
if res.isErr():
return err(res.error())
let response = res.get()
if connOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return err("failed to connect to remote peer")
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
debug "query is sent", query=query
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
debug "response is received"
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err("failed to decode response")
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
return ok(response.value.response.messages.len.uint64)
handler(response)
return ok(response.messages.len.uint64)
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## a thin wrapper for queryFrom
## sends the query to the given peer
## when the query has a valid pagingInfo, it retrieves the historical messages in pages
## returns all the fetched messages if no error occurs, otherwise returns an error string
debug "queryFromWithPaging is called"
var messageList: seq[WakuMessage]
# make a copy of the query
var q = query
debug "query is", q=q
## A thin wrapper for query. Sends the query to the given peer. when the query has a valid pagingInfo,
## it retrieves the historical messages in pages.
## Returns all the fetched messages, if error occurs, returns an error string
var hasNextPage = true
proc handler(response: HistoryResponse) {.gcsafe.} =
# store messages
for m in response.messages.items: messageList.add(m)
# check whether it is the last page
hasNextPage = (response.pagingInfo.pageSize != 0)
debug "hasNextPage", hasNextPage=hasNextPage
# Make a copy of the query
var req = query
# update paging cursor
q.pagingInfo.cursor = response.pagingInfo.cursor
debug "next paging info", pagingInfo=q.pagingInfo
var messageList: seq[WakuMessage] = @[]
# fetch the history in pages
while (hasNextPage):
let successResult = await w.queryFrom(q, handler, peer)
if not successResult.isOk: return err("failed to resolve the query")
debug "hasNextPage", hasNextPage=hasNextPage
# Fetch the history in pages
while true:
let res = await w.query(req, peer)
if res.isErr():
return err(res.error())
let response = res.get()
messageList.add(response.messages)
# Check whether it is the last page
if response.pagingInfo.pageSize == 0:
break
# Update paging cursor
req.pagingInfo.cursor = response.pagingInfo.cursor
return ok(messageList)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## loops through the candidateList in order and sends the query to each
## once all responses have been received, the retrieved messages are consolidated into one deduplicated list
## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq.
var futureList: seq[Future[WakuStoreResult[seq[WakuMessage]]]]
for peer in candidateList.items:
futureList.add(w.queryFromWithPaging(query, peer))
await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated
proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## Loops through the peers candidate list in order and sends the query to each
##
## Once all responses have been received, the retrieved messages are consolidated into one deduplicated list.
## if no messages have been retrieved, the returned future will resolve into a result holding an empty seq.
let queriesList = candidateList.mapIt(w.queryFromWithPaging(req, it))
let messagesList = futureList
await allFutures(queriesList)
let messagesList = queriesList
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures()
fut.read().value
else:
@[]
# These futures have been awaited before using allFutures(). Call completed() just as a sanity check.
if not fut.completed() or fut.read().isErr():
return @[]
fut.read().value
)
.concat()
.deduplicate()
if messagesList.len != 0:
return ok(messagesList.deduplicate())
else:
debug "failed to resolve the query"
if messagesList.len == 0:
return err("failed to resolve the query")
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
return ok(messagesList)
proc resume*(w: WakuStore,
peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]),
pageSize: uint64 = DefaultPageSize,
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -375,134 +373,126 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var lastSeenTime = Timestamp(0)
var currentTime = getNanosecondTime(epochTime())
debug "resume", currentEpochTime=currentTime
let lastSeenItem = ws.messages.last()
let lastSeenItem = w.messages.last()
if lastSeenItem.isOk():
lastSeenTime = lastSeenItem.get().msg.timestamp
var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
else: Timestamp(0)
# adjust the time window with an offset of 20 seconds
let offset: Timestamp = getNanosecondTime(20)
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
let
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
var dismissed: uint = 0
var added: uint = 0
proc save(msgList: seq[WakuMessage]) =
debug "save proc is called"
# exclude index from the comparison criteria
let req = HistoryQuery(
pubsubTopic: pubsubTopic,
startTime: lastSeenTime,
endTime: currentTime,
pagingInfo: PagingInfo(
direction:PagingDirection.FORWARD,
pageSize: pageSize
)
)
for msg in msgList:
let index = Index.compute(
msg,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic
)
# check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if ws.messages.contains(index):
dismissed = dismissed + 1
continue
# store the new message
let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
# store in db if exists
if not ws.store.isNil:
let res = ws.store.put(index, msg, DefaultTopic)
if res.isErr:
trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
continue
discard ws.messages.add(indexedWakuMsg)
added = added + 1
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
debug "number of duplicate messages found in resume", dismissed=dismissed
debug "number of messages added via resume", added=added
if peerList.isSome:
var res: WakuStoreResult[seq[WakuMessage]]
if peerList.isSome():
debug "trying the candidate list to fetch the history"
let successResult = await ws.queryLoop(rpc, peerList.get())
if successResult.isErr:
debug "failed to resume the history from the list of candidates"
return err("failed to resume the history from the list of candidates")
debug "resume is done successfully"
save(successResult.value)
return ok(added)
res = await w.queryLoop(req, peerList.get())
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err("no suitable remote peers")
debug "a peer is selected from peer manager"
let remotePeerInfo = peerOpt.get()
let successResult = await ws.queryFromWithPaging(rpc, remotePeerInfo)
if successResult.isErr:
debug "failed to resume the history"
return err("failed to resume the history")
debug "resume is done successfully"
save(successResult.value)
return ok(added)
res = await w.queryFromWithPaging(req, peerOpt.get())
if res.isErr():
debug "failed to resume the history"
return err("failed to resume the history")
# Save the retrieved messages in the store
var dismissed: uint = 0
var added: uint = 0
for msg in res.get():
let index = Index.compute(
msg,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = pubsubTopic
)
# check for duplicate messages
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if w.messages.contains(index):
dismissed.inc()
continue
# store the new message
let resPut = w.messages.put(index, msg, pubsubTopic)
if resPut.isErr():
trace "failed to store messages", err = resPut.error()
waku_store_errors.inc(labelValues = [storeFailure])
continue
# store in db if exists
if not w.store.isNil():
let resPut = w.store.put(index, msg, pubsubTopic)
if resPut.isErr():
trace "failed to store messages", err = resPut.error()
waku_store_errors.inc(labelValues = [storeFailure])
continue
added.inc()
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
return ok(added)
## EXPERIMENTAL
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
let res = await ws.query(req, peerOpt.get())
if res.isErr():
return err(res.error())
let response = res.get()
# Perform accounting operation. Assumes wakuSwap protocol is mounted
ws.wakuSwap.debit(peerOpt.get().peerId, response.messages.len)
return ok(response)
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe,
deprecated: "Use the no-callback procedure instead".} =
# TODO: We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
# This will require us to check for various factors such as:
# - which topics they track
# - latency?
# - default store peer?
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
let response = await ws.queryWithAccounting(req)
if response.isErr():
error "history query failed", error=response.error()
return
let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
if connOpt.isNone():
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(MaxRpcSize.int)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
# NOTE Perform accounting operation
# Assumes wakuSwap protocol is mounted
let remotePeerInfo = peerOpt.get()
let messages = response.value.response.messages
ws.wakuSwap.debit(remotePeerInfo.peerId, messages.len)
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
handler(response.get())