From 1519a6ac79391e8e87b1bbfa7b37a484dbf3a796 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Tue, 13 Sep 2022 18:06:23 +0200 Subject: [PATCH] refactor(store): reorganise waku store protocol is sqqlite-only logic --- waku/v2/protocol/waku_store/protocol.nim | 161 ++++++++++++++--------- 1 file changed, 96 insertions(+), 65 deletions(-) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index e4cd92de2..2bc37de8e 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -54,7 +54,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64 # Error types (metric label values) const - storeFailure = "store_failure" + insertFailure = "insert_failure" dialFailure = "dial_failure" decodeRpcFailure = "decode_rpc_failure" peerNotFoundFailure = "peer_not_found_failure" @@ -145,7 +145,7 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = let resp = ws.findMessages(req.query) - if not ws.wakuSwap.isNil: + if not ws.wakuSwap.isNil(): info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text # Perform accounting operation @@ -163,27 +163,32 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = ws.codec = WakuStoreCodec ws.messages = StoreQueueRef.new(capacity) - if ws.store.isNil: - return if ws.isSqliteOnly: + if ws.store.isNil(): + warn "store not provided (nil)" + return + info "SQLite-only store initialized. Messages are *not* loaded into memory." - return - # Load all messages from sqliteStore into queueStore - info "attempting to load messages from persistent storage" - - let res = ws.store.getAllMessages() - if res.isOk(): - for (receiverTime, msg, pubsubTopic) in res.value: - let index = Index.compute(msg, receiverTime, pubsubTopic) - discard ws.messages.put(index, msg, pubsubTopic) - - info "successfully loaded messages from the persistent store" + # TODO: Move this logic, together with the insert message logic + # into a "dual-store" message store implementation. else: - warn "failed to load messages from the persistent store", err = res.error() + if ws.store.isNil(): + return + + info "loading messages from persistent storage" + + let res = ws.store.getAllMessages() + if res.isOk(): + for (receiverTime, msg, pubsubTopic) in res.value: + let index = Index.compute(msg, receiverTime, pubsubTopic) + discard ws.messages.put(index, msg, pubsubTopic) + + info "successfully loaded messages from the persistent store" + else: + warn "failed to load messages from the persistent store", err = res.error() - debug "the number of messages in the memory", messageNum=ws.messages.len waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, @@ -199,7 +204,7 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = waku_store_peers.inc() -proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = +proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} = if not w.persistMessages: # Store is mounted but new messages should not be stored return @@ -207,33 +212,43 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = if msg.ephemeral: # The message is ephemeral, should not be stored return + + let now = getNanosecondTime(getTime().toUnixFloat()) + let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - let index = Index.compute( - msg, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = topic - ) + trace "handling message", topic=pubsubTopic, index=index - # Add message to in-memory store - if not w.isSqliteOnly: - trace "handle message in WakuStore", topic=topic, msg=msg - - let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - if addRes.isErr(): - debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() - waku_store_errors.inc(labelValues = [$(addRes.error())]) + if w.isSqliteOnly: + # Add messages to persistent store, if present + if w.store.isNil(): return - - waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) - - # Add messages to persistent store, if present - if w.store.isNil: - return - let res = w.store.put(index, msg, topic) - if res.isErr(): - debug "failed to store messages", err=res.error() - waku_store_errors.inc(labelValues = [storeFailure]) + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"]) + + # TODO: Move this logic, together with the load from persistent store on init + # into a "dual-store" message store implementation. + else: + # Add message to in-memory store + let resPutInmemory = w.messages.put(index, msg, pubsubTopic) + if resPutInmemory.isErr(): + debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() + waku_store_errors.inc(labelValues = [insertFailure]) + return + waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) + + # Add messages to persistent store, if present + if w.store.isNil(): + return + + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + # TODO: Remove after converting the query method into a non-callback method @@ -347,6 +362,12 @@ proc resume*(w: WakuStore, ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string + + # If store has not been provided, don't even try + if w.isSqliteOnly and w.store.isNil(): + return err("store not provided") + + var lastSeenTime = Timestamp(0) var currentTime = getNanosecondTime(epochTime()) @@ -354,7 +375,6 @@ proc resume*(w: WakuStore, if lastSeenItem.isOk(): lastSeenTime = lastSeenItem.get().msg.timestamp - # adjust the time window with an offset of 20 seconds let offset: Timestamp = getNanosecondTime(20) currentTime = currentTime + offset @@ -400,38 +420,49 @@ proc resume*(w: WakuStore, var added: uint = 0 for msg in res.get(): - let index = Index.compute( - msg, - receivedTime = getNanosecondTime(getTime().toUnixFloat()), - pubsubTopic = pubsubTopic - ) + let now = getNanosecondTime(getTime().toUnixFloat()) + let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - # check for duplicate messages - # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic - if w.messages.contains(index): - dismissed.inc() - continue + if w.isSqliteOnly: + # Add messages to persistent store + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) + continue - # store the new message - let resPut = w.messages.put(index, msg, pubsubTopic) - if resPut.isErr(): - trace "failed to store messages", err = resPut.error() - waku_store_errors.inc(labelValues = [storeFailure]) - continue + # TODO: Move this logic, together with the load from persistent store on init + # into a "dual-store" message store implementation. + else: + # check for duplicate messages + # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic + if w.messages.contains(index): + dismissed.inc() + continue - # store in db if exists - if not w.store.isNil(): - let resPut = w.store.put(index, msg, pubsubTopic) - if resPut.isErr(): - trace "failed to store messages", err = resPut.error() - waku_store_errors.inc(labelValues = [storeFailure]) + # Add message to in-memory store + let resPutInmemory = w.messages.put(index, msg, pubsubTopic) + if resPutInmemory.isErr(): + debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error() + waku_store_errors.inc(labelValues = [insertFailure]) + continue + + if w.store.isNil(): + continue + + # Add messages to persistent store + let resPutStore = w.store.put(index, msg, pubsubTopic) + if resPutStore.isErr(): + debug "failed to insert message to persistent store", index=index, err=resPutStore.error() + waku_store_errors.inc(labelValues = [insertFailure]) continue added.inc() + + debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) - debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed return ok(added)