mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-25 05:15:53 +00:00
result aggregation in resume API (#816)
This commit is contained in:
parent
81f89b5bc1
commit
b3a7722297
@ -20,6 +20,7 @@ The full list of changes is below.
|
|||||||
- Removed cached `peerInfo` on local node. Rely on underlying libp2p switch instead
|
- Removed cached `peerInfo` on local node. Rely on underlying libp2p switch instead
|
||||||
- Metrics: added counters for protocol messages
|
- Metrics: added counters for protocol messages
|
||||||
- Waku v2 node discovery now supports [`31/WAKU2-ENR`](https://rfc.vac.dev/spec/31/)
|
- Waku v2 node discovery now supports [`31/WAKU2-ENR`](https://rfc.vac.dev/spec/31/)
|
||||||
|
- resuming the history via `resume` now takes the answers of all peers in `peerList` into consideration and consolidates them into one deduplicated list
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
|
@ -79,7 +79,8 @@ proc info*(node: WakuNode): WakuInfo =
|
|||||||
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) =
|
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) =
|
||||||
## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online.
|
## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online.
|
||||||
## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages).
|
## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages).
|
||||||
## `peerList` indicates the list of peers to query from. The history is fetched from the first available peer in this list.
|
## `peerList` indicates the list of peers to query from.
|
||||||
|
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
|
||||||
## If no peerList is passed, the history is fetched from one of the known peers.
|
## If no peerList is passed, the history is fetched from one of the known peers.
|
||||||
## It retrieves the history successfully given that the dialed peer has been online during the queried time window.
|
## It retrieves the history successfully given that the dialed peer has been online during the queried time window.
|
||||||
##
|
##
|
||||||
|
@ -534,6 +534,8 @@ procSuite "Waku Store":
|
|||||||
let
|
let
|
||||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
peer = PeerInfo.new(key)
|
peer = PeerInfo.new(key)
|
||||||
|
key2 = PrivateKey.random(ECDSA, rng[]).get()
|
||||||
|
# peer2 = PeerInfo.new(key2)
|
||||||
var
|
var
|
||||||
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
|
||||||
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
|
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
|
||||||
@ -545,7 +547,19 @@ procSuite "Waku Store":
|
|||||||
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
|
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
|
||||||
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
|
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
|
||||||
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
|
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
|
||||||
|
|
||||||
|
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
|
||||||
|
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
|
||||||
|
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
|
||||||
|
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
|
||||||
|
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
|
||||||
|
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
|
||||||
|
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
|
||||||
|
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
|
||||||
|
|
||||||
|
#--------------------
|
||||||
|
# setup default test store
|
||||||
|
#--------------------
|
||||||
var dialSwitch = newStandardSwitch()
|
var dialSwitch = newStandardSwitch()
|
||||||
await dialSwitch.start()
|
await dialSwitch.start()
|
||||||
|
|
||||||
@ -562,6 +576,28 @@ procSuite "Waku Store":
|
|||||||
for wakuMsg in msgList:
|
for wakuMsg in msgList:
|
||||||
# the pubsub topic should be DefaultTopic
|
# the pubsub topic should be DefaultTopic
|
||||||
await proto.handleMessage(DefaultTopic, wakuMsg)
|
await proto.handleMessage(DefaultTopic, wakuMsg)
|
||||||
|
|
||||||
|
#--------------------
|
||||||
|
# setup 2nd test store
|
||||||
|
#--------------------
|
||||||
|
var dialSwitch2 = newStandardSwitch()
|
||||||
|
await dialSwitch2.start()
|
||||||
|
|
||||||
|
# to be connected to
|
||||||
|
var listenSwitch2 = newStandardSwitch(some(key2))
|
||||||
|
await listenSwitch2.start()
|
||||||
|
|
||||||
|
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
|
||||||
|
|
||||||
|
proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
listenSwitch2.mount(proto2)
|
||||||
|
|
||||||
|
for wakuMsg in msgList2:
|
||||||
|
# the pubsub topic should be DefaultTopic
|
||||||
|
await proto2.handleMessage(DefaultTopic, wakuMsg)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
asyncTest "handle temporal history query with a valid time window":
|
asyncTest "handle temporal history query with a valid time window":
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
@ -679,6 +715,16 @@ procSuite "Waku Store":
|
|||||||
messagesResult.isOk
|
messagesResult.isOk
|
||||||
messagesResult.value.len == 4
|
messagesResult.value.len == 4
|
||||||
|
|
||||||
|
asyncTest "resume history from a list of offline peers":
|
||||||
|
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
|
||||||
|
var dialSwitch3 = newStandardSwitch()
|
||||||
|
await dialSwitch3.start()
|
||||||
|
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
|
||||||
|
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()]))
|
||||||
|
check:
|
||||||
|
successResult.isErr
|
||||||
|
await dialSwitch3.stop()
|
||||||
|
|
||||||
asyncTest "resume history from a list of candidate peers":
|
asyncTest "resume history from a list of candidate peers":
|
||||||
|
|
||||||
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
|
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
|
||||||
@ -690,12 +736,19 @@ procSuite "Waku Store":
|
|||||||
|
|
||||||
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
|
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
|
||||||
listenSwitch.peerInfo.toRemotePeerInfo(),
|
listenSwitch.peerInfo.toRemotePeerInfo(),
|
||||||
listenSwitch.peerInfo.toRemotePeerInfo()]))
|
listenSwitch2.peerInfo.toRemotePeerInfo()]))
|
||||||
check:
|
check:
|
||||||
proto3.messages.len == 10
|
# `proto3` is expected to retrieve 14 messages because:
|
||||||
|
# - the store mounted on `listenSwitch` holds 10 messages (`msgList`)
|
||||||
|
# - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`)
|
||||||
|
# - both stores share 3 messages, resulting in 14 unique messages in total
|
||||||
|
proto3.messages.len == 14
|
||||||
successResult.isOk
|
successResult.isOk
|
||||||
successResult.value == 10
|
successResult.value == 14
|
||||||
|
|
||||||
|
await allFutures(dialSwitch.stop(),
|
||||||
|
dialSwitch2.stop())
|
||||||
|
|
||||||
asyncTest "limit store capacity":
|
asyncTest "limit store capacity":
|
||||||
let
|
let
|
||||||
capacity = 10
|
capacity = 10
|
||||||
|
@ -24,7 +24,6 @@ import
|
|||||||
../../utils/requests,
|
../../utils/requests,
|
||||||
../waku_swap/waku_swap,
|
../waku_swap/waku_swap,
|
||||||
./waku_store_types
|
./waku_store_types
|
||||||
|
|
||||||
|
|
||||||
# export all modules whose types are used in public functions/types
|
# export all modules whose types are used in public functions/types
|
||||||
export
|
export
|
||||||
@ -615,14 +614,28 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
|
|||||||
return ok(messageList)
|
return ok(messageList)
|
||||||
|
|
||||||
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
|
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
|
||||||
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
|
## loops through the candidateList in order and sends the query to each
|
||||||
## returns the retrieved messages, or error if all the requests fail
|
## once all responses have been received, the retrieved messages are consolidated into one deduplicated list
|
||||||
for peer in candidateList.items:
|
## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq.
|
||||||
let successResult = await w.queryFromWithPaging(query, peer)
|
var futureList: seq[Future[MessagesResult]]
|
||||||
if successResult.isOk: return ok(successResult.value)
|
for peer in candidateList.items:
|
||||||
|
futureList.add(w.queryFromWithPaging(query, peer))
|
||||||
|
await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated
|
||||||
|
|
||||||
debug "failed to resolve the query"
|
let messagesList = futureList
|
||||||
return err("failed to resolve the query")
|
.map(proc (fut: Future[MessagesResult]): seq[WakuMessage] =
|
||||||
|
if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures()
|
||||||
|
fut.read().value
|
||||||
|
else:
|
||||||
|
@[]
|
||||||
|
)
|
||||||
|
.concat()
|
||||||
|
|
||||||
|
if messagesList.len != 0:
|
||||||
|
return ok(messagesList.deduplicate())
|
||||||
|
else:
|
||||||
|
debug "failed to resolve the query"
|
||||||
|
return err("failed to resolve the query")
|
||||||
|
|
||||||
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
|
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
|
||||||
var lastSeenTime = float64(0)
|
var lastSeenTime = float64(0)
|
||||||
@ -633,7 +646,7 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
|
|||||||
|
|
||||||
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
|
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
|
||||||
## return true if a duplicate message is found, otherwise false
|
## return true if a duplicate message is found, otherwise false
|
||||||
# it is defined as a separate proc to be bale to adjust comparison criteria
|
# it is defined as a separate proc to be able to adjust comparison criteria
|
||||||
# e.g., to exclude timestamp or include pubsub topic
|
# e.g., to exclude timestamp or include pubsub topic
|
||||||
if message in list: return true
|
if message in list: return true
|
||||||
return false
|
return false
|
||||||
@ -643,7 +656,9 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
## messages are stored in the store node's messages field and in the message db
|
## messages are stored in the store node's messages field and in the message db
|
||||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||||
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
## peerList indicates the list of peers to query from.
|
||||||
|
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
|
||||||
|
## Such candidates should be found through a discovery method (to be developed).
|
||||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||||
@ -656,7 +671,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
let offset: float64 = 200000
|
let offset: float64 = 200000
|
||||||
currentTime = currentTime + offset
|
currentTime = currentTime + offset
|
||||||
lastSeenTime = max(lastSeenTime - offset, 0)
|
lastSeenTime = max(lastSeenTime - offset, 0)
|
||||||
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
|
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
|
||||||
|
|
||||||
let
|
let
|
||||||
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
|
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user