Filter out duplicate messages in the resume procedure (#600)

* adds timestamp to waku message store impl

* stores timestamp as int64

* adds unittest

* stores timestamp as seq of bytes

* minor

* re-orders unittest

* changes receiver timestamp to float64

* unit test for receiver timestamps

* adds comments

* reorder a few lines

* updates changelog

* more updates on changelog

* WIP: deduplicate

* add debug messages

* more debugging logs

* add timestamp to the message store

* converts resume result to uint64, removes some logs

* docstring for isDuplicate

* increments `added` before insertion into the db

* unit test for deduplication

* removes float to/from Bytes procs

* deletes extra spaces

* returns failed instead of the error

* minor

* minor

* adds changelog
Sanaz Taheri Boshrooyeh 2021-06-21 20:30:12 -07:00 committed by GitHub
parent c5da7c8fcd
commit 30a7a7a4fe
5 changed files with 102 additions and 9 deletions

CHANGELOG.md

@@ -7,6 +7,8 @@ This release contains the following:
 ### Changes
 - Enables db migration for the message store.
+- The `resume` Nim API eliminates duplicate messages before storing them.
 #### General refactoring
 #### Docs
 #### Schema
@@ -17,7 +19,6 @@ This release contains the following:
 ### Fixes
 ## 2021-06-03 v0.4
 This release contains the following:
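
As the changelog entry above notes, `resume` now deduplicates before storing. A minimal sketch of calling it and reading the new result type, assuming an already-started `node` (a `WakuNode` with a store mounted, as in the test below) and the usual chronicles logging imports:

```nim
# Hedged sketch: resume() now returns QueryResult = Result[uint64, string];
# on success the value is the number of new, non-duplicate messages added.
let res = await node.wakuStore.resume()
if res.isOk():
  debug "resume added messages", count = res.value
else:
  warn "failed to resume the history", error = res.error
```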

tests/v2/test_wakunode.nim

@@ -12,6 +12,8 @@ import
   libp2p/protocols/pubsub/pubsub,
   libp2p/protocols/pubsub/gossipsub,
   eth/keys,
+  ../../waku/v2/node/storage/sqlite,
+  ../../waku/v2/node/storage/message/waku_message_store,
   ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier],
   ../../waku/v2/protocol/waku_store/waku_store,
   ../../waku/v2/protocol/waku_filter/waku_filter,
@@ -699,8 +701,6 @@ procSuite "WakuNode":
       contentTopic = ContentTopic("/waku/2/default-content/proto")
       message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
-    var completionFut = newFuture[bool]()
     await node1.start()
     node1.mountStore(persistMessages = true)
     await node2.start()
@@ -720,3 +720,64 @@ procSuite "WakuNode":
     await node1.stop()
     await node2.stop()
+
+  asyncTest "Resume proc discards duplicate messages":
+    let
+      nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
+        Port(60000))
+      nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
+      node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
+        Port(60002))
+      contentTopic = ContentTopic("/waku/2/default-content/proto")
+      msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic)
+      msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic)
+
+    # set up the sqlite database for node1
+    let
+      database = SqliteDatabase.init("", inMemory = true)[]
+      store = WakuMessageStore.init(database)[]
+
+    var completionFut = newFuture[bool]()
+
+    await node1.start()
+    node1.mountStore(persistMessages = true, store = store)
+    await node2.start()
+    node2.mountStore(persistMessages = true)
+
+    await node2.subscriptions.notify(DefaultTopic, msg1)
+    await node2.subscriptions.notify(DefaultTopic, msg2)
+
+    await sleepAsync(2000.millis)
+
+    node1.wakuStore.setPeer(node2.peerInfo)
+
+    # populate the db with msg1 so that it counts as a duplicate
+    let index1 = computeIndex(msg1)
+    let output1 = store.put(index1, msg1, DefaultTopic)
+    check output1.isOk
+    node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
+
+    # now run the resume proc
+    await node1.resume()
+
+    # count the total number of messages retrieved from the database
+    var responseCount = 0
+    proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
+      responseCount += 1
+    # retrieve all the messages in the db
+    let res = store.getAll(data)
+
+    check:
+      res.isErr == false
+
+    check:
+      # if duplicates are discarded properly, the total number of messages after resume should be 2
+      # check that there are no duplicates in the messages field
+      node1.wakuStore.messages.len == 2
+      # check that there are no duplicates in the db
+      responseCount == 2
+
+    await node1.stop()
+    await node2.stop()

waku/v2/node/storage/message/waku_message_store.nim

@@ -27,7 +27,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scrip
 type
   WakuMessageStore* = ref object of MessageStore
     database*: SqliteDatabase

 proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] =
   ## Table is the SQL query for creating the messages Table.
   ## It contains:
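
For reference, a hedged sketch of constructing this store over an in-memory SQLite database, mirroring the setup in the test above (the `expect` calls unwrap the `Result` values):

```nim
# Assumes ../../waku/v2/node/storage/sqlite and
# ../../waku/v2/node/storage/message/waku_message_store are imported,
# as in the test diff above.
let database = SqliteDatabase.init("", inMemory = true).expect("valid database")
let store = WakuMessageStore.init(database).expect("valid message store")
```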

waku/v2/protocol/waku_store/waku_store.nim

@@ -386,6 +386,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception].} =
     warn "failed to load messages from store", err = res.error
     waku_store_errors.inc(labelValues = ["store_load_failure"])
+  debug "the number of messages in the memory", messageNum=ws.messages.len
   waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
@@ -486,7 +487,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
   waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
   handler(response.value.response)
-  return ok(response.value.response.messages.len.int64)
+  return ok(response.value.response.messages.len.uint64)
@@ -495,7 +496,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, can
   ## returns the number of retrieved messages, or error if all the requests fail
   for peer in candidateList.items:
     let successResult = await w.queryFrom(query, handler, peer)
-    if successResult.isOk: return ok(successResult.value)
+    if successResult.isOk: return ok(successResult.value.uint64)
   debug "failed to resolve the query"
   return err("failed to resolve the query")
@@ -507,6 +508,14 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
       lastSeenTime = iwmsg.msg.timestamp
   return lastSeenTime
+
+proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
+  ## Returns true if a duplicate of `message` is found in `list`, otherwise false.
+  # It is defined as a separate proc to be able to adjust the comparison criteria,
+  # e.g., to exclude the timestamp or include the pubsub topic.
+  if message in list: return true
+  return false
+
 proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} =
   ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
   ## messages are stored in the store node's messages field and in the message db
@@ -528,17 +537,39 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
   lastSeenTime = max(lastSeenTime - offset, 0)
   debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
+
+  var dismissed: uint = 0
+  var added: uint = 0
+
   proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
+    debug "resume handler is called"
+    # exclude the index from the comparison criteria
+    let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg)
     for msg in response.messages:
+      # check for duplicate messages
+      # TODO Should take the pubsub topic into account if we are going to support topics rather than the DefaultTopic
+      if isDuplicate(msg, currentMsgSummary):
+        dismissed = dismissed + 1
+        continue
+
+      # store the new message
       let index = msg.computeIndex()
-      ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic))
+      let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
+      ws.messages.add(indexedWakuMsg)
       waku_store_messages.inc(labelValues = ["stored"])
+      added = added + 1
+
+      # store in the db if it exists
       if ws.store.isNil: continue
       let res = ws.store.put(index, msg, DefaultTopic)
       if res.isErr:
         warn "failed to store messages", err = res.error
         waku_store_errors.inc(labelValues = ["store_failure"])
+
+    debug "number of duplicate messages found in resume", dismissed=dismissed
+    debug "number of messages added via resume", added=added

   let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
   if peerList.isSome:
@@ -565,7 +596,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
     debug "failed to resume the history"
     return err("failed to resume the history")
   debug "resume is done successfully"
-  return ok(successResult.value)
+  return ok(added)
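
The `isDuplicate` docstring notes it is kept as a separate proc so the comparison criteria can be adjusted. A hedged sketch of one such adjustment, not part of this commit: a hypothetical variant that compares only payload and content topic, so a re-sent message carrying a fresh timestamp would still be treated as a duplicate:

```nim
import std/sequtils

# Hypothetical helper: equality on content only (ignores timestamp/version).
proc sameContent(a, b: WakuMessage): bool =
  a.payload == b.payload and a.contentTopic == b.contentTopic

# Hypothetical variant of isDuplicate using the relaxed criterion.
proc isDuplicateIgnoringTime(message: WakuMessage, list: seq[WakuMessage]): bool =
  list.anyIt(sameContent(message, it))
```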

waku/v2/protocol/waku_store/waku_store_types.nim

@@ -58,7 +58,7 @@ type
     query*: HistoryQuery
     response*: HistoryResponse
-  QueryResult* = Result[int64, string]
+  QueryResult* = Result[uint64, string]
   WakuStore* = ref object of LPProtocol
     peerManager*: PeerManager
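
Since `QueryResult` widens from `Result[int64, string]` to `Result[uint64, string]`, downstream callers that read the count may need updating. A standalone hedged sketch (the `demo` proc and its wiring are hypothetical):

```nim
import stew/results

type QueryResult = Result[uint64, string]  # mirrors the new definition above

proc demo(res: QueryResult) =
  if res.isOk():
    let count: uint64 = res.value  # was int64 before this change
    echo "retrieved ", count, " messages"
  else:
    echo "query failed: ", res.error
```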