From d49b53406ca1cb83b82f32c3d17ffd363a5f2cd0 Mon Sep 17 00:00:00 2001 From: LNSD Date: Tue, 13 Sep 2022 18:08:06 +0000 Subject: [PATCH] deploy: a4b989cf0dee7d084646e337567384bb57b209a6 --- .../v2/node/storage/message/message_store.nim | 2 + .../message/sqlite_store/sqlite_store.nim | 2 + .../node/storage/message/waku_store_queue.nim | 3 + waku/v2/protocol/waku_store/protocol.nim | 196 +++++++++++------- 4 files changed, 132 insertions(+), 71 deletions(-) diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 99c3ff434..a970c250f 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -45,3 +45,5 @@ method getMessagesByHistoryQuery*( maxPageSize = StoreMaxPageSize, ascendingOrder = true ): MessageStoreResult[MessageStorePage] {.base.} = discard + +method getMessagesCount*(ms: MessageStore): int64 {.base.} = discard diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 53f86c661..2154bfd6a 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -156,6 +156,8 @@ method getMessagesByHistoryQuery*( ok((messages, some(pagingInfo))) +method getMessagesCount*(s: SqliteStore): int64 = + int64(s.numMessages) proc close*(s: SqliteStore) = ## Close the database connection diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 20d2460c5..266168d0e 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -425,3 +425,6 @@ method getMessagesByHistoryQuery*( return ok((messages, none(PagingInfo))) ok((messages, some(pagingInfo))) + +method getMessagesCount*(storeQueue: StoreQueueRef): int64 = + int64(storeQueue.len()) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index e4cd92de2..52b1fc708 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -31,6 +31,8 @@ declarePublicGauge waku_store_messages, "number of historical messages", ["type" declarePublicGauge waku_store_peers, "number of store peers" declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] declarePublicGauge waku_store_queries, "number of store queries received" +declarePublicHistogram waku_store_insert_time, "time spent storing a message (ms)" +declarePublicHistogram waku_store_query_time, "time spent processing a history query (ms)" logScope: topics = "wakustore" @@ -54,7 +56,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64 # Error types (metric label values) const - storeFailure = "store_failure" + insertFailure = "insert_failure" dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" peerNotFoundFailure = "peer_not_found_failure" @@ -91,8 +93,12 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} else: none(Timestamp) qMaxPageSize = query.pagingInfo.pageSize qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD - + + let queryStartTime = getTime().toUnixFloat() + let queryRes = block: + # TODO: Move this logic, together with the insert message logic and load messages on boot + # into a "dual-store" message store implementation. if w.isSqliteOnly: w.store.getMessagesByHistoryQuery( contentTopic = qContentTopics, @@ -113,7 +119,11 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} maxPageSize = qMaxPageSize, ascendingOrder = qAscendingOrder ) - + + let queryTime = getTime().toUnixFloat() - queryStartTime + waku_store_query_time.observe(getMillisecondTime(queryTime)) + + # Build response # TODO: Improve error reporting if queryRes.isErr(): @@ -145,7 +155,7 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = let resp = ws.findMessages(req.query) - if not ws.wakuSwap.isNil: + if not ws.wakuSwap.isNil(): info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text # Perform accounting operation @@ -163,28 +173,36 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = ws.codec = WakuStoreCodec ws.messages = StoreQueueRef.new(capacity) - if ws.store.isNil: - return if ws.isSqliteOnly: + if ws.store.isNil(): + warn "store not provided (nil)" + return + info "SQLite-only store initialized. Messages are *not* loaded into memory." - return + waku_store_messages.set(ws.store.getMessagesCount(), labelValues = ["stored"]) + - # Load all messages from sqliteStore into queueStore - info "attempting to load messages from persistent storage" - - let res = ws.store.getAllMessages() - if res.isOk(): - for (receiverTime, msg, pubsubTopic) in res.value: - let index = Index.compute(msg, receiverTime, pubsubTopic) - discard ws.messages.put(index, msg, pubsubTopic) - - info "successfully loaded messages from the persistent store" + # TODO: Move this logic, together with the insert message logic + # into a "dual-store" message store implementation. else: - warn "failed to load messages from the persistent store", err = res.error() + if ws.store.isNil(): + return + + info "loading messages from persistent storage" + + let res = ws.store.getAllMessages() + if res.isOk(): + for (receiverTime, msg, pubsubTopic) in res.value: + let index = Index.compute(msg, receiverTime, pubsubTopic) + discard ws.messages.put(index, msg, pubsubTopic) + + info "successfully loaded messages from the persistent store" + else: + warn "failed to load messages from the persistent store", err = res.error() + + waku_store_messages.set(ws.messages.getMessagesCount(), labelValues = ["stored"]) - debug "the number of messages in the memory", messageNum=ws.messages.len - waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, @@ -199,7 +217,7 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = waku_store_peers.inc() -proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = +proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} = if not w.persistMessages: # Store is mounted but new messages should not be stored return @@ -207,33 +225,51 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = if msg.ephemeral: # The message is ephemeral, should not be stored return - - let index = Index.compute( - msg, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = topic - ) - - # Add message to in-memory store - if not w.isSqliteOnly: - trace "handle message in WakuStore", topic=topic, msg=msg - - let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - if addRes.isErr(): - debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() - waku_store_errors.inc(labelValues = [$(addRes.error())]) - return - waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) - - # Add messages to persistent store, if present - if w.store.isNil: - return + let insertStartTime = getTime().toUnixFloat() + + let now = getNanosecondTime(getTime().toUnixFloat()) + let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - let res = w.store.put(index, msg, topic) - if res.isErr(): - debug "failed to store messages", err=res.error() - waku_store_errors.inc(labelValues = [storeFailure]) + trace "handling message", topic=pubsubTopic, index=index + + block: + if w.isSqliteOnly: + # Add messages to persistent store, if present + if w.store.isNil(): + return + + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + + waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"]) + + # TODO: Move this logic, together with the load from persistent store on init + # into a "dual-store" message store implementation. + else: + # Add message to in-memory store + let resPutInmemory = w.messages.put(index, msg, pubsubTopic) + if resPutInmemory.isErr(): + debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() + waku_store_errors.inc(labelValues = [insertFailure]) + return + + waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"]) + + # Add messages to persistent store, if present + if w.store.isNil(): + return + + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + return + + let insertTime = getTime().toUnixFloat() - insertStartTime + waku_store_insert_time.observe(getMillisecondTime(insertTime)) # TODO: Remove after converting the query method into a non-callback method @@ -347,6 +383,12 @@ proc resume*(w: WakuStore, ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string + + # If store has not been provided, don't even try + if w.isSqliteOnly and w.store.isNil(): + return err("store not provided") + + var lastSeenTime = Timestamp(0) var currentTime = getNanosecondTime(epochTime()) @@ -354,7 +396,6 @@ proc resume*(w: WakuStore, if lastSeenItem.isOk(): lastSeenTime = lastSeenItem.get().msg.timestamp - # adjust the time window with an offset of 20 seconds let offset: Timestamp = getNanosecondTime(20) currentTime = currentTime + offset @@ -400,38 +441,51 @@ proc resume*(w: WakuStore, var added: uint = 0 for msg in res.get(): - let index = Index.compute( - msg, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = pubsubTopic - ) + let now = getNanosecondTime(getTime().toUnixFloat()) + let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - # check for duplicate messages - # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic - if w.messages.contains(index): - dismissed.inc() - continue + if w.isSqliteOnly: + # Add messages to persistent store + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + continue - # store the new message - let resPut = w.messages.put(index, msg, pubsubTopic) - if resPut.isErr(): - trace "failed to store messages", err = resPut.error() - waku_store_errors.inc(labelValues = [storeFailure]) - continue + # TODO: Move this logic, together with the load from persistent store on init + # into a "dual-store" message store implementation. + else: + # check for duplicate messages + # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic + if w.messages.contains(index): + dismissed.inc() + continue - # store in db if exists - if not w.store.isNil(): - let resPut = w.store.put(index, msg, pubsubTopic) - if resPut.isErr(): - trace "failed to store messages", err = resPut.error() - waku_store_errors.inc(labelValues = [storeFailure]) + # Add message to in-memory store + let resPutInmemory = w.messages.put(index, msg, pubsubTopic) + if resPutInmemory.isErr(): + debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() + waku_store_errors.inc(labelValues = [insertFailure]) + continue + + if w.store.isNil(): + continue + + # Add messages to persistent store + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) continue added.inc() - - waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed + + let messagesCount = if w.isSqliteOnly: w.store.getMessagesCount() + else: w.messages.getMessagesCount() + waku_store_messages.set(messagesCount, labelValues = ["stored"]) + return ok(added)