deploy: 1b4a9e5dfd1acd79760605016bd567ce4b79974a

This commit is contained in:
LNSD 2022-09-26 10:23:38 +00:00
parent 101bf6ab41
commit 01a9f1730c
13 changed files with 232 additions and 171 deletions

View File

@ -14,6 +14,7 @@ import
libp2p/protocols/pubsub/rpc/message
import
../../waku/v1/node/rpc/hexstrings,
../../waku/v2/node/storage/message/message_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/wakunode2,
../../waku/v2/node/jsonrpc/[store_api,
@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API":
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
for wakuMsg in msgList:
node.wakuStore.handleMessage(defaultTopic, wakuMsg)
require node.wakuStore.store.put(defaultTopic, wakuMsg).isOk()
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

View File

@ -88,34 +88,6 @@ procSuite "Sorted store queue":
check:
store.len == capacity
test "Sender time can't be more than MaxTimeVariance in future":
## Given
let capacity = 5
let store = StoreQueueRef.new(capacity)
let
receiverTime = getNanoSecondTime(10)
senderTimeOk = receiverTime + StoreMaxTimeVariance
senderTimeErr = senderTimeOk + 1
let invalidMessage = IndexedWakuMessage(
msg: WakuMessage(
payload: @[byte 1],
timestamp: senderTimeErr
),
index: Index(
receiverTime: receiverTime,
senderTime: senderTimeErr
)
)
## When
let addRes = store.add(invalidMessage)
## Then
check:
addRes.isErr()
addRes.error() == "future_sender_timestamp"
test "Store queue sort-on-insert works":
## Given
let

View File

@ -25,13 +25,16 @@ const
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet()
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc getTestTimestamp(offset=0): Timestamp =
Timestamp(getNanosecondTime(epochTime()))
Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float))
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime())
ts = now()
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
@ -80,10 +83,9 @@ suite "SQLite message store - insert messages":
store = SqliteStore.init(database).tryGet()
let message = fakeWakuMessage(contentTopic=contentTopic)
let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic)
## When
let resPut = store.put(messageIndex, message, DefaultPubsubTopic)
let resPut = store.put(DefaultPubsubTopic, message)
## Then
check:
@ -123,8 +125,7 @@ suite "SQLite message store - insert messages":
## When
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -147,25 +148,24 @@ suite "Message Store":
let
database = newTestDatabase()
store = SqliteStore.init(database).get()
topic = DefaultContentTopic
pubsubTopic = DefaultPubsubTopic
let
t1 = getTestTimestamp(0)
t2 = getTestTimestamp(1)
t3 = high(int64)
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: DefaultContentTopic, version: uint32(1), timestamp: t2),
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, pubsubTopic)
require resPut.isOk
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
indexes.add(index)
## When
@ -186,7 +186,10 @@ suite "Message Store":
# flags for receiver timestamp
var rt1Flag, rt2Flag, rt3Flag: bool = false
for (receiverTimestamp, msg, psTopic) in result:
for (receiverTimestamp, msg, pubsubTopic) in result:
check:
pubsubTopic == DefaultPubsubTopic
# check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
@ -205,9 +208,6 @@ suite "Message Store":
if msg.timestamp == t2: t2Flag = true
if msg.timestamp == t3: t3Flag = true
check:
psTopic == pubSubTopic
check:
# check version
v0Flag == true
@ -278,10 +278,8 @@ suite "Message Store":
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
require store.put(index, msg, pubsubTopic).isOk()
let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
require store.put(pubsubTopic, msg).isOk()
require retentionPolicy.execute(store).isOk()
## Then
@ -312,19 +310,12 @@ suite "Message Store":
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
for i in 1..capacity+overload:
let
msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
require store.put(index, msg, pubsubTopic).isOk()
let msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
require store.put(pubsubTopic, msg).isOk()
require retentionPolicy.execute(store).isOk()
# count messages in DB
var numMessages: int64
proc handler(s: ptr sqlite3_stmt) =
numMessages = sqlite3_column_int64(s, 0)
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in sqlite_store
discard database.query(countQuery, handler)
let numMessages = store.getMessagesCount().tryGet()
check:
# expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)

