diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 057c1a8a5..deb296233 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -11,6 +11,7 @@ import import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, + ../../waku/v2/node/storage/message/message_store, ../../waku/v2/node/storage/message/waku_store_queue, ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/peer_manager/peer_manager, @@ -30,13 +31,15 @@ proc newTestDatabase(): SqliteDatabase = proc fakeWakuMessage( payload = "TEST-PAYLOAD", contentTopic = DefaultContentTopic, - ts = getNanosecondTime(epochTime()) + ts = getNanosecondTime(epochTime()), + ephemeral = false, ): WakuMessage = WakuMessage( payload: toBytes(payload), contentTopic: contentTopic, version: 1, - timestamp: ts + timestamp: ts, + ephemeral: ephemeral, ) proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch = @@ -58,6 +61,17 @@ proc newTestWakuStore(switch: Switch): WakuStore = return proto +proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuStore.init(peerManager, rng, store) + + waitFor proto.start() + switch.mount(proto) + + return proto + suite "Waku Store": @@ -446,6 +460,28 @@ suite "Waku Store": ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) + asyncTest "handle ephemeral messages": + ## Setup + let store = StoreQueueRef.new(10) + let switch = newTestSwitch() + let proto = newTestWakuStore(switch, store) + let msgList = @[ + fakeWakuMessage(ephemeral = false, payload = "1"), + fakeWakuMessage(ephemeral = true, payload = "2"), + fakeWakuMessage(ephemeral = true, payload = "3"), + fakeWakuMessage(ephemeral = true, payload = "4"), + fakeWakuMessage(ephemeral = false, payload = "5"), + ] + + for msg in msgList: + await proto.handleMessage(DefaultPubsubTopic, msg) + + check: + store.len == 2 + + ## Cleanup + await switch.stop() + # TODO: Review this test suite test cases procSuite "Waku Store - fault tolerant store": diff --git a/waku/v2/protocol/waku_message.nim b/waku/v2/protocol/waku_message.nim index 6bb424202..88799c27a 100644 --- a/waku/v2/protocol/waku_message.nim +++ b/waku/v2/protocol/waku_message.nim @@ -31,12 +31,16 @@ type # this field will be used in the rln-relay protocol # XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec proof*: RateLimitProof + # The ephemeral field indicates if the message should + # be stored. bools and uints are + # equivalent in serialization of the protobuf + ephemeral*: bool # Encoding and decoding ------------------------------------------------------- proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = - var msg = WakuMessage() + var msg = WakuMessage(ephemeral: false) let pb = initProtoBuffer(buffer) discard ? pb.getField(1, msg.payload) @@ -51,7 +55,15 @@ proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = var proofBytes: seq[byte] discard ? pb.getField(21, proofBytes) msg.proof = ? RateLimitProof.init(proofBytes) - + + # Behaviour of ephemeral with storeTTL to be defined, + # If a message is marked ephemeral, it should not have a storeTTL. + # If a message is not marked ephemeral, it should have a storeTTL. + # How would we handle messages that should be stored permanently? + var ephemeral: uint + if ? pb.getField(31, ephemeral): + msg.ephemeral = bool(ephemeral) + ok(msg) proc encode*(message: WakuMessage): ProtoBuffer = @@ -62,6 +74,6 @@ proc encode*(message: WakuMessage): ProtoBuffer = result.write3(3, message.version) result.write3(10, zint64(message.timestamp)) result.write3(21, message.proof.encode()) - + result.write3(31, uint64(message.ephemeral)) result.finish3() diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 9f945a37d..d22118a95 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -204,6 +204,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = # Store is mounted but new messages should not be stored return + if msg.ephemeral: + # The message is ephemeral, should not be stored + return + let index = Index.compute( msg, receivedTime = getNanosecondTime(getTime().toUnixFloat()), diff --git a/waku/v2/utils/protobuf.nim b/waku/v2/utils/protobuf.nim index 75fc73a77..0e5c86f22 100644 --- a/waku/v2/utils/protobuf.nim +++ b/waku/v2/utils/protobuf.nim @@ -17,4 +17,4 @@ proc finish3*(proto: var ProtoBuffer) = proto.offset = 0 proc `==`*(a: zint64, b: zint64): bool = - int64(a) == int64(b) \ No newline at end of file + int64(a) == int64(b)