deploy: 30a7a7a4fe5f87677aa9162b5fcfe8b769649525

This commit is contained in:
oskarth 2021-06-22 03:52:21 +00:00
parent 785f917807
commit befab651dc
6 changed files with 103 additions and 10 deletions

View File

@ -7,6 +7,8 @@ This release contains the following:
### Changes ### Changes
- Enables db migration for the message store. - Enables db migration for the message store.
- The `resume` Nim API eliminates duplicates messages before storing them.
#### General refactoring #### General refactoring
#### Docs #### Docs
#### Schema #### Schema
@ -17,7 +19,6 @@ This release contains the following:
### Fixes ### Fixes
## 2021-06-03 v0.4 ## 2021-06-03 v0.4
This release contains the following: This release contains the following:

View File

@ -12,6 +12,8 @@ import
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
eth/keys, eth/keys,
../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier],
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
@ -699,8 +701,6 @@ procSuite "WakuNode":
contentTopic = ContentTopic("/waku/2/default-content/proto") contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
var completionFut = newFuture[bool]()
await node1.start() await node1.start()
node1.mountStore(persistMessages = true) node1.mountStore(persistMessages = true)
await node2.start() await node2.start()
@ -720,3 +720,64 @@ procSuite "WakuNode":
await node1.stop() await node1.stop()
await node2.stop() await node2.stop()
asyncTest "Resume proc discards duplicate messages":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
Port(60002))
contentTopic = ContentTopic("/waku/2/default-content/proto")
msg1 = WakuMessage(payload: "hello world1".toBytes(), contentTopic: contentTopic)
msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic)
# setup sqlite database for node1
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
var completionFut = newFuture[bool]()
await node1.start()
node1.mountStore(persistMessages = true, store = store)
await node2.start()
node2.mountStore(persistMessages = true)
await node2.subscriptions.notify(DefaultTopic, msg1)
await node2.subscriptions.notify(DefaultTopic, msg2)
await sleepAsync(2000.millis)
node1.wakuStore.setPeer(node2.peerInfo)
# populate db with msg1 to be a duplicate
let index1 = computeIndex(msg1)
let output1 = store.put(index1, msg1, DefaultTopic)
check output1.isOk
node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
# now run the resume proc
await node1.resume()
# count the total number of retrieved messages from the database
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
responseCount += 1
# retrieve all the messages in the db
let res = store.getAll(data)
check:
res.isErr == false
check:
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
# check no duplicates is in the messages field
node1.wakuStore.messages.len == 2
# check no duplicates is in the db
responseCount == 2
await node1.stop()
await node2.stop()

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az272-329: # Libtool was configured on host fv-az173-291:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -386,6 +386,7 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
warn "failed to load messages from store", err = res.error warn "failed to load messages from store", err = res.error
waku_store_errors.inc(labelValues = ["store_load_failure"]) waku_store_errors.inc(labelValues = ["store_load_failure"])
debug "the number of messages in the memory", messageNum=ws.messages.len
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
@ -486,7 +487,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response) handler(response.value.response)
return ok(response.value.response.messages.len.int64) return ok(response.value.response.messages.len.uint64)
@ -495,7 +496,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, can
## returns the number of retrieved messages, or error if all the requests fail ## returns the number of retrieved messages, or error if all the requests fail
for peer in candidateList.items: for peer in candidateList.items:
let successResult = await w.queryFrom(query, handler, peer) let successResult = await w.queryFrom(query, handler, peer)
if successResult.isOk: return ok(successResult.value) if successResult.isOk: return ok(successResult.value.uint64)
debug "failed to resolve the query" debug "failed to resolve the query"
return err("failed to resolve the query") return err("failed to resolve the query")
@ -507,6 +508,14 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
lastSeenTime = iwmsg.msg.timestamp lastSeenTime = iwmsg.msg.timestamp
return lastSeenTime return lastSeenTime
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
## return true if a duplicate message is found, otherwise false
# it is defined as a separate proc to be bale to adjust comparison criteria
# e.g., to exclude timestamp or include pubsub topic
if message in list: return true
return false
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} = proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db ## messages are stored in the store node's messages field and in the message db
@ -528,17 +537,39 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
lastSeenTime = max(lastSeenTime - offset, 0) lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
var dismissed: uint = 0
var added: uint = 0
proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} = proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
debug "resume handler is called"
# exclude index from the comparison criteria
let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg)
for msg in response.messages: for msg in response.messages:
# check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if isDuplicate(msg,currentMsgSummary):
dismissed = dismissed + 1
continue
# store the new message
let index = msg.computeIndex() let index = msg.computeIndex()
ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)) let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
ws.messages.add(indexedWakuMsg)
waku_store_messages.inc(labelValues = ["stored"]) waku_store_messages.inc(labelValues = ["stored"])
added = added + 1
# store in db if exists
if ws.store.isNil: continue if ws.store.isNil: continue
let res = ws.store.put(index, msg, DefaultTopic) let res = ws.store.put(index, msg, DefaultTopic)
if res.isErr: if res.isErr:
warn "failed to store messages", err = res.error warn "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"]) waku_store_errors.inc(labelValues = ["store_failure"])
debug "number of duplicate messages found in resume", dismissed=dismissed
debug "number of messages added via resume", added=added
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
if peerList.isSome: if peerList.isSome:
@ -565,7 +596,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
debug "failed to resume the history" debug "failed to resume the history"
return err("failed to resume the history") return err("failed to resume the history")
debug "resume is done successfully" debug "resume is done successfully"
return ok(successResult.value) return ok(added)
# NOTE: Experimental, maybe incorporate as part of query call # NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =

View File

@ -58,7 +58,7 @@ type
query*: HistoryQuery query*: HistoryQuery
response*: HistoryResponse response*: HistoryResponse
QueryResult* = Result[int64, string] QueryResult* = Result[uint64, string]
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager