feat: fetch missing messages

kaichaosun 2024-06-27 17:19:42 +08:00
parent a5681f6e70
commit 766e6c4d46
1 changed file with 46 additions and 1 deletion

@@ -71,7 +71,7 @@ def checkOutgoingMessages():
Function `checkOutgoingMessages` is called periodically, typically every few seconds. Message hashes can be queried in batches to reduce the number of requests to store nodes; the size of a batch should not exceed the maximum size supported by the store node.
The store node can be set and updated directly by the application, or selected from peers discovered by protocols like [discv5](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/33/discv5.md) or [peer exchange](https://github.com/waku-org/specs/blob/master/standards/core/peer-exchange.md).
The store node may only support specific pubsub topics, so the application should group message hashes by pubsub topic before querying the store node, as sketched below.
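
A rough sketch of that grouping and batching, assuming each outstanding message is tracked with its `hash` and `pubsubTopic`, and an illustrative `MAX_BATCH_SIZE` limit; both are placeholders rather than part of the protocol:

```python
from collections import defaultdict
from typing import Dict, List

# Illustrative limit; the real maximum is specific to the store node.
MAX_BATCH_SIZE = 50

def groupAndBatchHashes(outgoingMessages) -> Dict[str, List[List[str]]]:
    # Group outstanding message hashes by pubsub topic, since a store
    # node may only support specific pubsub topics.
    hashesByTopic: Dict[str, List[str]] = defaultdict(list)
    for message in outgoingMessages:
        hashesByTopic[message["pubsubTopic"]].append(message["hash"])

    # Split each topic's hashes into batches that stay under the limit.
    batchesByTopic: Dict[str, List[List[str]]] = {}
    for topic, hashes in hashesByTopic.items():
        batchesByTopic[topic] = [
            hashes[i:i + MAX_BATCH_SIZE]
            for i in range(0, len(hashes), MAX_BATCH_SIZE)
        ]
    return batchesByTopic
```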
@@ -79,6 +79,51 @@ When a persistent network issue happens, you may not want to resend the failed messages
### Query with Topics and Time Range
An application could use different pubsub topics and content topics; for example, a community may have its own pubsub topic, and each channel may have its own content topic. To fetch all missing messages in a specific channel, the application can query the store node with the corresponding pubsub topic, content topic, and time range.
```python
from typing import List

class FetchRecord:
    pubsubTopic: str
    lastFetch: int

class QueryParams:
    pubsubTopic: str
    contentTopics: List[str]
    fromTime: int
    toTime: int

def fetchMissingMessages(queryParams):
    missingMessageHashes = []

    # Get missing message identifiers first in order to reduce the data transfer.
    response = waku.store.queryMessageHashes(queryParams)
    while not response.isComplete():
        # Check each message hash in the response against the local database.
        for messageHash in response.messages():
            message = queryDbMessageByHash(messageHash)
            if message.exists():
                continue
            missingMessageHashes.append(messageHash)
        # Process the next page of the response.
        response.next()

    # Fetch missing messages with hashes in batch.
    response = waku.store.queryMessages(queryParams, missingMessageHashes)
    for message in response.messages():
        processMessage(message)

    updateFetchRecord(queryParams.pubsubTopic, queryParams.toTime)
```
`QueryParams` includes all the information necessary to fetch missing messages; it may also include the peer ID of the store node for easy discovery. The application should iterate over all the pubsub topics it is interested in, along with their content topics, to construct the `QueryParams`, as sketched below.
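
For illustration, one `QueryParams` could be built per interested pubsub topic. The `interestedTopics` mapping, the `fetchRecords` store, and the one-hour backfill default are hypothetical application state, not part of the spec:

```python
import time

def buildQueryParams(interestedTopics, fetchRecords):
    # interestedTopics: pubsub topic -> list of content topics the
    # application cares about (hypothetical application state).
    # fetchRecords: pubsub topic -> FetchRecord from previous fetches.
    now = int(time.time())
    paramsList = []
    for pubsubTopic, contentTopics in interestedTopics.items():
        params = QueryParams()
        params.pubsubTopic = pubsubTopic
        params.contentTopics = contentTopics
        # Resume from the last successful fetch to avoid refetching;
        # on a first run, backfill a fixed window (assumed one hour).
        record = fetchRecords.get(pubsubTopic)
        params.fromTime = record.lastFetch if record else now - 3600
        params.toTime = now
        paramsList.append(params)
    return paramsList
```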
Function `fetchMissingMessages` runs periodically, for example every minute. It first fetches all the message hashes in the specified time range, checks whether all the messages exist in the local database, and, if not, fetches the missing messages in batches. The batch size should be limited to avoid large data transfers or exceeding the maximum size supported by the store node.
When it finishes fetching the missing messages, the application should update the last fetch time in `FetchRecord`. The last fetch time can be used to calculate the time range for the next fetch and avoid fetching the same messages again.
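
A minimal sketch of that bookkeeping, assuming an in-memory `fetchRecords` mapping (persisting it to the local database would work the same way):

```python
fetchRecords = {}  # pubsub topic -> FetchRecord

def updateFetchRecord(pubsubTopic, toTime):
    # Record the upper bound of the completed fetch; the next run uses
    # it as fromTime so the same messages are not fetched again.
    record = fetchRecords.setdefault(pubsubTopic, FetchRecord())
    record.pubsubTopic = pubsubTopic
    record.lastFetch = toTime
```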
## Security and Performance Considerations