View File

@ -59,8 +59,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -108,8 +107,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -159,8 +157,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -203,8 +200,7 @@ suite "message store - history query":
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
]
for msg in messages1:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
let messages2 = @[
@ -214,8 +210,7 @@ suite "message store - history query":
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages2:
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
require store.put(index, msg, pubsubTopic).isOk()
require store.put(pubsubTopic, msg).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -264,8 +259,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
@ -316,8 +310,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
@ -363,8 +356,7 @@ suite "message store - history query":
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
]
for msg in messages1:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
let messages2 = @[
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
@ -372,8 +364,7 @@ suite "message store - history query":
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages2:
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
require store.put(index, msg, pubsubTopic).isOk()
require store.put(pubsubTopic, msg).isOk()
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
@ -420,8 +411,7 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
require store.put(DefaultPubsubTopic, msg).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -462,8 +452,8 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
let digest = computeDigest(msg)
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -474,6 +464,7 @@ suite "message store - history query":
ascendingOrder=true
)
## Then
check:
res.isOk()
@ -508,8 +499,8 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
let digest = computeDigest(msg)
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -550,8 +541,8 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
let digest = computeDigest(msg)
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
## When
let res = store.getMessagesByHistoryQuery(
@ -596,8 +587,8 @@ suite "message store - history query":
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
require store.put(index, msg, DefaultPubsubTopic).isOk()
let digest = computeDigest(msg)
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)

View File

