diff --git a/CHANGELOG.md b/CHANGELOG.md index 67087a4d2..147062314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ This release contains the following: ### Changes - Enables db migration for the message store. +- The `resume` Nim API eliminates duplicates messages before storing them. + #### General refactoring #### Docs #### Schema @@ -17,7 +19,6 @@ This release contains the following: ### Fixes - ## 2021-06-03 v0.4 This release contains the following: diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 28b87dfe1..0dad6cdca 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -12,6 +12,8 @@ import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, eth/keys, + ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, @@ -699,8 +701,6 @@ procSuite "WakuNode": contentTopic = ContentTopic("/waku/2/default-content/proto") message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - var completionFut = newFuture[bool]() - await node1.start() node1.mountStore(persistMessages = true) await node2.start() @@ -720,3 +720,64 @@ procSuite "WakuNode": await node1.stop() await node2.stop() + + asyncTest "Resume proc discards duplicate messages": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + contentTopic = ContentTopic("/waku/2/default-content/proto") + msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic) + msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic) + + # setup sqlite database for node1 + let + database = SqliteDatabase.init("", inMemory = true)[] + store = WakuMessageStore.init(database)[] + + + var completionFut = newFuture[bool]() + + await node1.start() + node1.mountStore(persistMessages = true, store = store) + await node2.start() + node2.mountStore(persistMessages = true) + + await node2.subscriptions.notify(DefaultTopic, msg1) + await node2.subscriptions.notify(DefaultTopic, msg2) + + await sleepAsync(2000.millis) + + node1.wakuStore.setPeer(node2.peerInfo) + + + # populate db with msg1 to be a duplicate + let index1 = computeIndex(msg1) + let output1 = store.put(index1, msg1, DefaultTopic) + check output1.isOk + node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic)) + + # now run the resume proc + await node1.resume() + + # count the total number of retrieved messages from the database + var responseCount = 0 + proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) = + responseCount += 1 + # retrieve all the messages in the db + let res = store.getAll(data) + check: + res.isErr == false + + check: + # if the duplicates are discarded properly, then the total number of messages after resume should be 2 + # check no duplicates is in the messages field + node1.wakuStore.messages.len == 2 + # check no duplicates is in the db + responseCount == 2 + + await node1.stop() + await node2.stop() \ No newline at end of file diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 3951026fa..f19caa59c 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -27,7 +27,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "../migration/migrations_scrip type WakuMessageStore* = ref object of MessageStore database*: SqliteDatabase - + proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] = ## Table is the SQL query for creating the messages Table. ## It contains: diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 3e9ddad46..c2bc2933a 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -386,6 +386,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} = warn "failed to load messages from store", err = res.error waku_store_errors.inc(labelValues = ["store_load_failure"]) + debug "the number of messages in the memory", messageNum=ws.messages.len waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) @@ -486,7 +487,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) - return ok(response.value.response.messages.len.int64) + return ok(response.value.response.messages.len.uint64) @@ -495,7 +496,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, can ## returns the number of retrieved messages, or error if all the requests fail for peer in candidateList.items: let successResult = await w.queryFrom(query, handler, peer) - if successResult.isOk: return ok(successResult.value) + if successResult.isOk: return ok(successResult.value.uint64) debug "failed to resolve the query" return err("failed to resolve the query") @@ -507,6 +508,14 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float = lastSeenTime = iwmsg.msg.timestamp return lastSeenTime +proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool = + ## return true if a duplicate message is found, otherwise false + # it is defined as a separate proc to be bale to adjust comparison criteria + # e.g., to exclude timestamp or include pubsub topic + if message in list: return true + return false + + proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db @@ -528,17 +537,39 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] lastSeenTime = max(lastSeenTime - offset, 0) debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime + var dismissed: uint = 0 + var added: uint = 0 proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} = + debug "resume handler is called" + # exclude index from the comparison criteria + let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg) for msg in response.messages: + # check for duplicate messages + # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic + if isDuplicate(msg,currentMsgSummary): + dismissed = dismissed + 1 + continue + + # store the new message let index = msg.computeIndex() - ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)) + let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic) + ws.messages.add(indexedWakuMsg) waku_store_messages.inc(labelValues = ["stored"]) + + added = added + 1 + + # store in db if exists if ws.store.isNil: continue let res = ws.store.put(index, msg, DefaultTopic) if res.isErr: warn "failed to store messages", err = res.error waku_store_errors.inc(labelValues = ["store_failure"]) + + debug "number of duplicate messages found in resume", dismissed=dismissed + debug "number of messages added via resume", added=added + + let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) if peerList.isSome: @@ -565,7 +596,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] debug "failed to resume the history" return err("failed to resume the history") debug "resume is done successfully" - return ok(successResult.value) + return ok(added) # NOTE: Experimental, maybe incorporate as part of query call proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index af43ae81f..f42be4b6b 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -58,7 +58,7 @@ type query*: HistoryQuery response*: HistoryResponse - QueryResult* = Result[int64, string] + QueryResult* = Result[uint64, string] WakuStore* = ref object of LPProtocol peerManager*: PeerManager