diff --git a/CHANGELOG.md b/CHANGELOG.md index e3794892f..e6c97cda9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The full list of changes is below. - Removed cached `peerInfo` on local node. Rely on underlying libp2p switch instead - Metrics: added counters for protocol messages - Waku v2 node discovery now supports [`31/WAKU2-ENR`](https://rfc.vac.dev/spec/31/) +- resuming the history via `resume` now takes the answers of all peers in `peerList` into consideration and consolidates them into one deduplicated list ### Fixes diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index f4e08b7db..8c9006e1b 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -79,7 +79,8 @@ proc info*(node: WakuNode): WakuInfo = proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) = ## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online. ## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages). - ## `peerList` indicates the list of peers to query from. The history is fetched from the first available peer in this list. + ## `peerList` indicates the list of peers to query from. + ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list. ## If no peerList is passed, the history is fetched from one of the known peers. ## It retrieves the history successfully given that the dialed peer has been online during the queried time window. ## diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 845d9038e..3b684bf9e 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -534,6 +534,8 @@ procSuite "Waku Store": let key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.new(key) + key2 = PrivateKey.random(ECDSA, rng[]).get() + # peer2 = PeerInfo.new(key2) var msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)), WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)), @@ -545,7 +547,19 @@ procSuite "Waku Store": WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)), WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)), WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))] - + + msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)), + WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)), + WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)), + WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)), + WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)), + WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)), + WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)), + WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))] + + #-------------------- + # setup default test store + #-------------------- var dialSwitch = newStandardSwitch() await dialSwitch.start() @@ -562,6 +576,28 @@ procSuite "Waku Store": for wakuMsg in msgList: # the pubsub topic should be DefaultTopic await proto.handleMessage(DefaultTopic, wakuMsg) + + #-------------------- + # setup 2nd test store + #-------------------- + var dialSwitch2 = newStandardSwitch() + await dialSwitch2.start() + + # to be connected to + var listenSwitch2 = newStandardSwitch(some(key2)) + await listenSwitch2.start() + + let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) + + proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo()) + + listenSwitch2.mount(proto2) + + for wakuMsg in msgList2: + # the pubsub topic should be DefaultTopic + await proto2.handleMessage(DefaultTopic, wakuMsg) + + asyncTest "handle temporal history query with a valid time window": var completionFut = newFuture[bool]() @@ -679,6 +715,16 @@ procSuite "Waku Store": messagesResult.isOk messagesResult.value.len == 4 + asyncTest "resume history from a list of offline peers": + var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) + var dialSwitch3 = newStandardSwitch() + await dialSwitch3.start() + let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng()) + let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()])) + check: + successResult.isErr + await dialSwitch3.stop() + asyncTest "resume history from a list of candidate peers": var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) @@ -690,12 +736,19 @@ procSuite "Waku Store": let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(), listenSwitch.peerInfo.toRemotePeerInfo(), - listenSwitch.peerInfo.toRemotePeerInfo()])) + listenSwitch2.peerInfo.toRemotePeerInfo()])) check: - proto3.messages.len == 10 + # `proto3` is expected to retrieve 14 messages because: + # - the store mounted on `listenSwitch` holds 10 messages (`msgList`) + # - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`) + # - both stores share 3 messages, resulting in 14 unique messages in total + proto3.messages.len == 14 successResult.isOk - successResult.value == 10 - + successResult.value == 14 + + await allFutures(dialSwitch.stop(), + dialSwitch2.stop()) + asyncTest "limit store capacity": let capacity = 10 diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 14ac6a8d4..42cb22126 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az272-145: +# Libtool was configured on host fv-az278-37: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index c9294085c..33697f60b 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -24,7 +24,6 @@ import ../../utils/requests, ../waku_swap/waku_swap, ./waku_store_types - # export all modules whose types are used in public functions/types export @@ -615,14 +614,28 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf return ok(messageList) proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} = - ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully - ## returns the retrieved messages, or error if all the requests fail - for peer in candidateList.items: - let successResult = await w.queryFromWithPaging(query, peer) - if successResult.isOk: return ok(successResult.value) + ## loops through the candidateList in order and sends the query to each + ## once all responses have been received, the retrieved messages are consolidated into one deduplicated list + ## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq. + var futureList: seq[Future[MessagesResult]] + for peer in candidateList.items: + futureList.add(w.queryFromWithPaging(query, peer)) + await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated - debug "failed to resolve the query" - return err("failed to resolve the query") + let messagesList = futureList + .map(proc (fut: Future[MessagesResult]): seq[WakuMessage] = + if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures() + fut.read().value + else: + @[] + ) + .concat() + + if messagesList.len != 0: + return ok(messagesList.deduplicate()) + else: + debug "failed to resolve the query" + return err("failed to resolve the query") proc findLastSeen*(list: seq[IndexedWakuMessage]): float = var lastSeenTime = float64(0) @@ -633,7 +646,7 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float = proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool = ## return true if a duplicate message is found, otherwise false - # it is defined as a separate proc to be bale to adjust comparison criteria + # it is defined as a separate proc to be able to adjust comparison criteria # e.g., to exclude timestamp or include pubsub topic if message in list: return true return false @@ -643,7 +656,9 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## peerList indicates the list of peers to query from. + ## The history is fetched from all available peers in this list and then consolidated into one deduplicated list. + ## Such candidates should be found through a discovery method (to be developed). ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string @@ -656,7 +671,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem let offset: float64 = 200000 currentTime = currentTime + offset lastSeenTime = max(lastSeenTime - offset, 0) - debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime + debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime let pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)