From a4860d92a0906e51eb941f84ce840ea03a9feaf5 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 13 Sep 2022 16:48:33 +0200 Subject: [PATCH] feat(store): add query and insertion time metrics --- waku/v2/protocol/waku_store/protocol.nim | 76 +++++++++++++++--------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index f7a704555..52b1fc708 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -31,6 +31,8 @@ declarePublicGauge waku_store_messages, "number of historical messages", ["type" declarePublicGauge waku_store_peers, "number of store peers" declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] declarePublicGauge waku_store_queries, "number of store queries received" +declarePublicHistogram waku_store_insert_time, "time spent storing a message (ms)" +declarePublicHistogram waku_store_query_time, "time spent processing a history query (ms)" logScope: topics = "wakustore" @@ -91,8 +93,12 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} else: none(Timestamp) qMaxPageSize = query.pagingInfo.pageSize qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD - + + let queryStartTime = getTime().toUnixFloat() + let queryRes = block: + # TODO: Move this logic, together with the insert message logic and load messages on boot + # into a "dual-store" message store implementation. if w.isSqliteOnly: w.store.getMessagesByHistoryQuery( contentTopic = qContentTopics, @@ -113,7 +119,11 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} maxPageSize = qMaxPageSize, ascendingOrder = qAscendingOrder ) - + + let queryTime = getTime().toUnixFloat() - queryStartTime + waku_store_query_time.observe(getMillisecondTime(queryTime)) + + # Build response # TODO: Improve error reporting if queryRes.isErr(): @@ -215,43 +225,51 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async if msg.ephemeral: # The message is ephemeral, should not be stored return + + let insertStartTime = getTime().toUnixFloat() let now = getNanosecondTime(getTime().toUnixFloat()) let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) trace "handling message", topic=pubsubTopic, index=index - if w.isSqliteOnly: - # Add messages to persistent store, if present - if w.store.isNil(): - return + block: + if w.isSqliteOnly: + # Add messages to persistent store, if present + if w.store.isNil(): + return - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) - waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"]) + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) - # TODO: Move this logic, together with the load from persistent store on init - # into a "dual-store" message store implementation. - else: - # Add message to in-memory store - let resPutInmemory = w.messages.put(index, msg, pubsubTopic) - if resPutInmemory.isErr(): - debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() - waku_store_errors.inc(labelValues = [insertFailure]) - return - waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"]) + waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"]) - # Add messages to persistent store, if present - if w.store.isNil(): - return - - let resPutStore = w.store.put(index, msg, pubsubTopic) - if resPutStore.isErr(): - debug "failed to insert message to persistent store", index=index, err=resPutStore.error() - waku_store_errors.inc(labelValues = [insertFailure]) + # TODO: Move this logic, together with the load from persistent store on init + # into a "dual-store" message store implementation. + else: + # Add message to in-memory store + let resPutInmemory = w.messages.put(index, msg, pubsubTopic) + if resPutInmemory.isErr(): + debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() + waku_store_errors.inc(labelValues = [insertFailure]) + return + waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"]) + + # Add messages to persistent store, if present + if w.store.isNil(): + return + + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + return + + let insertTime = getTime().toUnixFloat() - insertStartTime + waku_store_insert_time.observe(getMillisecondTime(insertTime)) # TODO: Remove after converting the query method into a non-callback method