deploy: b3a7722297318ecbd1c8fc94312b435926cb395f

kaiserd 2022-01-18 22:34:04 +00:00
parent b6f0c23030
commit 9e102a0c33
5 changed files with 88 additions and 18 deletions

View File

@ -20,6 +20,7 @@ The full list of changes is below.
- Removed cached `peerInfo` on local node. Rely on underlying libp2p switch instead
- Metrics: added counters for protocol messages
- Waku v2 node discovery now supports [`31/WAKU2-ENR`](https://rfc.vac.dev/spec/31/)
- Resuming the history via `resume` now takes the responses of all peers in `peerList` into consideration and consolidates them into one deduplicated list
### Fixes

View File

@ -79,7 +79,8 @@ proc info*(node: WakuNode): WakuInfo =
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) =
## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online.
## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages).
## `peerList` indicates the list of peers to query from. The history is fetched from the first available peer in this list.
## `peerList` indicates the list of peers to query from.
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
## If no peerList is passed, the history is fetched from one of the known peers.
## It retrieves the history successfully given that the dialed peers have been online during the queried time window.
##
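For orientation, a minimal editorial sketch of the updated semantics (not part of this commit; assumes the usual wakunode2 imports, a started `node` with the store protocol mounted, and hypothetical peers `peerA`/`peerB`):

proc resumeExample(node: WakuNode, peerA, peerB: PeerInfo) {.async.} =
  # responses from all listed peers are consolidated into one deduplicated list
  await node.resume(some(@[peerA, peerB]))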

View File

@ -534,6 +534,8 @@ procSuite "Waku Store":
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1], contentTopic: ContentTopic("1"), timestamp: float(1)),
@ -545,7 +547,19 @@ procSuite "Waku Store":
WakuMessage(payload: @[byte 7], contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8], contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("1"), timestamp: float(9))]
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11], contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12], contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13], contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14], contentTopic: ContentTopic("1"), timestamp: float(7))]
#--------------------
# setup default test store
#--------------------
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
@ -562,6 +576,28 @@ procSuite "Waku Store":
for wakuMsg in msgList:
# the pubsub topic should be DefaultTopic
await proto.handleMessage(DefaultTopic, wakuMsg)
#--------------------
# setup 2nd test store
#--------------------
var dialSwitch2 = newStandardSwitch()
await dialSwitch2.start()
# to be connected to
var listenSwitch2 = newStandardSwitch(some(key2))
await listenSwitch2.start()
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
listenSwitch2.mount(proto2)
for wakuMsg in msgList2:
# the pubsub topic should be DefaultTopic
await proto2.handleMessage(DefaultTopic, wakuMsg)
asyncTest "handle temporal history query with a valid time window":
var completionFut = newFuture[bool]()
@ -679,6 +715,16 @@ procSuite "Waku Store":
messagesResult.isOk
messagesResult.value.len == 4
asyncTest "resume history from a list of offline peers":
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
var dialSwitch3 = newStandardSwitch()
await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()]))
check:
successResult.isErr
await dialSwitch3.stop()
asyncTest "resume history from a list of candidate peers":
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
@ -690,12 +736,19 @@ procSuite "Waku Store":
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
listenSwitch.peerInfo.toRemotePeerInfo(),
listenSwitch.peerInfo.toRemotePeerInfo()]))
listenSwitch2.peerInfo.toRemotePeerInfo()]))
check:
proto3.messages.len == 10
# `proto3` is expected to retrieve 14 messages because:
# - the store mounted on `listenSwitch` holds 10 messages (`msgList`)
# - the store mounted on `listenSwitch2` holds 8 messages (see `msgList2`)
# - the stores share 4 messages, leaving 14 unique messages in total
#   (a standalone sketch of this arithmetic follows this test)
proto3.messages.len == 14
successResult.isOk
successResult.value == 10
successResult.value == 14
await allFutures(dialSwitch.stop(),
dialSwitch2.stop())
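To make the 14-message expectation concrete, a standalone editorial sketch of the same arithmetic (assumes only `std/sequtils`; the integers stand in for the message payloads):

import std/sequtils

# 10 + 8 entries with 4 shared between the lists leave 14 unique entries.
let storeA = toSeq(0..9)                    # stands in for msgList
let storeB = @[0, 11, 12, 3, 4, 5, 13, 14]  # stands in for msgList2
assert deduplicate(storeA & storeB).len == 14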
asyncTest "limit store capacity":
let
capacity = 10

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az272-145:
# Libtool was configured on host fv-az278-37:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -24,7 +24,6 @@ import
../../utils/requests,
../waku_swap/waku_swap,
./waku_store_types
# export all modules whose types are used in public functions/types
export
@ -615,14 +614,28 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
return ok(messageList)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
## loops through the candidateList in order and sends the query to each until one of the queries gets resolved successfully
## returns the retrieved messages, or error if all the requests fail
for peer in candidateList.items:
let successResult = await w.queryFromWithPaging(query, peer)
if successResult.isOk: return ok(successResult.value)
## loops through the candidateList in order and sends the query to each
## once all responses have been received, the retrieved messages are consolidated into one deduplicated list
## if no messages could be retrieved from any peer, an error is returned (the consolidation step is sketched in isolation after this proc)
var futureList: seq[Future[MessagesResult]]
for peer in candidateList.items:
futureList.add(w.queryFromWithPaging(query, peer))
await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated
debug "failed to resolve the query"
return err("failed to resolve the query")
let messagesList = futureList
.map(proc (fut: Future[MessagesResult]): seq[WakuMessage] =
if fut.completed() and fut.read().isOk(): # completed() is just a sanity check; these futures have already been awaited via allFutures()
fut.read().value
else:
@[] # failed queries contribute no messages
)
.concat()
if messagesList.len != 0:
return ok(messagesList.deduplicate())
else:
debug "failed to resolve the query"
return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0)
@ -633,7 +646,7 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
## return true if a duplicate message is found, otherwise false
# it is defined as a separate proc to be bale to adjust comparison criteria
# it is defined as a separate proc to be able to adjust comparison criteria
# e.g., to exclude timestamp or include pubsub topic (a hypothetical variant is sketched below)
if message in list: return true
return false
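As a purely hypothetical illustration of the adjustable criteria mentioned above (not part of this commit; assumes `std/sequtils` and `WakuMessage` in scope), a variant that ignores the timestamp:

proc isDuplicateIgnoringTime(message: WakuMessage, list: seq[WakuMessage]): bool =
  # match on payload and contentTopic only, excluding the timestamp
  list.anyIt(it.payload == message.payload and it.contentTopic == message.contentTopic)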
@ -643,7 +656,9 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for node asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## peerList indicates the list of peers to query from.
## The history is fetched from all available peers in this list and then consolidated into one deduplicated list.
## Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers known to the store protocol's underlying peer manager is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
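A short editorial sketch of driving this store-level `resume` (not part of this diff; `storeProto` is an assumed mounted `WakuStore`, `peerA`/`peerB` hypothetical candidate peers):

proc resumeFromCandidates(storeProto: WakuStore, peerA, peerB: RemotePeerInfo) {.async.} =
  let res = await storeProto.resume(some(@[peerA, peerB]))
  if res.isOk():
    debug "resume retrieved messages", count=res.value
  else:
    debug "resume failed", error=res.error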
@ -656,7 +671,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
let offset: float64 = 200000
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
let
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)