diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index b7f0e44c3..6fabcfd21 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -14,6 +14,7 @@ import libp2p/protocols/pubsub/rpc/message import ../../waku/v1/node/rpc/hexstrings, + ../../waku/v2/node/storage/message/message_store, ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/wakunode2, ../../waku/v2/node/jsonrpc/[store_api, @@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API": WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)] for wakuMsg in msgList: - node.wakuStore.handleMessage(defaultTopic, wakuMsg) + require node.wakuStore.store.put(defaultTopic, wakuMsg).isOk() let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) diff --git a/tests/v2/test_message_store_queue.nim b/tests/v2/test_message_store_queue.nim index dd2dbd75e..89832f4c0 100644 --- a/tests/v2/test_message_store_queue.nim +++ b/tests/v2/test_message_store_queue.nim @@ -88,34 +88,6 @@ procSuite "Sorted store queue": check: store.len == capacity - test "Sender time can't be more than MaxTimeVariance in future": - ## Given - let capacity = 5 - let store = StoreQueueRef.new(capacity) - let - receiverTime = getNanoSecondTime(10) - senderTimeOk = receiverTime + StoreMaxTimeVariance - senderTimeErr = senderTimeOk + 1 - - let invalidMessage = IndexedWakuMessage( - msg: WakuMessage( - payload: @[byte 1], - timestamp: senderTimeErr - ), - index: Index( - receiverTime: receiverTime, - senderTime: senderTimeErr - ) - ) - - ## When - let addRes = store.add(invalidMessage) - - ## Then - check: - addRes.isErr() - addRes.error() == "future_sender_timestamp" - test "Store queue sort-on-insert works": ## Given let diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index f3f3804ac..4fc5039c8 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -25,13 +25,16 @@ const proc newTestDatabase(): SqliteDatabase = SqliteDatabase.init("", inMemory = true).tryGet() +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + proc getTestTimestamp(offset=0): Timestamp = - Timestamp(getNanosecondTime(epochTime())) + Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float)) proc fakeWakuMessage( payload = "TEST-PAYLOAD", contentTopic = DefaultContentTopic, - ts = getNanosecondTime(epochTime()) + ts = now() ): WakuMessage = WakuMessage( payload: toBytes(payload), @@ -80,10 +83,9 @@ suite "SQLite message store - insert messages": store = SqliteStore.init(database).tryGet() let message = fakeWakuMessage(contentTopic=contentTopic) - let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic) ## When - let resPut = store.put(messageIndex, message, DefaultPubsubTopic) + let resPut = store.put(DefaultPubsubTopic, message) ## Then check: @@ -123,8 +125,7 @@ suite "SQLite message store - insert messages": ## When for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() require retentionPolicy.execute(store).isOk() ## Then @@ -147,25 +148,24 @@ suite "Message Store": let database = newTestDatabase() store = SqliteStore.init(database).get() - topic = DefaultContentTopic - pubsubTopic = DefaultPubsubTopic + let t1 = getTestTimestamp(0) t2 = getTestTimestamp(1) t3 = high(int64) var msgs = @[ - WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), - WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), + WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic, version: uint32(0), timestamp: t1), + WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: DefaultContentTopic, version: uint32(1), timestamp: t2), # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage - WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3), + WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3), ] var indexes: seq[Index] = @[] for msg in msgs: - var index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - let resPut = store.put(index, msg, pubsubTopic) - require resPut.isOk + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + + let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) indexes.add(index) ## When @@ -186,7 +186,10 @@ suite "Message Store": # flags for receiver timestamp var rt1Flag, rt2Flag, rt3Flag: bool = false - for (receiverTimestamp, msg, psTopic) in result: + for (receiverTimestamp, msg, pubsubTopic) in result: + check: + pubsubTopic == DefaultPubsubTopic + # check correct retrieval of receiver timestamps if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true @@ -204,9 +207,6 @@ suite "Message Store": if msg.timestamp == t1: t1Flag = true if msg.timestamp == t2: t2Flag = true if msg.timestamp == t3: t3Flag = true - - check: - psTopic == pubSubTopic check: # check version @@ -278,10 +278,8 @@ suite "Message Store": for i in 1..capacity: - let - msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) - require store.put(index, msg, pubsubTopic).isOk() + let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + require store.put(pubsubTopic, msg).isOk() require retentionPolicy.execute(store).isOk() ## Then @@ -312,19 +310,12 @@ suite "Message Store": retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) for i in 1..capacity+overload: - let - msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic) - require store.put(index, msg, pubsubTopic).isOk() + let msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + require store.put(pubsubTopic, msg).isOk() require retentionPolicy.execute(store).isOk() # count messages in DB - var numMessages: int64 - proc handler(s: ptr sqlite3_stmt) = - numMessages = sqlite3_column_int64(s, 0) - let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in sqlite_store - discard database.query(countQuery, handler) - + let numMessages = store.getMessagesCount().tryGet() check: # expected number of messages is 120 because # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index 40b2c8b17..80822cb77 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -59,8 +59,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -108,8 +107,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -159,8 +157,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -203,8 +200,7 @@ suite "message store - history query": fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3), ] for msg in messages1: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() let messages2 = @[ @@ -214,8 +210,7 @@ suite "message store - history query": fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), ] for msg in messages2: - let index = Index.compute(msg, msg.timestamp, pubsubTopic) - require store.put(index, msg, pubsubTopic).isOk() + require store.put(pubsubTopic, msg).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -264,8 +259,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic) @@ -316,8 +310,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic) @@ -363,8 +356,7 @@ suite "message store - history query": fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4), ] for msg in messages1: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() let messages2 = @[ fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5), @@ -372,8 +364,7 @@ suite "message store - history query": fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7), ] for msg in messages2: - let index = Index.compute(msg, msg.timestamp, pubsubTopic) - require store.put(index, msg, pubsubTopic).isOk() + require store.put(pubsubTopic, msg).isOk() let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic) @@ -420,8 +411,7 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + require store.put(DefaultPubsubTopic, msg).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -462,8 +452,8 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + let digest = computeDigest(msg) + require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -474,6 +464,7 @@ suite "message store - history query": ascendingOrder=true ) + ## Then check: res.isOk() @@ -508,8 +499,8 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + let digest = computeDigest(msg) + require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -550,8 +541,8 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + let digest = computeDigest(msg) + require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() ## When let res = store.getMessagesByHistoryQuery( @@ -596,8 +587,8 @@ suite "message store - history query": ] for msg in messages: - let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic) - require store.put(index, msg, DefaultPubsubTopic).isOk() + let digest = computeDigest(msg) + require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk() let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 01d3494d5..d3813f6de 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -26,6 +26,9 @@ const DefaultContentTopic = ContentTopic("/waku/2/default-content/proto") +proc now(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + proc newTestDatabase(): SqliteDatabase = SqliteDatabase.init("", inMemory = true).tryGet() @@ -48,21 +51,11 @@ proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get()) return newStandardSwitch(some(peerKey), addrs=peerAddr) +proc newTestStore(): MessageStore = + let database = newTestDatabase() + SqliteStore.init(database).tryGet() -proc newTestWakuStore(switch: Switch): WakuStore = - let - peerManager = PeerManager.new(switch) - rng = crypto.newRng() - database = newTestDatabase() - store = SqliteStore.init(database).tryGet() - proto = WakuStore.init(peerManager, rng, store) - - waitFor proto.start() - switch.mount(proto) - - return proto - -proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore = +proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore = let peerManager = PeerManager.new(switch) rng = crypto.newRng() @@ -73,8 +66,7 @@ proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore = return proto - -suite "Waku Store": +suite "Waku Store - history query": asyncTest "handle query": ## Setup @@ -335,7 +327,7 @@ suite "Waku Store": ] for msg in msgList: - serverProto.handleMessage("foo", msg) + require serverProto.store.put(DefaultPubsubTopic, msg).isOk() ## When let rpc = HistoryQuery( @@ -387,7 +379,7 @@ suite "Waku Store": ] for msg in msgList: - serverProto.handleMessage("foo", msg) + require serverProto.store.put(DefaultPubsubTopic, msg).isOk() ## When let rpc = HistoryQuery( @@ -439,7 +431,7 @@ suite "Waku Store": ] for msg in msgList: - serverProto.handleMessage("foo", msg) + require serverProto.store.put(DefaultPubsubTopic, msg).isOk() ## When let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) @@ -461,11 +453,35 @@ suite "Waku Store": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) - asyncTest "handle ephemeral messages": +suite "Waku Store - message handling": + + asyncTest "it should store a valid and non-ephemeral message": + ## Setup + let store = StoreQueueRef.new(5) + let switch = newTestSwitch() + let proto = newTestWakuStore(switch, store) + + ## Given + let validSenderTime = now() + let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime) + + ## When + proto.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + store.getMessagesCount().tryGet() == 1 + + ## Cleanup + await switch.stop() + + asyncTest "it should not store an ephemeral message": ## Setup let store = StoreQueueRef.new(10) let switch = newTestSwitch() let proto = newTestWakuStore(switch, store) + + ## Given let msgList = @[ fakeWakuMessage(ephemeral = false, payload = "1"), fakeWakuMessage(ephemeral = true, payload = "2"), @@ -474,15 +490,83 @@ suite "Waku Store": fakeWakuMessage(ephemeral = false, payload = "5"), ] + ## When for msg in msgList: proto.handleMessage(DefaultPubsubTopic, msg) + ## Then check: store.len == 2 ## Cleanup await switch.stop() + asyncTest "it should store a message with no sender timestamp": + ## Setup + let store = StoreQueueRef.new(5) + let switch = newTestSwitch() + let proto = newTestWakuStore(switch, store) + + ## Given + let invalidSenderTime = 0 + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + proto.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + store.getMessagesCount().tryGet() == 1 + + ## Cleanup + await switch.stop() + + asyncTest "it should not store a message with a sender time variance greater than max time variance (future)": + ## Setup + let store = StoreQueueRef.new(5) + let switch = newTestSwitch() + let proto = newTestWakuStore(switch, store) + + ## Given + let + now = getNanoSecondTime(getTime().toUnixFloat()) + invalidSenderTime = now + MaxMessageTimestampVariance + 1 + + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + proto.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + store.getMessagesCount().tryGet() == 0 + + ## Cleanup + await switch.stop() + + asyncTest "it should not store a message with a sender time variance greater than max time variance (past)": + ## Setup + let store = StoreQueueRef.new(5) + let switch = newTestSwitch() + let proto = newTestWakuStore(switch, store) + + ## Given + let + now = getNanoSecondTime(getTime().toUnixFloat()) + invalidSenderTime = now - MaxMessageTimestampVariance - 1 + + let message = fakeWakuMessage(ts=invalidSenderTime) + + ## When + proto.handleMessage(DefaultPubSubTopic, message) + + ## Then + check: + store.getMessagesCount().tryGet() == 0 + + ## Cleanup + await switch.stop() + # TODO: Review this test suite test cases procSuite "Waku Store - fault tolerant store": @@ -529,7 +613,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList: - proto.handleMessage(DefaultPubsubTopic, msg) + require proto.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore() let msgList2 = @[ @@ -544,7 +628,7 @@ procSuite "Waku Store - fault tolerant store": ] for msg in msgList2: - proto2.handleMessage(DefaultPubsubTopic, msg) + require proto2.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() asyncTest "handle temporal history query with a valid time window": diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index fd529de5a..7c46de61e 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -73,7 +73,7 @@ procSuite "WakuNode - Store": ## Given let message = fakeWakuMessage() - server.wakuStore.handleMessage(DefaultPubsubTopic, message) + require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() ## When let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]) @@ -162,7 +162,7 @@ procSuite "WakuNode - Store": ## Given let message = fakeWakuMessage() - server.wakuStore.handleMessage(DefaultPubsubTopic, message) + require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk() ## When await client.resume() @@ -196,13 +196,15 @@ procSuite "WakuNode - Store": msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2))) msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3))) - server.wakuStore.handleMessage(DefaultTopic, msg1) - server.wakuStore.handleMessage(DefaultTopic, msg2) + require server.wakuStore.store.put(DefaultTopic, msg1).isOk() + require server.wakuStore.store.put(DefaultTopic, msg2).isOk() # Insert the same message in both node's store - let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic) - require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() - require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk() + let + receivedTime3 = getNanosecondTime(getTime().toUnixFloat() + 10.float) + digest3 = computeDigest(msg3) + require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() + require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk() ## When await client.resume() diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 2bbf3d43a..52ba5435f 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az241-291: +# Libtool was configured on host fv-az421-295: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/storage/message/dual_message_store.nim b/waku/v2/node/storage/message/dual_message_store.nim index 204262807..34d1a2e10 100644 --- a/waku/v2/node/storage/message/dual_message_store.nim +++ b/waku/v2/node/storage/message/dual_message_store.nim @@ -1,7 +1,7 @@ {.push raises: [Defect].} import - std/options, + std/[options, times], stew/results, chronicles import @@ -34,8 +34,8 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message warn "failed to load messages from the persistent store", err = res.error else: for (receiverTime, msg, pubsubTopic) in res.value: - let index = Index.compute(msg, receiverTime, pubsubTopic) - discard inmemory.put(index, msg, pubsubTopic) + let digest = computeDigest(msg) + discard inmemory.put(pubsubTopic, msg, digest, receiverTime) info "successfully loaded messages from the persistent store" @@ -43,11 +43,14 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message return ok(DualMessageStore(inmemory: inmemory, persistent: persistent)) -method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = - ?s.inmemory.put(index, message, pubsubTopic) - ?s.persistent.put(index, message, pubsubTopic) +method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] = + ?s.inmemory.put(pubsubTopic, message, digest, receivedTime) + ?s.persistent.put(pubsubTopic, message, digest, receivedTime) ok() +method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = + procCall MessageStore(s).put(pubsubTopic, message) + method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] = s.inmemory.getAllMessages() diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index ceb1290d3..afd5ba7b7 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -4,19 +4,14 @@ {.push raises: [Defect].} import - std/options, - stew/results, - chronos + std/[options, times], + stew/results import ../../../protocol/waku_message, ../../../utils/time, ../../../utils/pagination -# TODO: Remove this constant after moving time variance checks to waku store protocol -const StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future - - type MessageStoreResult*[T] = Result[T, string] @@ -28,7 +23,15 @@ type # MessageStore interface -method put*(ms: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard +method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard + +method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = + let + digest = computeDigest(message) + receivedTime = getNanosecondTime(getTime().toUnixFloat()) + + ms.put(pubsubTopic, message, digest, receivedTime) + method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index d2355c845..4376946e1 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -3,10 +3,9 @@ {.push raises: [Defect].} import - std/[options, tables, sequtils, algorithm], + std/[options, tables, sequtils, algorithm, times], stew/[byteutils, results], - chronicles, - chronos + chronicles import ../../../../protocol/waku_message, ../../../../utils/pagination, @@ -62,16 +61,12 @@ proc close*(s: SqliteStore) = s.db.close() -method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = +method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] = ## Inserts a message into the store - # Ensure that messages don't "jump" to the front with future timestamps - if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance: - return err("future_sender_timestamp") - let res = s.insertStmt.exec(( - @(cursor.digest.data), # id - cursor.receiverTime, # receiverTimestamp + @(digest.data), # id + receivedTime, # receiverTimestamp toBytes(message.contentTopic), # contentTopic message.payload, # payload toBytes(pubsubTopic), # pubsubTopic @@ -83,6 +78,10 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st ok() +method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = + ## Inserts a message into the store + procCall MessageStore(s).put(pubsubTopic, message) + method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] = ## Retrieve all messages from the store. diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 7359df369..824379fa6 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -1,7 +1,7 @@ {.push raises: [Defect].} import - std/[options, algorithm], + std/[options, algorithm, times], stew/[results, sorted_set], chronicles import @@ -313,17 +313,10 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi ## Add a message to the queue ## ## If we're at capacity, we will be removing, the oldest (first) item - trace "adding item to store queue", msg=msg - - # Ensure that messages don't "jump" to the front of the queue with future timestamps - if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance: - return err("future_sender_timestamp") - if store.contains(msg.index): trace "could not add item to store queue. Index already exists", index=msg.index return err("duplicate") - # TODO: the below delete block can be removed if we convert to circular buffer if store.items.len >= store.capacity: var @@ -342,10 +335,15 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi return ok() -method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = - let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic) + +method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] = + let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) + let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) store.add(message) +method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] = + procCall MessageStore(store).put(pubsubTopic, message) + proc getPage*(storeQueue: StoreQueueRef, pred: QueryFilterMatcher, diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 3e35952b5..1efb3ac0c 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -49,6 +49,7 @@ const # Error types (metric label values) const + invalidMessage = "invalid_message" insertFailure = "insert_failure" retPolicyFailure = "retpolicy_failure" dialFailure = "dial_failure" @@ -204,6 +205,17 @@ proc init*(T: type WakuStore, WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy) +proc isValidMessage(msg: WakuMessage): bool = + if msg.timestamp == 0: + return true + + let + now = getNanosecondTime(getTime().toUnixFloat()) + lowerBound = now - MaxMessageTimestampVariance + upperBound = now + MaxMessageTimestampVariance + + return lowerBound <= msg.timestamp and msg.timestamp <= upperBound + proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = if w.store.isNil(): # Messages should not be stored @@ -213,20 +225,27 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) = # The message is ephemeral, should not be stored return - let insertStartTime = getTime().toUnixFloat() - - let now = getNanosecondTime(getTime().toUnixFloat()) - let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - - trace "handling message", topic=pubsubTopic, index=index - - # Add messages to persistent store, if present - let putStoreRes = w.store.put(index, msg, pubsubTopic) - if putStoreRes.isErr(): - debug "failed to insert message to persistent store", index=index, err=putStoreRes.error - waku_store_errors.inc(labelValues = [insertFailure]) + if not isValidMessage(msg): + waku_store_errors.inc(labelValues = [invalidMessage]) return + + let insertStartTime = getTime().toUnixFloat() + + block: + let + msgDigest = computeDigest(msg) + msgReceivedTime = if msg.timestamp > 0: msg.timestamp + else: getNanosecondTime(getTime().toUnixFloat()) + + trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest + + let putStoreRes = w.store.put(pubsubTopic, msg, msgDigest, msgReceivedTime) + if putStoreRes.isErr(): + debug "failed to insert message into the store", err=putStoreRes.error + waku_store_errors.inc(labelValues = [insertFailure]) + return + let insertDuration = getTime().toUnixFloat() - insertStartTime waku_store_insert_duration_seconds.observe(insertDuration) @@ -402,10 +421,7 @@ proc resume*(w: WakuStore, # Save the retrieved messages in the store var added: uint = 0 for msg in res.get(): - let now = getNanosecondTime(getTime().toUnixFloat()) - let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic) - - let putStoreRes = w.store.put(index, msg, pubsubTopic) + let putStoreRes = w.store.put(pubsubTopic, msg) if putStoreRes.isErr(): continue diff --git a/waku/v2/utils/pagination.nim b/waku/v2/utils/pagination.nim index 914bc147e..3d8a3f29b 100644 --- a/waku/v2/utils/pagination.nim +++ b/waku/v2/utils/pagination.nim @@ -8,6 +8,7 @@ import ../protocol/waku_message, ./time +type MessageDigest* = MDigest[256] const MaxPageSize*: uint64 = 100 @@ -20,9 +21,9 @@ type Index* = object pubsubTopic*: string senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: MDigest[256] # calculated over payload and content topic + digest*: MessageDigest # calculated over payload and content topic -proc computeDigest*(msg: WakuMessage): MDigest[256] = +proc computeDigest*(msg: WakuMessage): MessageDigest = var ctx: sha256 ctx.init() defer: ctx.clear()