mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 00:13:06 +00:00
fix(store): move insert criteria logic to waku store protocol module
This commit is contained in:
parent
489ca1b01e
commit
1b4a9e5dfd
@ -14,6 +14,7 @@ import
|
|||||||
libp2p/protocols/pubsub/rpc/message
|
libp2p/protocols/pubsub/rpc/message
|
||||||
import
|
import
|
||||||
../../waku/v1/node/rpc/hexstrings,
|
../../waku/v1/node/rpc/hexstrings,
|
||||||
|
../../waku/v2/node/storage/message/message_store,
|
||||||
../../waku/v2/node/storage/message/waku_store_queue,
|
../../waku/v2/node/storage/message/waku_store_queue,
|
||||||
../../waku/v2/node/wakunode2,
|
../../waku/v2/node/wakunode2,
|
||||||
../../waku/v2/node/jsonrpc/[store_api,
|
../../waku/v2/node/jsonrpc/[store_api,
|
||||||
@ -258,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API":
|
|||||||
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
|
||||||
|
|
||||||
for wakuMsg in msgList:
|
for wakuMsg in msgList:
|
||||||
node.wakuStore.handleMessage(defaultTopic, wakuMsg)
|
require node.wakuStore.store.put(defaultTopic, wakuMsg).isOk()
|
||||||
|
|
||||||
let client = newRpcHttpClient()
|
let client = newRpcHttpClient()
|
||||||
await client.connect("127.0.0.1", rpcPort, false)
|
await client.connect("127.0.0.1", rpcPort, false)
|
||||||
|
|||||||
@ -88,34 +88,6 @@ procSuite "Sorted store queue":
|
|||||||
check:
|
check:
|
||||||
store.len == capacity
|
store.len == capacity
|
||||||
|
|
||||||
test "Sender time can't be more than MaxTimeVariance in future":
|
|
||||||
## Given
|
|
||||||
let capacity = 5
|
|
||||||
let store = StoreQueueRef.new(capacity)
|
|
||||||
let
|
|
||||||
receiverTime = getNanoSecondTime(10)
|
|
||||||
senderTimeOk = receiverTime + StoreMaxTimeVariance
|
|
||||||
senderTimeErr = senderTimeOk + 1
|
|
||||||
|
|
||||||
let invalidMessage = IndexedWakuMessage(
|
|
||||||
msg: WakuMessage(
|
|
||||||
payload: @[byte 1],
|
|
||||||
timestamp: senderTimeErr
|
|
||||||
),
|
|
||||||
index: Index(
|
|
||||||
receiverTime: receiverTime,
|
|
||||||
senderTime: senderTimeErr
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
## When
|
|
||||||
let addRes = store.add(invalidMessage)
|
|
||||||
|
|
||||||
## Then
|
|
||||||
check:
|
|
||||||
addRes.isErr()
|
|
||||||
addRes.error() == "future_sender_timestamp"
|
|
||||||
|
|
||||||
test "Store queue sort-on-insert works":
|
test "Store queue sort-on-insert works":
|
||||||
## Given
|
## Given
|
||||||
let
|
let
|
||||||
|
|||||||
@ -25,13 +25,16 @@ const
|
|||||||
proc newTestDatabase(): SqliteDatabase =
|
proc newTestDatabase(): SqliteDatabase =
|
||||||
SqliteDatabase.init("", inMemory = true).tryGet()
|
SqliteDatabase.init("", inMemory = true).tryGet()
|
||||||
|
|
||||||
|
proc now(): Timestamp =
|
||||||
|
getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
proc getTestTimestamp(offset=0): Timestamp =
|
proc getTestTimestamp(offset=0): Timestamp =
|
||||||
Timestamp(getNanosecondTime(epochTime()))
|
Timestamp(getNanosecondTime(getTime().toUnixFloat() + offset.float))
|
||||||
|
|
||||||
proc fakeWakuMessage(
|
proc fakeWakuMessage(
|
||||||
payload = "TEST-PAYLOAD",
|
payload = "TEST-PAYLOAD",
|
||||||
contentTopic = DefaultContentTopic,
|
contentTopic = DefaultContentTopic,
|
||||||
ts = getNanosecondTime(epochTime())
|
ts = now()
|
||||||
): WakuMessage =
|
): WakuMessage =
|
||||||
WakuMessage(
|
WakuMessage(
|
||||||
payload: toBytes(payload),
|
payload: toBytes(payload),
|
||||||
@ -80,10 +83,9 @@ suite "SQLite message store - insert messages":
|
|||||||
store = SqliteStore.init(database).tryGet()
|
store = SqliteStore.init(database).tryGet()
|
||||||
|
|
||||||
let message = fakeWakuMessage(contentTopic=contentTopic)
|
let message = fakeWakuMessage(contentTopic=contentTopic)
|
||||||
let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic)
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let resPut = store.put(messageIndex, message, DefaultPubsubTopic)
|
let resPut = store.put(DefaultPubsubTopic, message)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
@ -123,8 +125,7 @@ suite "SQLite message store - insert messages":
|
|||||||
|
|
||||||
## When
|
## When
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
require retentionPolicy.execute(store).isOk()
|
require retentionPolicy.execute(store).isOk()
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
@ -147,25 +148,24 @@ suite "Message Store":
|
|||||||
let
|
let
|
||||||
database = newTestDatabase()
|
database = newTestDatabase()
|
||||||
store = SqliteStore.init(database).get()
|
store = SqliteStore.init(database).get()
|
||||||
topic = DefaultContentTopic
|
|
||||||
pubsubTopic = DefaultPubsubTopic
|
|
||||||
|
|
||||||
|
let
|
||||||
t1 = getTestTimestamp(0)
|
t1 = getTestTimestamp(0)
|
||||||
t2 = getTestTimestamp(1)
|
t2 = getTestTimestamp(1)
|
||||||
t3 = high(int64)
|
t3 = high(int64)
|
||||||
|
|
||||||
var msgs = @[
|
var msgs = @[
|
||||||
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
|
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic, version: uint32(0), timestamp: t1),
|
||||||
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
|
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: DefaultContentTopic, version: uint32(1), timestamp: t2),
|
||||||
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
|
||||||
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
|
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3),
|
||||||
]
|
]
|
||||||
|
|
||||||
var indexes: seq[Index] = @[]
|
var indexes: seq[Index] = @[]
|
||||||
for msg in msgs:
|
for msg in msgs:
|
||||||
var index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
|
||||||
let resPut = store.put(index, msg, pubsubTopic)
|
|
||||||
require resPut.isOk
|
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
||||||
indexes.add(index)
|
indexes.add(index)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
@ -186,7 +186,10 @@ suite "Message Store":
|
|||||||
# flags for receiver timestamp
|
# flags for receiver timestamp
|
||||||
var rt1Flag, rt2Flag, rt3Flag: bool = false
|
var rt1Flag, rt2Flag, rt3Flag: bool = false
|
||||||
|
|
||||||
for (receiverTimestamp, msg, psTopic) in result:
|
for (receiverTimestamp, msg, pubsubTopic) in result:
|
||||||
|
check:
|
||||||
|
pubsubTopic == DefaultPubsubTopic
|
||||||
|
|
||||||
# check correct retrieval of receiver timestamps
|
# check correct retrieval of receiver timestamps
|
||||||
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
|
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
|
||||||
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
|
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
|
||||||
@ -204,9 +207,6 @@ suite "Message Store":
|
|||||||
if msg.timestamp == t1: t1Flag = true
|
if msg.timestamp == t1: t1Flag = true
|
||||||
if msg.timestamp == t2: t2Flag = true
|
if msg.timestamp == t2: t2Flag = true
|
||||||
if msg.timestamp == t3: t3Flag = true
|
if msg.timestamp == t3: t3Flag = true
|
||||||
|
|
||||||
check:
|
|
||||||
psTopic == pubSubTopic
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# check version
|
# check version
|
||||||
@ -278,10 +278,8 @@ suite "Message Store":
|
|||||||
|
|
||||||
|
|
||||||
for i in 1..capacity:
|
for i in 1..capacity:
|
||||||
let
|
let msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
||||||
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
require store.put(pubsubTopic, msg).isOk()
|
||||||
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
|
|
||||||
require store.put(index, msg, pubsubTopic).isOk()
|
|
||||||
require retentionPolicy.execute(store).isOk()
|
require retentionPolicy.execute(store).isOk()
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
@ -312,19 +310,12 @@ suite "Message Store":
|
|||||||
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
|
retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
|
||||||
|
|
||||||
for i in 1..capacity+overload:
|
for i in 1..capacity+overload:
|
||||||
let
|
let msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
||||||
msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
|
require store.put(pubsubTopic, msg).isOk()
|
||||||
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
|
|
||||||
require store.put(index, msg, pubsubTopic).isOk()
|
|
||||||
require retentionPolicy.execute(store).isOk()
|
require retentionPolicy.execute(store).isOk()
|
||||||
|
|
||||||
# count messages in DB
|
# count messages in DB
|
||||||
var numMessages: int64
|
let numMessages = store.getMessagesCount().tryGet()
|
||||||
proc handler(s: ptr sqlite3_stmt) =
|
|
||||||
numMessages = sqlite3_column_int64(s, 0)
|
|
||||||
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in sqlite_store
|
|
||||||
discard database.query(countQuery, handler)
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
# expected number of messages is 120 because
|
# expected number of messages is 120 because
|
||||||
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
|
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
|
||||||
|
|||||||
@ -59,8 +59,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -108,8 +107,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -159,8 +157,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -203,8 +200,7 @@ suite "message store - history query":
|
|||||||
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
|
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
|
||||||
]
|
]
|
||||||
for msg in messages1:
|
for msg in messages1:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
|
|
||||||
let messages2 = @[
|
let messages2 = @[
|
||||||
@ -214,8 +210,7 @@ suite "message store - history query":
|
|||||||
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
|
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
|
||||||
]
|
]
|
||||||
for msg in messages2:
|
for msg in messages2:
|
||||||
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
|
require store.put(pubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, pubsubTopic).isOk()
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -264,8 +259,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
|
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
|
||||||
|
|
||||||
@ -316,8 +310,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
|
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
|
||||||
|
|
||||||
@ -363,8 +356,7 @@ suite "message store - history query":
|
|||||||
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
|
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
|
||||||
]
|
]
|
||||||
for msg in messages1:
|
for msg in messages1:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
let messages2 = @[
|
let messages2 = @[
|
||||||
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
|
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
|
||||||
@ -372,8 +364,7 @@ suite "message store - history query":
|
|||||||
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
|
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
|
||||||
]
|
]
|
||||||
for msg in messages2:
|
for msg in messages2:
|
||||||
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
|
require store.put(pubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, pubsubTopic).isOk()
|
|
||||||
|
|
||||||
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
|
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
|
||||||
|
|
||||||
@ -420,8 +411,7 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
require store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -462,8 +452,8 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
let digest = computeDigest(msg)
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -474,6 +464,7 @@ suite "message store - history query":
|
|||||||
ascendingOrder=true
|
ascendingOrder=true
|
||||||
)
|
)
|
||||||
|
|
||||||
|
## Then
|
||||||
check:
|
check:
|
||||||
res.isOk()
|
res.isOk()
|
||||||
|
|
||||||
@ -508,8 +499,8 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
let digest = computeDigest(msg)
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -550,8 +541,8 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
let digest = computeDigest(msg)
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let res = store.getMessagesByHistoryQuery(
|
let res = store.getMessagesByHistoryQuery(
|
||||||
@ -596,8 +587,8 @@ suite "message store - history query":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
|
let digest = computeDigest(msg)
|
||||||
require store.put(index, msg, DefaultPubsubTopic).isOk()
|
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
|
||||||
|
|
||||||
let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)
|
let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)
|
||||||
|
|
||||||
|
|||||||
@ -26,6 +26,9 @@ const
|
|||||||
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
|
||||||
|
|
||||||
|
proc now(): Timestamp =
|
||||||
|
getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
proc newTestDatabase(): SqliteDatabase =
|
proc newTestDatabase(): SqliteDatabase =
|
||||||
SqliteDatabase.init("", inMemory = true).tryGet()
|
SqliteDatabase.init("", inMemory = true).tryGet()
|
||||||
|
|
||||||
@ -48,21 +51,11 @@ proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
|
|||||||
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
|
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
|
||||||
return newStandardSwitch(some(peerKey), addrs=peerAddr)
|
return newStandardSwitch(some(peerKey), addrs=peerAddr)
|
||||||
|
|
||||||
|
proc newTestStore(): MessageStore =
|
||||||
|
let database = newTestDatabase()
|
||||||
|
SqliteStore.init(database).tryGet()
|
||||||
|
|
||||||
proc newTestWakuStore(switch: Switch): WakuStore =
|
proc newTestWakuStore(switch: Switch, store=newTestStore()): WakuStore =
|
||||||
let
|
|
||||||
peerManager = PeerManager.new(switch)
|
|
||||||
rng = crypto.newRng()
|
|
||||||
database = newTestDatabase()
|
|
||||||
store = SqliteStore.init(database).tryGet()
|
|
||||||
proto = WakuStore.init(peerManager, rng, store)
|
|
||||||
|
|
||||||
waitFor proto.start()
|
|
||||||
switch.mount(proto)
|
|
||||||
|
|
||||||
return proto
|
|
||||||
|
|
||||||
proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore =
|
|
||||||
let
|
let
|
||||||
peerManager = PeerManager.new(switch)
|
peerManager = PeerManager.new(switch)
|
||||||
rng = crypto.newRng()
|
rng = crypto.newRng()
|
||||||
@ -73,8 +66,7 @@ proc newTestWakuStore(switch: Switch, store: MessageStore): WakuStore =
|
|||||||
|
|
||||||
return proto
|
return proto
|
||||||
|
|
||||||
|
suite "Waku Store - history query":
|
||||||
suite "Waku Store":
|
|
||||||
|
|
||||||
asyncTest "handle query":
|
asyncTest "handle query":
|
||||||
## Setup
|
## Setup
|
||||||
@ -335,7 +327,7 @@ suite "Waku Store":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
serverProto.handleMessage("foo", msg)
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(
|
let rpc = HistoryQuery(
|
||||||
@ -387,7 +379,7 @@ suite "Waku Store":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
serverProto.handleMessage("foo", msg)
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(
|
let rpc = HistoryQuery(
|
||||||
@ -439,7 +431,7 @@ suite "Waku Store":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
serverProto.handleMessage("foo", msg)
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
||||||
@ -461,11 +453,35 @@ suite "Waku Store":
|
|||||||
## Cleanup
|
## Cleanup
|
||||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||||
|
|
||||||
asyncTest "handle ephemeral messages":
|
suite "Waku Store - message handling":
|
||||||
|
|
||||||
|
asyncTest "it should store a valid and non-ephemeral message":
|
||||||
|
## Setup
|
||||||
|
let store = StoreQueueRef.new(5)
|
||||||
|
let switch = newTestSwitch()
|
||||||
|
let proto = newTestWakuStore(switch, store)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let validSenderTime = now()
|
||||||
|
let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime)
|
||||||
|
|
||||||
|
## When
|
||||||
|
proto.handleMessage(DefaultPubSubTopic, message)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
store.getMessagesCount().tryGet() == 1
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await switch.stop()
|
||||||
|
|
||||||
|
asyncTest "it should not store an ephemeral message":
|
||||||
## Setup
|
## Setup
|
||||||
let store = StoreQueueRef.new(10)
|
let store = StoreQueueRef.new(10)
|
||||||
let switch = newTestSwitch()
|
let switch = newTestSwitch()
|
||||||
let proto = newTestWakuStore(switch, store)
|
let proto = newTestWakuStore(switch, store)
|
||||||
|
|
||||||
|
## Given
|
||||||
let msgList = @[
|
let msgList = @[
|
||||||
fakeWakuMessage(ephemeral = false, payload = "1"),
|
fakeWakuMessage(ephemeral = false, payload = "1"),
|
||||||
fakeWakuMessage(ephemeral = true, payload = "2"),
|
fakeWakuMessage(ephemeral = true, payload = "2"),
|
||||||
@ -474,15 +490,83 @@ suite "Waku Store":
|
|||||||
fakeWakuMessage(ephemeral = false, payload = "5"),
|
fakeWakuMessage(ephemeral = false, payload = "5"),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
## When
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
proto.handleMessage(DefaultPubsubTopic, msg)
|
proto.handleMessage(DefaultPubsubTopic, msg)
|
||||||
|
|
||||||
|
## Then
|
||||||
check:
|
check:
|
||||||
store.len == 2
|
store.len == 2
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
await switch.stop()
|
await switch.stop()
|
||||||
|
|
||||||
|
asyncTest "it should store a message with no sender timestamp":
|
||||||
|
## Setup
|
||||||
|
let store = StoreQueueRef.new(5)
|
||||||
|
let switch = newTestSwitch()
|
||||||
|
let proto = newTestWakuStore(switch, store)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let invalidSenderTime = 0
|
||||||
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
||||||
|
|
||||||
|
## When
|
||||||
|
proto.handleMessage(DefaultPubSubTopic, message)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
store.getMessagesCount().tryGet() == 1
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await switch.stop()
|
||||||
|
|
||||||
|
asyncTest "it should not store a message with a sender time variance greater than max time variance (future)":
|
||||||
|
## Setup
|
||||||
|
let store = StoreQueueRef.new(5)
|
||||||
|
let switch = newTestSwitch()
|
||||||
|
let proto = newTestWakuStore(switch, store)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let
|
||||||
|
now = getNanoSecondTime(getTime().toUnixFloat())
|
||||||
|
invalidSenderTime = now + MaxMessageTimestampVariance + 1
|
||||||
|
|
||||||
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
||||||
|
|
||||||
|
## When
|
||||||
|
proto.handleMessage(DefaultPubSubTopic, message)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
store.getMessagesCount().tryGet() == 0
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await switch.stop()
|
||||||
|
|
||||||
|
asyncTest "it should not store a message with a sender time variance greater than max time variance (past)":
|
||||||
|
## Setup
|
||||||
|
let store = StoreQueueRef.new(5)
|
||||||
|
let switch = newTestSwitch()
|
||||||
|
let proto = newTestWakuStore(switch, store)
|
||||||
|
|
||||||
|
## Given
|
||||||
|
let
|
||||||
|
now = getNanoSecondTime(getTime().toUnixFloat())
|
||||||
|
invalidSenderTime = now - MaxMessageTimestampVariance - 1
|
||||||
|
|
||||||
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
||||||
|
|
||||||
|
## When
|
||||||
|
proto.handleMessage(DefaultPubSubTopic, message)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
store.getMessagesCount().tryGet() == 0
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
await switch.stop()
|
||||||
|
|
||||||
|
|
||||||
# TODO: Review this test suite test cases
|
# TODO: Review this test suite test cases
|
||||||
procSuite "Waku Store - fault tolerant store":
|
procSuite "Waku Store - fault tolerant store":
|
||||||
@ -529,7 +613,7 @@ procSuite "Waku Store - fault tolerant store":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
proto.handleMessage(DefaultPubsubTopic, msg)
|
require proto.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
|
||||||
|
|
||||||
let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
|
let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
|
||||||
let msgList2 = @[
|
let msgList2 = @[
|
||||||
@ -544,7 +628,7 @@ procSuite "Waku Store - fault tolerant store":
|
|||||||
]
|
]
|
||||||
|
|
||||||
for msg in msgList2:
|
for msg in msgList2:
|
||||||
proto2.handleMessage(DefaultPubsubTopic, msg)
|
require proto2.store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
|
||||||
|
|
||||||
|
|
||||||
asyncTest "handle temporal history query with a valid time window":
|
asyncTest "handle temporal history query with a valid time window":
|
||||||
|
|||||||
@ -73,7 +73,7 @@ procSuite "WakuNode - Store":
|
|||||||
|
|
||||||
## Given
|
## Given
|
||||||
let message = fakeWakuMessage()
|
let message = fakeWakuMessage()
|
||||||
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
|
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
let req = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
||||||
@ -162,7 +162,7 @@ procSuite "WakuNode - Store":
|
|||||||
|
|
||||||
## Given
|
## Given
|
||||||
let message = fakeWakuMessage()
|
let message = fakeWakuMessage()
|
||||||
server.wakuStore.handleMessage(DefaultPubsubTopic, message)
|
require server.wakuStore.store.put(DefaultPubsubTopic, message).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
await client.resume()
|
await client.resume()
|
||||||
@ -196,13 +196,15 @@ procSuite "WakuNode - Store":
|
|||||||
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
|
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
|
||||||
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
|
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
|
||||||
|
|
||||||
server.wakuStore.handleMessage(DefaultTopic, msg1)
|
require server.wakuStore.store.put(DefaultTopic, msg1).isOk()
|
||||||
server.wakuStore.handleMessage(DefaultTopic, msg2)
|
require server.wakuStore.store.put(DefaultTopic, msg2).isOk()
|
||||||
|
|
||||||
# Insert the same message in both node's store
|
# Insert the same message in both node's store
|
||||||
let index3 = Index.compute(msg3, getNanosecondTime(getTime().toUnixFloat() + 10.float), DefaultTopic)
|
let
|
||||||
require server.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
|
receivedTime3 = getNanosecondTime(getTime().toUnixFloat() + 10.float)
|
||||||
require client.wakuStore.store.put(index3, msg3, DefaultTopic).isOk()
|
digest3 = computeDigest(msg3)
|
||||||
|
require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
|
||||||
|
require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
|
||||||
|
|
||||||
## When
|
## When
|
||||||
await client.resume()
|
await client.resume()
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/options,
|
std/[options, times],
|
||||||
stew/results,
|
stew/results,
|
||||||
chronicles
|
chronicles
|
||||||
import
|
import
|
||||||
@ -34,8 +34,8 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message
|
|||||||
warn "failed to load messages from the persistent store", err = res.error
|
warn "failed to load messages from the persistent store", err = res.error
|
||||||
else:
|
else:
|
||||||
for (receiverTime, msg, pubsubTopic) in res.value:
|
for (receiverTime, msg, pubsubTopic) in res.value:
|
||||||
let index = Index.compute(msg, receiverTime, pubsubTopic)
|
let digest = computeDigest(msg)
|
||||||
discard inmemory.put(index, msg, pubsubTopic)
|
discard inmemory.put(pubsubTopic, msg, digest, receiverTime)
|
||||||
|
|
||||||
info "successfully loaded messages from the persistent store"
|
info "successfully loaded messages from the persistent store"
|
||||||
|
|
||||||
@ -43,11 +43,14 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message
|
|||||||
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
|
return ok(DualMessageStore(inmemory: inmemory, persistent: persistent))
|
||||||
|
|
||||||
|
|
||||||
method put*(s: DualMessageStore, index: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||||
?s.inmemory.put(index, message, pubsubTopic)
|
?s.inmemory.put(pubsubTopic, message, digest, receivedTime)
|
||||||
?s.persistent.put(index, message, pubsubTopic)
|
?s.persistent.put(pubsubTopic, message, digest, receivedTime)
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
method put*(s: DualMessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
||||||
|
procCall MessageStore(s).put(pubsubTopic, message)
|
||||||
|
|
||||||
|
|
||||||
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
|
method getAllMessages*(s: DualMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
|
||||||
s.inmemory.getAllMessages()
|
s.inmemory.getAllMessages()
|
||||||
|
|||||||
@ -4,19 +4,14 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/options,
|
std/[options, times],
|
||||||
stew/results,
|
stew/results
|
||||||
chronos
|
|
||||||
import
|
import
|
||||||
../../../protocol/waku_message,
|
../../../protocol/waku_message,
|
||||||
../../../utils/time,
|
../../../utils/time,
|
||||||
../../../utils/pagination
|
../../../utils/pagination
|
||||||
|
|
||||||
|
|
||||||
# TODO: Remove this constant after moving time variance checks to waku store protocol
|
|
||||||
const StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
|
|
||||||
|
|
||||||
|
|
||||||
type
|
type
|
||||||
MessageStoreResult*[T] = Result[T, string]
|
MessageStoreResult*[T] = Result[T, string]
|
||||||
|
|
||||||
@ -28,7 +23,15 @@ type
|
|||||||
|
|
||||||
|
|
||||||
# MessageStore interface
|
# MessageStore interface
|
||||||
method put*(ms: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
|
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
|
||||||
|
|
||||||
|
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
||||||
|
let
|
||||||
|
digest = computeDigest(message)
|
||||||
|
receivedTime = getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
|
ms.put(pubsubTopic, message, digest, receivedTime)
|
||||||
|
|
||||||
|
|
||||||
method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
|
method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
|
||||||
|
|
||||||
|
|||||||
@ -3,10 +3,9 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, tables, sequtils, algorithm],
|
std/[options, tables, sequtils, algorithm, times],
|
||||||
stew/[byteutils, results],
|
stew/[byteutils, results],
|
||||||
chronicles,
|
chronicles
|
||||||
chronos
|
|
||||||
import
|
import
|
||||||
../../../../protocol/waku_message,
|
../../../../protocol/waku_message,
|
||||||
../../../../utils/pagination,
|
../../../../utils/pagination,
|
||||||
@ -62,16 +61,12 @@ proc close*(s: SqliteStore) =
|
|||||||
s.db.close()
|
s.db.close()
|
||||||
|
|
||||||
|
|
||||||
method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||||
## Inserts a message into the store
|
## Inserts a message into the store
|
||||||
|
|
||||||
# Ensure that messages don't "jump" to the front with future timestamps
|
|
||||||
if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
|
|
||||||
return err("future_sender_timestamp")
|
|
||||||
|
|
||||||
let res = s.insertStmt.exec((
|
let res = s.insertStmt.exec((
|
||||||
@(cursor.digest.data), # id
|
@(digest.data), # id
|
||||||
cursor.receiverTime, # receiverTimestamp
|
receivedTime, # receiverTimestamp
|
||||||
toBytes(message.contentTopic), # contentTopic
|
toBytes(message.contentTopic), # contentTopic
|
||||||
message.payload, # payload
|
message.payload, # payload
|
||||||
toBytes(pubsubTopic), # pubsubTopic
|
toBytes(pubsubTopic), # pubsubTopic
|
||||||
@ -83,6 +78,10 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st
|
|||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
||||||
|
## Inserts a message into the store
|
||||||
|
procCall MessageStore(s).put(pubsubTopic, message)
|
||||||
|
|
||||||
|
|
||||||
method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
|
method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
|
||||||
## Retrieve all messages from the store.
|
## Retrieve all messages from the store.
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, algorithm],
|
std/[options, algorithm, times],
|
||||||
stew/[results, sorted_set],
|
stew/[results, sorted_set],
|
||||||
chronicles
|
chronicles
|
||||||
import
|
import
|
||||||
@ -313,17 +313,10 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi
|
|||||||
## Add a message to the queue
|
## Add a message to the queue
|
||||||
##
|
##
|
||||||
## If we're at capacity, we will be removing, the oldest (first) item
|
## If we're at capacity, we will be removing, the oldest (first) item
|
||||||
trace "adding item to store queue", msg=msg
|
|
||||||
|
|
||||||
# Ensure that messages don't "jump" to the front of the queue with future timestamps
|
|
||||||
if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance:
|
|
||||||
return err("future_sender_timestamp")
|
|
||||||
|
|
||||||
if store.contains(msg.index):
|
if store.contains(msg.index):
|
||||||
trace "could not add item to store queue. Index already exists", index=msg.index
|
trace "could not add item to store queue. Index already exists", index=msg.index
|
||||||
return err("duplicate")
|
return err("duplicate")
|
||||||
|
|
||||||
|
|
||||||
# TODO: the below delete block can be removed if we convert to circular buffer
|
# TODO: the below delete block can be removed if we convert to circular buffer
|
||||||
if store.items.len >= store.capacity:
|
if store.items.len >= store.capacity:
|
||||||
var
|
var
|
||||||
@ -342,10 +335,15 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
|
||||||
let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic)
|
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||||
|
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
|
||||||
|
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
|
||||||
store.add(message)
|
store.add(message)
|
||||||
|
|
||||||
|
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
||||||
|
procCall MessageStore(store).put(pubsubTopic, message)
|
||||||
|
|
||||||
|
|
||||||
proc getPage*(storeQueue: StoreQueueRef,
|
proc getPage*(storeQueue: StoreQueueRef,
|
||||||
pred: QueryFilterMatcher,
|
pred: QueryFilterMatcher,
|
||||||
|
|||||||
@ -49,6 +49,7 @@ const
|
|||||||
|
|
||||||
# Error types (metric label values)
|
# Error types (metric label values)
|
||||||
const
|
const
|
||||||
|
invalidMessage = "invalid_message"
|
||||||
insertFailure = "insert_failure"
|
insertFailure = "insert_failure"
|
||||||
retPolicyFailure = "retpolicy_failure"
|
retPolicyFailure = "retpolicy_failure"
|
||||||
dialFailure = "dial_failure"
|
dialFailure = "dial_failure"
|
||||||
@ -204,6 +205,17 @@ proc init*(T: type WakuStore,
|
|||||||
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
|
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
|
||||||
|
|
||||||
|
|
||||||
|
proc isValidMessage(msg: WakuMessage): bool =
|
||||||
|
if msg.timestamp == 0:
|
||||||
|
return true
|
||||||
|
|
||||||
|
let
|
||||||
|
now = getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
lowerBound = now - MaxMessageTimestampVariance
|
||||||
|
upperBound = now + MaxMessageTimestampVariance
|
||||||
|
|
||||||
|
return lowerBound <= msg.timestamp and msg.timestamp <= upperBound
|
||||||
|
|
||||||
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
|
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
|
||||||
if w.store.isNil():
|
if w.store.isNil():
|
||||||
# Messages should not be stored
|
# Messages should not be stored
|
||||||
@ -213,20 +225,27 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
|
|||||||
# The message is ephemeral, should not be stored
|
# The message is ephemeral, should not be stored
|
||||||
return
|
return
|
||||||
|
|
||||||
let insertStartTime = getTime().toUnixFloat()
|
if not isValidMessage(msg):
|
||||||
|
waku_store_errors.inc(labelValues = [invalidMessage])
|
||||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
|
||||||
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
|
|
||||||
|
|
||||||
trace "handling message", topic=pubsubTopic, index=index
|
|
||||||
|
|
||||||
# Add messages to persistent store, if present
|
|
||||||
let putStoreRes = w.store.put(index, msg, pubsubTopic)
|
|
||||||
if putStoreRes.isErr():
|
|
||||||
debug "failed to insert message to persistent store", index=index, err=putStoreRes.error
|
|
||||||
waku_store_errors.inc(labelValues = [insertFailure])
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
let insertStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
|
block:
|
||||||
|
let
|
||||||
|
msgDigest = computeDigest(msg)
|
||||||
|
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
|
||||||
|
else: getNanosecondTime(getTime().toUnixFloat())
|
||||||
|
|
||||||
|
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
|
||||||
|
|
||||||
|
let putStoreRes = w.store.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
|
||||||
|
if putStoreRes.isErr():
|
||||||
|
debug "failed to insert message into the store", err=putStoreRes.error
|
||||||
|
waku_store_errors.inc(labelValues = [insertFailure])
|
||||||
|
return
|
||||||
|
|
||||||
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
let insertDuration = getTime().toUnixFloat() - insertStartTime
|
||||||
waku_store_insert_duration_seconds.observe(insertDuration)
|
waku_store_insert_duration_seconds.observe(insertDuration)
|
||||||
|
|
||||||
@ -402,10 +421,7 @@ proc resume*(w: WakuStore,
|
|||||||
# Save the retrieved messages in the store
|
# Save the retrieved messages in the store
|
||||||
var added: uint = 0
|
var added: uint = 0
|
||||||
for msg in res.get():
|
for msg in res.get():
|
||||||
let now = getNanosecondTime(getTime().toUnixFloat())
|
let putStoreRes = w.store.put(pubsubTopic, msg)
|
||||||
let index = Index.compute(msg, receivedTime=now, pubsubTopic=pubsubTopic)
|
|
||||||
|
|
||||||
let putStoreRes = w.store.put(index, msg, pubsubTopic)
|
|
||||||
if putStoreRes.isErr():
|
if putStoreRes.isErr():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import
|
|||||||
../protocol/waku_message,
|
../protocol/waku_message,
|
||||||
./time
|
./time
|
||||||
|
|
||||||
|
type MessageDigest* = MDigest[256]
|
||||||
|
|
||||||
const
|
const
|
||||||
MaxPageSize*: uint64 = 100
|
MaxPageSize*: uint64 = 100
|
||||||
@ -20,9 +21,9 @@ type Index* = object
|
|||||||
pubsubTopic*: string
|
pubsubTopic*: string
|
||||||
senderTime*: Timestamp # the time at which the message is generated
|
senderTime*: Timestamp # the time at which the message is generated
|
||||||
receiverTime*: Timestamp
|
receiverTime*: Timestamp
|
||||||
digest*: MDigest[256] # calculated over payload and content topic
|
digest*: MessageDigest # calculated over payload and content topic
|
||||||
|
|
||||||
proc computeDigest*(msg: WakuMessage): MDigest[256] =
|
proc computeDigest*(msg: WakuMessage): MessageDigest =
|
||||||
var ctx: sha256
|
var ctx: sha256
|
||||||
ctx.init()
|
ctx.init()
|
||||||
defer: ctx.clear()
|
defer: ctx.clear()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user