mirror of https://github.com/waku-org/nwaku.git
feat(store): Allow messages to be marked as "don't store" (#1119)
* feat(store): init allow messages to be marked as ephemeral * feat(store): replace ephemeral with ttl model * Revert "feat(store): replace ephemeral with ttl model" This reverts commit 4398c61c919dda8f014b4fcc0679c573c74854e4. * fix(store): flags and test * chore(store): remove storeTTL * chore(store): remove unused utils * fix: conflict * fix(store): make test more brief
This commit is contained in:
parent
4745c7872c
commit
fdcc9824a2
|
@ -11,6 +11,7 @@ import
|
|||
import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/node/storage/message/message_store,
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/node/storage/message/waku_message_store,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
|
@ -30,13 +31,15 @@ proc newTestDatabase(): SqliteDatabase =
|
|||
proc fakeWakuMessage(
|
||||
payload = "TEST-PAYLOAD",
|
||||
contentTopic = DefaultContentTopic,
|
||||
ts = getNanosecondTime(epochTime())
|
||||
ts = getNanosecondTime(epochTime()),
|
||||
ephemeral = false,
|
||||
): WakuMessage =
|
||||
WakuMessage(
|
||||
payload: toBytes(payload),
|
||||
contentTopic: contentTopic,
|
||||
version: 1,
|
||||
timestamp: ts
|
||||
timestamp: ts,
|
||||
ephemeral: ephemeral,
|
||||
)
|
||||
|
||||
proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
|
||||
|
@ -58,6 +61,17 @@ proc newTestWakuStore(switch: Switch): WakuStore =
|
|||
|
||||
return proto
|
||||
|
||||
proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
rng = crypto.newRng()
|
||||
proto = WakuStore.init(peerManager, rng, store)
|
||||
|
||||
waitFor proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
return proto
|
||||
|
||||
|
||||
suite "Waku Store":
|
||||
|
||||
|
@ -446,6 +460,28 @@ suite "Waku Store":
|
|||
## Cleanup
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
||||
asyncTest "handle ephemeral messages":
|
||||
## Setup
|
||||
let store = StoreQueueRef.new(10)
|
||||
let switch = newTestSwitch()
|
||||
let proto = newTestWakuStore(switch, store)
|
||||
let msgList = @[
|
||||
fakeWakuMessage(ephemeral = false, payload = "1"),
|
||||
fakeWakuMessage(ephemeral = true, payload = "2"),
|
||||
fakeWakuMessage(ephemeral = true, payload = "3"),
|
||||
fakeWakuMessage(ephemeral = true, payload = "4"),
|
||||
fakeWakuMessage(ephemeral = false, payload = "5"),
|
||||
]
|
||||
|
||||
for msg in msgList:
|
||||
await proto.handleMessage(DefaultPubsubTopic, msg)
|
||||
|
||||
check:
|
||||
store.len == 2
|
||||
|
||||
## Cleanup
|
||||
await switch.stop()
|
||||
|
||||
|
||||
# TODO: Review this test suite test cases
|
||||
procSuite "Waku Store - fault tolerant store":
|
||||
|
|
|
@ -31,12 +31,16 @@ type
|
|||
# this field will be used in the rln-relay protocol
|
||||
# XXX Experimental, this is part of https://rfc.vac.dev/spec/17/ spec and not yet part of WakuMessage spec
|
||||
proof*: RateLimitProof
|
||||
# The ephemeral field indicates if the message should
|
||||
# be stored. bools and uints are
|
||||
# equivalent in serialization of the protobuf
|
||||
ephemeral*: bool
|
||||
|
||||
|
||||
|
||||
# Encoding and decoding -------------------------------------------------------
|
||||
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
||||
var msg = WakuMessage()
|
||||
var msg = WakuMessage(ephemeral: false)
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
discard ? pb.getField(1, msg.payload)
|
||||
|
@ -51,7 +55,15 @@ proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
|
|||
var proofBytes: seq[byte]
|
||||
discard ? pb.getField(21, proofBytes)
|
||||
msg.proof = ? RateLimitProof.init(proofBytes)
|
||||
|
||||
|
||||
# Behaviour of ephemeral with storeTTL to be defined,
|
||||
# If a message is marked ephemeral, it should not have a storeTTL.
|
||||
# If a message is not marked ephemeral, it should have a storeTTL.
|
||||
# How would we handle messages that should be stored permanently?
|
||||
var ephemeral: uint
|
||||
if ? pb.getField(31, ephemeral):
|
||||
msg.ephemeral = bool(ephemeral)
|
||||
|
||||
ok(msg)
|
||||
|
||||
proc encode*(message: WakuMessage): ProtoBuffer =
|
||||
|
@ -62,6 +74,6 @@ proc encode*(message: WakuMessage): ProtoBuffer =
|
|||
result.write3(3, message.version)
|
||||
result.write3(10, zint64(message.timestamp))
|
||||
result.write3(21, message.proof.encode())
|
||||
|
||||
result.write3(31, uint64(message.ephemeral))
|
||||
result.finish3()
|
||||
|
||||
|
|
|
@ -204,6 +204,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
|||
# Store is mounted but new messages should not be stored
|
||||
return
|
||||
|
||||
if msg.ephemeral:
|
||||
# The message is ephemeral, should not be stored
|
||||
return
|
||||
|
||||
let index = Index.compute(
|
||||
msg,
|
||||
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
|
||||
|
|
|
@ -17,4 +17,4 @@ proc finish3*(proto: var ProtoBuffer) =
|
|||
proto.offset = 0
|
||||
|
||||
proc `==`*(a: zint64, b: zint64): bool =
|
||||
int64(a) == int64(b)
|
||||
int64(a) == int64(b)
|
||||
|
|
Loading…
Reference in New Issue