refactor(store): reorganise waku store protocol is sqqlite-only logic

This commit is contained in:
Lorenzo Delgado 2022-09-13 18:06:23 +02:00 committed by Lorenzo Delgado
parent d49cd1348d
commit 83cffd50ed
1 changed files with 96 additions and 65 deletions

View File

@ -54,7 +54,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64
# Error types (metric label values) # Error types (metric label values)
const const
storeFailure = "store_failure" insertFailure = "insert_failure"
dialFailure = "dial_failure" dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure" decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure" peerNotFoundFailure = "peer_not_found_failure"
@ -145,7 +145,7 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
let resp = ws.findMessages(req.query) let resp = ws.findMessages(req.query)
if not ws.wakuSwap.isNil: if not ws.wakuSwap.isNil():
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
# Perform accounting operation # Perform accounting operation
@ -163,27 +163,32 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
ws.codec = WakuStoreCodec ws.codec = WakuStoreCodec
ws.messages = StoreQueueRef.new(capacity) ws.messages = StoreQueueRef.new(capacity)
if ws.store.isNil:
return
if ws.isSqliteOnly: if ws.isSqliteOnly:
if ws.store.isNil():
warn "store not provided (nil)"
return
info "SQLite-only store initialized. Messages are *not* loaded into memory." info "SQLite-only store initialized. Messages are *not* loaded into memory."
return
# Load all messages from sqliteStore into queueStore # TODO: Move this logic, together with the insert message logic
info "attempting to load messages from persistent storage" # into a "dual-store" message store implementation.
let res = ws.store.getAllMessages()
if res.isOk():
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard ws.messages.put(index, msg, pubsubTopic)
info "successfully loaded messages from the persistent store"
else: else:
warn "failed to load messages from the persistent store", err = res.error() if ws.store.isNil():
return
info "loading messages from persistent storage"
let res = ws.store.getAllMessages()
if res.isOk():
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard ws.messages.put(index, msg, pubsubTopic)
info "successfully loaded messages from the persistent store"
else:
warn "failed to load messages from the persistent store", err = res.error()
debug "the number of messages in the memory", messageNum=ws.messages.len
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext,
@ -199,7 +204,7 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
waku_store_peers.inc() waku_store_peers.inc()
proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} =
if not w.persistMessages: if not w.persistMessages:
# Store is mounted but new messages should not be stored # Store is mounted but new messages should not be stored
return return
@ -207,33 +212,43 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
if msg.ephemeral: if msg.ephemeral:
# The message is ephemeral, should not be stored # The message is ephemeral, should not be stored
return return
let now = getNanosecondTime(getTime().toUnixFloat())
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
let index = Index.compute( trace "handling message", topic=pubsubTopic, index=index
msg,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = topic
)
# Add message to in-memory store if w.isSqliteOnly:
if not w.isSqliteOnly: # Add messages to persistent store, if present
trace "handle message in WakuStore", topic=topic, msg=msg if w.store.isNil():
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr():
debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return return
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
# Add messages to persistent store, if present
if w.store.isNil:
return
let res = w.store.put(index, msg, topic) let resPutStore = w.store.put(index, msg, pubsubTopic)
if res.isErr(): if resPutStore.isErr():
debug "failed to store messages", err=res.error() debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [storeFailure]) waku_store_errors.inc(labelValues = [insertFailure])
waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"])
# TODO: Move this logic, together with the load from persistent store on init
# into a "dual-store" message store implementation.
else:
# Add message to in-memory store
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
if resPutInmemory.isErr():
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
waku_store_errors.inc(labelValues = [insertFailure])
return
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
# Add messages to persistent store, if present
if w.store.isNil():
return
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
# TODO: Remove after converting the query method into a non-callback method # TODO: Remove after converting the query method into a non-callback method
@ -347,6 +362,12 @@ proc resume*(w: WakuStore,
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
# If store has not been provided, don't even try
if w.isSqliteOnly and w.store.isNil():
return err("store not provided")
var lastSeenTime = Timestamp(0) var lastSeenTime = Timestamp(0)
var currentTime = getNanosecondTime(epochTime()) var currentTime = getNanosecondTime(epochTime())
@ -354,7 +375,6 @@ proc resume*(w: WakuStore,
if lastSeenItem.isOk(): if lastSeenItem.isOk():
lastSeenTime = lastSeenItem.get().msg.timestamp lastSeenTime = lastSeenItem.get().msg.timestamp
# adjust the time window with an offset of 20 seconds # adjust the time window with an offset of 20 seconds
let offset: Timestamp = getNanosecondTime(20) let offset: Timestamp = getNanosecondTime(20)
currentTime = currentTime + offset currentTime = currentTime + offset
@ -400,38 +420,49 @@ proc resume*(w: WakuStore,
var added: uint = 0 var added: uint = 0
for msg in res.get(): for msg in res.get():
let index = Index.compute( let now = getNanosecondTime(getTime().toUnixFloat())
msg, let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = pubsubTopic
)
# check for duplicate messages if w.isSqliteOnly:
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic # Add messages to persistent store
if w.messages.contains(index): let resPutStore = w.store.put(index, msg, pubsubTopic)
dismissed.inc() if resPutStore.isErr():
continue debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
continue
# store the new message # TODO: Move this logic, together with the load from persistent store on init
let resPut = w.messages.put(index, msg, pubsubTopic) # into a "dual-store" message store implementation.
if resPut.isErr(): else:
trace "failed to store messages", err = resPut.error() # check for duplicate messages
waku_store_errors.inc(labelValues = [storeFailure]) # TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
continue if w.messages.contains(index):
dismissed.inc()
continue
# store in db if exists # Add message to in-memory store
if not w.store.isNil(): let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
let resPut = w.store.put(index, msg, pubsubTopic) if resPutInmemory.isErr():
if resPut.isErr(): debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
trace "failed to store messages", err = resPut.error() waku_store_errors.inc(labelValues = [insertFailure])
waku_store_errors.inc(labelValues = [storeFailure]) continue
if w.store.isNil():
continue
# Add messages to persistent store
let resPutStore = w.store.put(index, msg, pubsubTopic)
if resPutStore.isErr():
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
waku_store_errors.inc(labelValues = [insertFailure])
continue continue
added.inc() added.inc()
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
return ok(added) return ok(added)