mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 17:03:09 +00:00
deploy: a4b989cf0dee7d084646e337567384bb57b209a6
This commit is contained in:
parent
f47bd55ee6
commit
d49b53406c
@ -45,3 +45,5 @@ method getMessagesByHistoryQuery*(
|
||||
maxPageSize = StoreMaxPageSize,
|
||||
ascendingOrder = true
|
||||
): MessageStoreResult[MessageStorePage] {.base.} = discard
|
||||
|
||||
method getMessagesCount*(ms: MessageStore): int64 {.base.} = discard
|
||||
|
||||
@ -156,6 +156,8 @@ method getMessagesByHistoryQuery*(
|
||||
|
||||
ok((messages, some(pagingInfo)))
|
||||
|
||||
method getMessagesCount*(s: SqliteStore): int64 =
|
||||
int64(s.numMessages)
|
||||
|
||||
proc close*(s: SqliteStore) =
|
||||
## Close the database connection
|
||||
|
||||
@ -425,3 +425,6 @@ method getMessagesByHistoryQuery*(
|
||||
return ok((messages, none(PagingInfo)))
|
||||
|
||||
ok((messages, some(pagingInfo)))
|
||||
|
||||
method getMessagesCount*(storeQueue: StoreQueueRef): int64 =
|
||||
int64(storeQueue.len())
|
||||
|
||||
@ -31,6 +31,8 @@ declarePublicGauge waku_store_messages, "number of historical messages", ["type"
|
||||
declarePublicGauge waku_store_peers, "number of store peers"
|
||||
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
|
||||
declarePublicGauge waku_store_queries, "number of store queries received"
|
||||
declarePublicHistogram waku_store_insert_time, "time spent storing a message (ms)"
|
||||
declarePublicHistogram waku_store_query_time, "time spent processing a history query (ms)"
|
||||
|
||||
logScope:
|
||||
topics = "wakustore"
|
||||
@ -54,7 +56,7 @@ const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
storeFailure = "store_failure"
|
||||
insertFailure = "insert_failure"
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
peerNotFoundFailure = "peer_not_found_failure"
|
||||
@ -91,8 +93,12 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
|
||||
else: none(Timestamp)
|
||||
qMaxPageSize = query.pagingInfo.pageSize
|
||||
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD
|
||||
|
||||
|
||||
let queryStartTime = getTime().toUnixFloat()
|
||||
|
||||
let queryRes = block:
|
||||
# TODO: Move this logic, together with the insert message logic and load messages on boot
|
||||
# into a "dual-store" message store implementation.
|
||||
if w.isSqliteOnly:
|
||||
w.store.getMessagesByHistoryQuery(
|
||||
contentTopic = qContentTopics,
|
||||
@ -113,7 +119,11 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
|
||||
maxPageSize = qMaxPageSize,
|
||||
ascendingOrder = qAscendingOrder
|
||||
)
|
||||
|
||||
|
||||
let queryTime = getTime().toUnixFloat() - queryStartTime
|
||||
waku_store_query_time.observe(getMillisecondTime(queryTime))
|
||||
|
||||
|
||||
# Build response
|
||||
# TODO: Improve error reporting
|
||||
if queryRes.isErr():
|
||||
@ -145,7 +155,7 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
||||
|
||||
let resp = ws.findMessages(req.query)
|
||||
|
||||
if not ws.wakuSwap.isNil:
|
||||
if not ws.wakuSwap.isNil():
|
||||
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
|
||||
|
||||
# Perform accounting operation
|
||||
@ -163,28 +173,36 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
||||
ws.codec = WakuStoreCodec
|
||||
ws.messages = StoreQueueRef.new(capacity)
|
||||
|
||||
if ws.store.isNil:
|
||||
return
|
||||
|
||||
if ws.isSqliteOnly:
|
||||
if ws.store.isNil():
|
||||
warn "store not provided (nil)"
|
||||
return
|
||||
|
||||
info "SQLite-only store initialized. Messages are *not* loaded into memory."
|
||||
return
|
||||
waku_store_messages.set(ws.store.getMessagesCount(), labelValues = ["stored"])
|
||||
|
||||
|
||||
# Load all messages from sqliteStore into queueStore
|
||||
info "attempting to load messages from persistent storage"
|
||||
|
||||
let res = ws.store.getAllMessages()
|
||||
if res.isOk():
|
||||
for (receiverTime, msg, pubsubTopic) in res.value:
|
||||
let index = Index.compute(msg, receiverTime, pubsubTopic)
|
||||
discard ws.messages.put(index, msg, pubsubTopic)
|
||||
|
||||
info "successfully loaded messages from the persistent store"
|
||||
# TODO: Move this logic, together with the insert message logic
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
warn "failed to load messages from the persistent store", err = res.error()
|
||||
if ws.store.isNil():
|
||||
return
|
||||
|
||||
info "loading messages from persistent storage"
|
||||
|
||||
let res = ws.store.getAllMessages()
|
||||
if res.isOk():
|
||||
for (receiverTime, msg, pubsubTopic) in res.value:
|
||||
let index = Index.compute(msg, receiverTime, pubsubTopic)
|
||||
discard ws.messages.put(index, msg, pubsubTopic)
|
||||
|
||||
info "successfully loaded messages from the persistent store"
|
||||
else:
|
||||
warn "failed to load messages from the persistent store", err = res.error()
|
||||
|
||||
waku_store_messages.set(ws.messages.getMessagesCount(), labelValues = ["stored"])
|
||||
|
||||
debug "the number of messages in the memory", messageNum=ws.messages.len
|
||||
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
|
||||
|
||||
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext,
|
||||
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
|
||||
@ -199,7 +217,7 @@ proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
|
||||
waku_store_peers.inc()
|
||||
|
||||
|
||||
proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
||||
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async.} =
|
||||
if not w.persistMessages:
|
||||
# Store is mounted but new messages should not be stored
|
||||
return
|
||||
@ -207,33 +225,51 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
||||
if msg.ephemeral:
|
||||
# The message is ephemeral, should not be stored
|
||||
return
|
||||
|
||||
let index = Index.compute(
|
||||
msg,
|
||||
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
|
||||
pubsubTopic = topic
|
||||
)
|
||||
|
||||
# Add message to in-memory store
|
||||
if not w.isSqliteOnly:
|
||||
trace "handle message in WakuStore", topic=topic, msg=msg
|
||||
|
||||
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||
if addRes.isErr():
|
||||
debug "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
|
||||
waku_store_errors.inc(labelValues = [$(addRes.error())])
|
||||
return
|
||||
|
||||
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil:
|
||||
return
|
||||
let insertStartTime = getTime().toUnixFloat()
|
||||
|
||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
||||
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
|
||||
|
||||
let res = w.store.put(index, msg, topic)
|
||||
if res.isErr():
|
||||
debug "failed to store messages", err=res.error()
|
||||
waku_store_errors.inc(labelValues = [storeFailure])
|
||||
trace "handling message", topic=pubsubTopic, index=index
|
||||
|
||||
block:
|
||||
if w.isSqliteOnly:
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil():
|
||||
return
|
||||
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
|
||||
waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"])
|
||||
|
||||
# TODO: Move this logic, together with the load from persistent store on init
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
# Add message to in-memory store
|
||||
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
|
||||
if resPutInmemory.isErr():
|
||||
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"])
|
||||
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil():
|
||||
return
|
||||
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
let insertTime = getTime().toUnixFloat() - insertStartTime
|
||||
waku_store_insert_time.observe(getMillisecondTime(insertTime))
|
||||
|
||||
|
||||
# TODO: Remove after converting the query method into a non-callback method
|
||||
@ -347,6 +383,12 @@ proc resume*(w: WakuStore,
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
|
||||
# If store has not been provided, don't even try
|
||||
if w.isSqliteOnly and w.store.isNil():
|
||||
return err("store not provided")
|
||||
|
||||
|
||||
var lastSeenTime = Timestamp(0)
|
||||
var currentTime = getNanosecondTime(epochTime())
|
||||
|
||||
@ -354,7 +396,6 @@ proc resume*(w: WakuStore,
|
||||
if lastSeenItem.isOk():
|
||||
lastSeenTime = lastSeenItem.get().msg.timestamp
|
||||
|
||||
|
||||
# adjust the time window with an offset of 20 seconds
|
||||
let offset: Timestamp = getNanosecondTime(20)
|
||||
currentTime = currentTime + offset
|
||||
@ -400,38 +441,51 @@ proc resume*(w: WakuStore,
|
||||
var added: uint = 0
|
||||
|
||||
for msg in res.get():
|
||||
let index = Index.compute(
|
||||
msg,
|
||||
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
|
||||
pubsubTopic = pubsubTopic
|
||||
)
|
||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
||||
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
|
||||
|
||||
# check for duplicate messages
|
||||
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
|
||||
if w.messages.contains(index):
|
||||
dismissed.inc()
|
||||
continue
|
||||
if w.isSqliteOnly:
|
||||
# Add messages to persistent store
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
# store the new message
|
||||
let resPut = w.messages.put(index, msg, pubsubTopic)
|
||||
if resPut.isErr():
|
||||
trace "failed to store messages", err = resPut.error()
|
||||
waku_store_errors.inc(labelValues = [storeFailure])
|
||||
continue
|
||||
# TODO: Move this logic, together with the load from persistent store on init
|
||||
# into a "dual-store" message store implementation.
|
||||
else:
|
||||
# check for duplicate messages
|
||||
# TODO: Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
|
||||
if w.messages.contains(index):
|
||||
dismissed.inc()
|
||||
continue
|
||||
|
||||
# store in db if exists
|
||||
if not w.store.isNil():
|
||||
let resPut = w.store.put(index, msg, pubsubTopic)
|
||||
if resPut.isErr():
|
||||
trace "failed to store messages", err = resPut.error()
|
||||
waku_store_errors.inc(labelValues = [storeFailure])
|
||||
# Add message to in-memory store
|
||||
let resPutInmemory = w.messages.put(index, msg, pubsubTopic)
|
||||
if resPutInmemory.isErr():
|
||||
debug "failed to insert message to in-memory store", index=index, err=resPutInmemory.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
if w.store.isNil():
|
||||
continue
|
||||
|
||||
# Add messages to persistent store
|
||||
let resPutStore = w.store.put(index, msg, pubsubTopic)
|
||||
if resPutStore.isErr():
|
||||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
continue
|
||||
|
||||
added.inc()
|
||||
|
||||
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||
|
||||
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
|
||||
|
||||
let messagesCount = if w.isSqliteOnly: w.store.getMessagesCount()
|
||||
else: w.messages.getMessagesCount()
|
||||
waku_store_messages.set(messagesCount, labelValues = ["stored"])
|
||||
|
||||
return ok(added)
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user