Commit dd999a831f (parent 458caeb59b) — mirror of https://github.com/waku-org/nwaku.git
@@ -8,6 +8,7 @@ This release contains the following:
 ### Changes
 - Enables db migration for the message store.
 - The `resume` Nim API eliminates duplicate messages before storing them.
+- Updates the `resume` Nim API to fetch historical messages in a sequence of pages.
 - Support for stable version of `relay` protocol, with protocol ID `/vac/waku/relay/2.0.0`
 - Support for multiple protocol IDs - now matches any protocol that adds postfix to stable ID.
 
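To illustrate the changelog entry above, here is a minimal caller-side sketch of the paged `resume` API. It is illustrative only: the `store`/`peers` values, the import paths and the chosen page size are assumptions, while the `resume(peerList, pageSize)` signature itself comes from the diff further below.

```nim
import std/options
import chronos, chronicles
import libp2p/peerinfo   # assumed import paths; adjust to this repo's layout
import waku_store        # hypothetical module name for the WakuStore protocol code

proc resumeExample(store: WakuStore, peers: seq[PeerInfo]) {.async.} =
  # Fetch the missed history from the candidate peers, 50 messages per page (assumed value).
  let res = await store.resume(some(peers), pageSize = uint64(50))
  if res.isOk():
    debug "resume fetched messages", count = res.value
  else:
    debug "resume failed", error = res.error
```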
@@ -695,6 +695,25 @@ procSuite "Waku Store":
       successResult.isOk
       successResult.value == 4
 
+  asyncTest "queryFromWithPaging with empty pagingInfo":
+    let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
+
+    let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo)
+
+    check:
+      messagesResult.isOk
+      messagesResult.value.len == 4
+
+  asyncTest "queryFromWithPaging with pagination":
+    var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
+    let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
+
+    let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo)
+
+    check:
+      messagesResult.isOk
+      messagesResult.value.len == 4
+
   asyncTest "resume history from a list of candidate peers":
 
@@ -710,5 +729,3 @@ procSuite "Waku Store":
       proto3.messages.len == 10
       successResult.isOk
       successResult.value == 10
-
-
@@ -475,10 +475,12 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
   await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
                                          query: query).encode().buffer)
+  debug "query is sent", query=query
   var message = await connOpt.get().readLp(64*1024)
   let response = HistoryRPC.init(message)
 
+  debug "response is received"
 
   if response.isErr:
     error "failed to decode response"
     waku_store_errors.inc(labelValues = [decodeRpcFailure])
@@ -489,14 +491,44 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
   handler(response.value.response)
   return ok(response.value.response.messages.len.uint64)
 
-proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async, gcsafe.}=
+proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Future[MessagesResult] {.async, gcsafe.} =
+  ## a thin wrapper for queryFrom
+  ## sends the query to the given peer
+  ## when the query has a valid pagingInfo, it retrieves the historical messages in pages
+  ## returns all the fetched messages if no error occurs, otherwise returns an error string
+  debug "queryFromWithPaging is called"
+  var messageList: seq[WakuMessage]
+  # make a copy of the query
+  var q = query
+  debug "query is", q=q
+
+  var hasNextPage = true
+  proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
+    # store messages
+    for m in response.messages.items: messageList.add(m)
+
+    # check whether it is the last page
+    hasNextPage = (response.pagingInfo.pageSize != 0)
+    debug "hasNextPage", hasNextPage=hasNextPage
+
+    # update paging cursor
+    q.pagingInfo.cursor = response.pagingInfo.cursor
+    debug "next paging info", pagingInfo=q.pagingInfo
+
+  # fetch the history in pages
+  while (hasNextPage):
+    let successResult = await w.queryFrom(q, handler, peer)
+    if not successResult.isOk: return err("failed to resolve the query")
+    debug "hasNextPage", hasNextPage=hasNextPage
+
+  return ok(messageList)
+
+proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[PeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
   ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
-  ## returns the number of retrieved messages, or error if all the requests fail
+  ## returns the retrieved messages, or error if all the requests fail
   for peer in candidateList.items:
-    let successResult = await w.queryFrom(query, handler, peer)
-    if successResult.isOk: return ok(successResult.value.uint64)
+    let successResult = await w.queryFromWithPaging(query, peer)
+    if successResult.isOk: return ok(successResult.value)
 
   debug "failed to resolve the query"
   return err("failed to resolve the query")
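The doc comments above define the paging contract: with a non-empty `pagingInfo` the wrapper keeps re-issuing `queryFrom` with the responder's cursor until a page reports `pageSize == 0`; with an empty `pagingInfo` it reduces to a single query. Below is a minimal caller-side sketch, where the `store`/`peer` values, the one-hour window and the import paths are assumptions (the `HistoryQuery`, `PagingInfo` and `queryFromWithPaging` names are taken from the code above).

```nim
import std/times
import chronos, chronicles
import libp2p/peerinfo   # assumed import paths; adjust to this repo's layout
import waku_store        # hypothetical module name for the WakuStore protocol code

proc fetchWindow(store: WakuStore, peer: PeerInfo) {.async.} =
  # Query the default topic for the last hour, letting the peer page 10 messages at a time.
  let query = HistoryQuery(
    pubsubTopic: DefaultTopic,
    startTime: epochTime() - 3600,
    endTime: epochTime(),
    pagingInfo: PagingInfo(direction: PagingDirection.FORWARD, pageSize: 10))
  let res = await store.queryFromWithPaging(query, peer)
  if res.isOk():
    # res.value accumulates the messages from every page.
    debug "fetched history", total = res.value.len
```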
@@ -515,8 +547,7 @@ proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
   if message in list: return true
   return false
 
-proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} =
+proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
   ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
   ## messages are stored in the store node's messages field and in the message db
   ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@@ -526,7 +557,6 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
   ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
   ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
 
-  # TODO remove duplicate messages from the fetched history
   var currentTime = epochTime()
   var lastSeenTime: float = findLastSeen(ws.messages)
   debug "resume", currentEpochTime=currentTime
@@ -537,13 +567,18 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
   lastSeenTime = max(lastSeenTime - offset, 0)
   debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
 
+  let
+    pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
+    rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
+
+
   var dismissed: uint = 0
   var added: uint = 0
-  proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
-    debug "resume handler is called"
+  proc save(msgList: seq[WakuMessage]) {.raises: [Defect, Exception].} =
+    debug "save proc is called"
     # exclude index from the comparison criteria
     let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg)
-    for msg in response.messages:
+    for msg in msgList:
       # check for duplicate messages
       # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
       if isDuplicate(msg,currentMsgSummary):
@@ -570,17 +605,15 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
     debug "number of duplicate messages found in resume", dismissed=dismissed
     debug "number of messages added via resume", added=added
 
-  let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
-
   if peerList.isSome:
     debug "trying the candidate list to fetch the history"
-    let successResult = await ws.queryLoop(rpc, handler, peerList.get())
+    let successResult = await ws.queryLoop(rpc, peerList.get())
     if successResult.isErr:
       debug "failed to resume the history from the list of candidates"
       return err("failed to resume the history from the list of candidates")
     debug "resume is done successfully"
-    return ok(successResult.value)
+    save(successResult.value)
+    return ok(added)
   else:
     debug "no candidate list is provided, selecting a random peer"
     # if no peerList is set then query from one of the peers stored in the peer manager
@@ -592,11 +625,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
 
     debug "a peer is selected from peer manager"
     let peerInfo = peerOpt.get()
-    let successResult = await ws.queryFrom(rpc, handler, peerInfo)
+    let successResult = await ws.queryFromWithPaging(rpc, peerInfo)
     if successResult.isErr:
       debug "failed to resume the history"
       return err("failed to resume the history")
     debug "resume is done successfully"
+    save(successResult.value)
     return ok(added)
 
 # NOTE: Experimental, maybe incorporate as part of query call
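The `resume` doc comments describe the queried window as running from the newest persisted message, minus a small safety offset, up to the current time. A tiny worked example with made-up numbers; only the `max(lastSeenTime - offset, 0)` rule is taken from the code, while the concrete values (including the offset) are assumptions.

```nim
let
  currentTime = 1_000_000.0    # "now" in epoch seconds (made-up value)
  newestStored = 999_000.0     # timestamp of the most recent persisted message (made-up value)
  offset = 20.0                # safety margin for clock differences (assumed value)
  lastSeenTime = max(newestStored - offset, 0)
# resume would then query the window (998_980.0, 1_000_000.0)
doAssert lastSeenTime == 998_980.0
```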
@@ -15,6 +15,9 @@ export pagination
 
 # Constants required for pagination -------------------------------------------
 const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
+
+# TODO the DefaultPageSize can be changed, it's current value is random
+const DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
 
 const DefaultTopic* = "/waku/2/default-waku/proto"
 
@@ -59,6 +62,7 @@ type
     response*: HistoryResponse
 
   QueryResult* = Result[uint64, string]
+  MessagesResult* = Result[seq[WakuMessage], string]
 
   WakuStore* = ref object of LPProtocol
     peerManager*: PeerManager