diff --git a/standards/application/relay-reliability.md b/standards/application/relay-reliability.md index 19d614c..9238c2e 100644 --- a/standards/application/relay-reliability.md +++ b/standards/application/relay-reliability.md @@ -71,7 +71,7 @@ def checkOutgoingMessages(): Function `checkOutgoingMessages` is called periodically, most likely every a few seconds. Message hashes can be queried in batch to reduce the number of requests to store nodes, the size in a batch shoud not exceed the max supported size by store node. -The store node can be set and updated directly by application or selected from peer discovery protocol like [discv5](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/33/discv5.md) or [peer exchange](https://github.com/waku-org/specs/blob/master/standards/core/peer-exchange.md). +The store node can be set and updated directly by application or selected from peers which are discovery by protocols like [discv5](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/33/discv5.md) or [peer exchange](https://github.com/waku-org/specs/blob/master/standards/core/peer-exchange.md). The store node may only support specific pubsub topics, and the application should group message hashes by pubsub topics before querying the store node. @@ -79,6 +79,51 @@ When persistent network issue happens, you may not want to resend the failed mes ### Query with Topics and Time Range +An application could use different pubsub topics and content topics, for example a community may have its own pubsub topic, and each channel may have its own content topic. To fetch all missing messages in a specific channel, the application can query the store node with the provided pubsub topic, content topic and time range. + +```python +class FetchRecord: + pubsubTopic: str + lastFetch: int + +class QueryParams: + pubsubTopic: str + contentTopics: List[str] + fromTime: int + toTime: int + +def fetchMissingMessages(queryParams): + missingMessageHashes = [] + + # get missing message identifiers first in order to reduce the data transfer + response = waku.store.queryMessageHashes(queryParams) + for !response.isComplete(): + # process each message in the response + response.messages().forEach(messageHash -> { + message = queryDbMessageByHash(messageHash) + if message.exists(): + continue + } + missingMessageHashes.append(messageHash) + }) + + # process next page of the response + response.Next() + + # fetch missing messages with hashes in batch + response = waku.store.queryMessages(queryParams, missingMessageHashes) + response.messages().forEach(message -> { + processMessage(message) + }) + + updateFetchRecord(queryParams.pubsubTopic, queryParams.toTime) +``` + +`QueryParam` includes all the necessary information to fetch missing messages, it may also include peer id of the store node for easy discovery. The application should iterate all the interested pubsub topics, along with its content topics to construct the `QueryParam`. + +Function `fetchMissingMessages` is runing periocally, for example 1 minute. It first fetch all the message hashes in the specified time range, check if messages all exist in local dabatase, if not, fetch the missing messages in batch. The batch size should be limited to avoid large data transfer or exceed the max supported size by store node. + +When finishing fetching missing messages, the application should update the last fetch time in `FetchRecord`. The last fetch time can be used to calculate the time range for the next fetch and avoid fetching the same messages again. ## Security and Performance Considerations