chore_: more workflow

kaichaosun 2024-07-01 16:29:31 +08:00
parent 33a17ebe78
commit 5afbb2bbfe
1 changed file with 27 additions and 4 deletions

@@ -38,6 +38,19 @@ By leveraging the store node to provide such query services, the applications ar
### Query with Message Hash
For outgoing messages, the processing flow can be like this:
- create a buffer for all "outgoing" messages
- send message via relay or lightpush protocol
- add message hash to the buffer
- save the message to local database with status "outgoing"
- check the buffer periodically
- query the store node with the message hashes in the buffer; only query for messages posted more than a few seconds ago
- if the message exists, update the status to "sent" and remove the message hash from the buffer
- if the message does not exist, resend the message
- if the message is still missing from the store node after a period of time, trigger the message-failed-to-send workflow and remove the message hash from the buffer
The implementation in Python may look like this:
```python
outgoingMessageHashes = []
@@ -55,12 +68,12 @@ def send(message):
    message.status = 'outgoing'
    database.saveMessage(message)

-def checkOutgoingMessages():
+def checkOutgoingMessages(peerID):
    for messageHash in outgoingMessageHashes:
        message = database.getMessage(messageHash)
        # only query the store node for outgoing messages posted more than 3 seconds ago
        if message.status == 'outgoing' and time.time() - message.postTime > 3:
-            response = waku.store.queryMessage(messageHash)
+            response = waku.store.queryMessage(peerID, messageHash)
            if response.exists():
                database.updateMessageStatus(messageHash, 'sent')
                outgoingMessageHashes.remove(messageHash)
@@ -81,6 +94,16 @@ When persistent network issue happens, you may not want to resend the failed mes
An application could use different pubsub topics and content topics; for example, a community may have its own pubsub topic, and each channel may have its own content topic. To fetch all missing messages in a specific channel, the application can query the store node with the corresponding pubsub topic, content topic, and time range.
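To make that layout concrete, the topic mapping could be modeled like this (a minimal sketch; the topic names and the `interestedTopics` structure are illustrative, not defined by the protocol):
```python
# illustrative only: one pubsub topic per community, one content topic per channel
interestedTopics = {
    "/waku/2/example-community": [       # hypothetical community pubsub topic
        "/community/1/general/proto",    # hypothetical channel content topics
        "/community/1/dev/proto",
    ],
}
```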
For incoming messages, the processing flow can be like this:
- subscribe to the interested pubsub and content topics
- periodically query the store node with the interested topics and time range for message hashes
- check if the message hash exists in the local database; if not, add the message hash to a buffer; if yes, skip the message
- fetch the missing messages in the buffer from the store node
- process the messages as needed
- update the last fetch time for the interested topic
The implementation in Python may look like this:
```python
class FetchRecord:
    pubsubTopic: str
@@ -92,7 +115,7 @@ class QueryParams:
    fromTime: int
    toTime: int
-def fetchMissingMessages(queryParams):
+def fetchMissingMessages(peerID, queryParams):
    missingMessageHashes = []
    # get missing message identifiers first in order to reduce the data transfer
@@ -119,7 +142,7 @@ def fetchMissingMessages(queryParams):
    updateFetchRecord(queryParams.pubsubTopic, queryParams.toTime)
```
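The diff does not show `updateFetchRecord`; a minimal sketch, assuming `FetchRecord` also carries a `lastFetchTime` field and the local database exposes a hypothetical `saveFetchRecord` helper, could be:
```python
def updateFetchRecord(pubsubTopic, lastFetchTime):
    # persist the latest fetch time so the next run resumes where this one stopped
    record = FetchRecord()
    record.pubsubTopic = pubsubTopic
    record.lastFetchTime = lastFetchTime
    database.saveFetchRecord(record)  # hypothetical persistence helper
```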
-`QueryParam` includes all the necessary information to fetch missing messages, it may also include peer id of the store node for easy discovery. The application should iterate all the interested pubsub topics, along with its content topics to construct the `QueryParam`.
+`QueryParam` includes all the necessary information to fetch missing messages. The application should iterate all the interested pubsub topics, along with their content topics, to construct the `QueryParam`.
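For example, iterating the interested topics to build the query parameters could look like this (a sketch; `interestedTopics`, `fetchRecords`, and the `pubsubTopic`/`contentTopics` fields are assumptions, not shown in the diff):
```python
import time

def buildQueryParams(interestedTopics, fetchRecords):
    # construct one QueryParams per interested pubsub topic
    queryParamsList = []
    for pubsubTopic, contentTopics in interestedTopics.items():
        params = QueryParams()
        params.pubsubTopic = pubsubTopic       # assumed field
        params.contentTopics = contentTopics   # assumed field
        params.fromTime = fetchRecords[pubsubTopic].lastFetchTime
        params.toTime = int(time.time())
        queryParamsList.append(params)
    return queryParamsList
```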
Function `fetchMissingMessages` runs periodically, for example every minute. It first fetches all the message hashes in the specified time range, checks whether each message exists in the local database, and fetches the missing messages in batches if not. The batch size should be bounded to avoid large data transfers or exceeding the maximum size supported by the store node.
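To illustrate the bounded batch size, the batched fetch could be sketched like this (`MAX_BATCH_SIZE` and `waku.store.queryMessagesByHashes` are assumed names, not a confirmed store API):
```python
MAX_BATCH_SIZE = 100  # assumed bound; tune to what the store node accepts

def fetchInBatches(peerID, missingMessageHashes):
    # split the missing hashes into bounded batches to keep each query small
    for i in range(0, len(missingMessageHashes), MAX_BATCH_SIZE):
        batch = missingMessageHashes[i:i + MAX_BATCH_SIZE]
        messages = waku.store.queryMessagesByHashes(peerID, batch)  # hypothetical batch query
        for message in messages:
            database.saveMessage(message)
```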