@ -26,6 +26,9 @@ const
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc now(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet()
@ -48,21 +51,11 @@ proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
return newStandardSwitch(some(peerKey), addrs=peerAddr)
proc newTestStore(): MessageStore =
let database = newTestDatabase()
SqliteStore.init(database).tryGet()
proc newTestWakuStore(switch: Switch): WakuStore =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
database = newTestDatabase()
store = SqliteStore.init(database).tryGet()
proto = WakuStore.init(peerManager, rng, store)
waitFor proto.start()
switch.mount(proto)
return proto
proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore =
proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore =
let
peerManager = PeerManager.new(switch)
rng = crypto.newRng()
@ -73,8 +66,7 @@ proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore =
return proto
suite "Waku Store":
suite "Waku Store - history query":
asyncTest "handle query":
## Setup
@ -335,7 +327,7 @@ suite "Waku Store":
]
for msg in msgList:
serverProto.handleMessage("foo", msg)
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
## When
let rpc = HistoryQuery(
@ -387,7 +379,7 @@ suite "Waku Store":
]
for msg in msgList:
serverProto.handleMessage("foo", msg)
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
## When
let rpc = HistoryQuery(
@ -439,7 +431,7 @@ suite "Waku Store":
]
for msg in msgList:
serverProto.handleMessage("foo", msg)
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
## When
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
@ -461,11 +453,35 @@ suite "Waku Store":
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle ephemeral messages":
suite "Waku Store - message handling":
asyncTest "it should store a valid and non-ephemeral message":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = newTestWakuStore(switch, store)
## Given
let validSenderTime = now()
let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime)
## When
proto.handleMessage(DefaultPubSubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 1
## Cleanup
await switch.stop()
asyncTest "it should not store an ephemeral message":
## Setup
let store = StoreQueueRef.new(10)
let switch = newTestSwitch()
let proto = newTestWakuStore(switch, store)
## Given
let msgList = @[
fakeWakuMessage(ephemeral = false, payload = "1"),
fakeWakuMessage(ephemeral = true, payload = "2"),
@ -474,15 +490,83 @@ suite "Waku Store":
fakeWakuMessage(ephemeral = false, payload = "5"),
]
## When
for msg in msgList:
proto.handleMessage(DefaultPubsubTopic, msg)
## Then
check:
store.len == 2
## Cleanup
await switch.stop()
asyncTest "it should store a message with no sender timestamp":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = newTestWakuStore(switch, store)
## Given
let invalidSenderTime = 0
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubSubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 1
## Cleanup
await switch.stop()
asyncTest "it should not store a message with a sender time variance greater than max time variance (future)":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = newTestWakuStore(switch, store)
## Given
let
now = getNanoSecondTime(getTime().toUnixFloat())
invalidSenderTime = now + MaxMessageTimestampVariance + 1
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubSubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 0
## Cleanup
await switch.stop()
asyncTest "it should not store a message with a sender time variance greater than max time variance (past)":
## Setup
let store = StoreQueueRef.new(5)
let switch = newTestSwitch()
let proto = newTestWakuStore(switch, store)
## Given
let
now = getNanoSecondTime(getTime().toUnixFloat())
invalidSenderTime = now - MaxMessageTimestampVariance - 1
let message = fakeWakuMessage(ts=invalidSenderTime)
## When
proto.handleMessage(DefaultPubSubTopic, message)
## Then
check:
store.getMessagesCount().tryGet() == 0
## Cleanup
await switch.stop()
# TODO: Review this test suite test cases
procSuite "Waku Store - fault tolerant store":
@ -529,7 +613,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList:
proto.handleMessage(DefaultPubsubTopic, msg)
require proto.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
let msgList2 = @[
@ -544,7 +628,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList2:
proto2.handleMessage(DefaultPubsubTopic, msg)
require proto2.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
asyncTest "handle temporal history query with a valid time window":

View File

@ -73,7 +73,7 @@ procSuite "WakuNode - Store":
## Given
let message = fakeWakuMessage()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
## When
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
@ -162,7 +162,7 @@ procSuite "WakuNode - Store":
## Given
let message = fakeWakuMessage()
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
## When
await client.resume()
@ -196,13 +196,15 @@ procSuite "WakuNode - Store":
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
server.wakuStore.handleMessage(DefaultTopic, msg1)
server.wakuStore.handleMessage(DefaultTopic, msg2)
require server.wakuStore.store.put(DefaultTopic, msg1).isOk()
require server.wakuStore.store.put(DefaultTopic, msg2).isOk()
# Insert the same message in both node's store
let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic)
require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
let
receivedTime3 = getNanosecondTime(getTime().toUnixFloat() + 10.float)
digest3 = computeDigest(msg3)
require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
## When
await client.resume()

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az241-291:
# Libtool was configured on host fv-az421-295:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -1,7 +1,7 @@
{.push raises: [Defect].}
import
std/options,
std/[options, times],
stew/results,
chronicles
import
@ -34,8 +34,8 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message
warn "failed to load messages from the persistent store", err = res.error
else:
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard inmemory.put(index, msg, pubsubTopic)
let digest = computeDigest(msg)
discard inmemory.put(pubsubTopic, msg, digest, receiverTime)
info "successfully loaded messages from the persistent store"
@ -43,11 +43,14 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
?s.inmemory.put(index, message, pubsubTopic)
?s.persistent.put(index, message, pubsubTopic)
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
?s.inmemory.put(pubsubTopic, message, digest, receivedTime)
?s.persistent.put(pubsubTopic, message, digest, receivedTime)
ok()
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
procCall MessageStore(s).put(pubsubTopic, message)
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
s.inmemory.getAllMessages()

View File

@ -4,19 +4,14 @@
{.push raises: [Defect].}
import
std/options,
stew/results,
chronos
std/[options, times],
stew/results
import
../../../protocol/waku_message,
../../../utils/time,
../../../utils/pagination
# TODO: Remove this constant after moving time variance checks to waku store protocol
const StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
type
MessageStoreResult*[T] = Result[T, string]
@ -28,7 +23,15 @@ type
# MessageStore interface
method put*(ms: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
let
digest = computeDigest(message)
receivedTime = getNanosecondTime(getTime().toUnixFloat())
ms.put(pubsubTopic, message, digest, receivedTime)
method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard

View File

@ -3,10 +3,9 @@
{.push raises: [Defect].}
import
std/[options, tables, sequtils, algorithm],
std/[options, tables, sequtils, algorithm, times],
stew/[byteutils, results],
chronicles,
chronos
chronicles
import
../../../../protocol/waku_message,
../../../../utils/pagination,
@ -62,16 +61,12 @@ proc close*(s: SqliteStore) =
s.db.close()
method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
## Inserts a message into the store
# Ensure that messages don't "jump" to the front with future timestamps
if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
return err("future_sender_timestamp")
let res = s.insertStmt.exec((
@(cursor.digest.data), # id
cursor.receiverTime, # receiverTimestamp
@(digest.data), # id
receivedTime, # receiverTimestamp
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic
@ -83,6 +78,10 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st
ok()
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
## Inserts a message into the store
procCall MessageStore(s).put(pubsubTopic, message)
method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
## Retrieve all messages from the store.

View File

@ -1,7 +1,7 @@
{.push raises: [Defect].}
import
std/[options, algorithm],
std/[options, algorithm, times],
stew/[results, sorted_set],
chronicles
import
@ -313,17 +313,10 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi
## Add a message to the queue
##
## If we're at capacity, we will be removing, the oldest (first) item
trace "adding item to store queue", msg=msg
# Ensure that messages don't "jump" to the front of the queue with future timestamps
if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance:
return err("future_sender_timestamp")
if store.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if store.items.len >= store.capacity:
var
@ -342,10 +335,15 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi
return ok()
method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic)
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
store.add(message)
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
procCall MessageStore(store).put(pubsubTopic, message)
proc getPage*(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,

View File

@ -49,6 +49,7 @@ const
# Error types (metric label values)
const
invalidMessage = "invalid_message"
insertFailure = "insert_failure"
retPolicyFailure = "retpolicy_failure"
dialFailure = "dial_failure"
@ -204,6 +205,17 @@ proc init*(T: type WakuStore,
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
proc isValidMessage(msg: WakuMessage): bool =
if msg.timestamp == 0:
return true
let
now = getNanosecondTime(getTime().toUnixFloat())
lowerBound = now - MaxMessageTimestampVariance
upperBound = now + MaxMessageTimestampVariance
return lowerBound <= msg.timestamp and msg.timestamp <= upperBound
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
if w.store.isNil():
# Messages should not be stored
@ -213,17 +225,24 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
# The message is ephemeral, should not be stored
return
if not isValidMessage(msg):
waku_store_errors.inc(labelValues = [invalidMessage])
return
let insertStartTime = getTime().toUnixFloat()
let now = getNanosecondTime(getTime().toUnixFloat())
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
block:
let
msgDigest = computeDigest(msg)
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
trace "handling message", topic=pubsubTopic, index=index
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
# Add messages to persistent store, if present
let putStoreRes = w.store.put(index, msg, pubsubTopic)
let putStoreRes = w.store.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
if putStoreRes.isErr():
debug "failed to insert message to persistent store", index=index, err=putStoreRes.error
debug "failed to insert message into the store", err=putStoreRes.error
waku_store_errors.inc(labelValues = [insertFailure])
return
@ -402,10 +421,7 @@ proc resume*(w: WakuStore,
# Save the retrieved messages in the store
var added: uint = 0
for msg in res.get():
let now = getNanosecondTime(getTime().toUnixFloat())
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
let putStoreRes = w.store.put(index, msg, pubsubTopic)
let putStoreRes = w.store.put(pubsubTopic, msg)
if putStoreRes.isErr():
continue

View File

@ -8,6 +8,7 @@ import
../protocol/waku_message,
./time
type MessageDigest* = MDigest[256]
const
MaxPageSize*: uint64 = 100
@ -20,9 +21,9 @@ type Index* = object
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MDigest[256] # calculated over payload and content topic
digest*: MessageDigest # calculated over payload and content topic
proc computeDigest*(msg: WakuMessage): MDigest[256] =
proc computeDigest*(msg: WakuMessage): MessageDigest =
var ctx: sha256
ctx.init()
defer: ctx.clear()