Store data model updates and fixes (#864)

This commit is contained in:
Hanno Cornelius 2022-02-28 17:29:01 +01:00 committed by GitHub
parent 9a5054a238
commit 20efdd7255
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 119 additions and 26 deletions

View File

@ -45,7 +45,26 @@ procSuite "Pagination utils":
eqIndex3 = Index(digest: hashFromStr("0003"), eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321)) senderTime: getNanosecondTime(54321))
diffPsTopic = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000),
pubsubTopic: "zzzz")
noSenderTime1 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(1100),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime2 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(10000),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime3 = Index(digest: hashFromStr("1234"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "aaaa")
noSenderTime4 = Index(digest: hashFromStr("0"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
## Test suite ## Test suite
asyncTest "Index comparison": asyncTest "Index comparison":
@ -63,16 +82,37 @@ procSuite "Pagination utils":
# Index comparison when equal # Index comparison when equal
cmp(eqIndex1, eqIndex2) == 0 cmp(eqIndex1, eqIndex2) == 0
# receiverTime difference play no role # pubsubTopic difference
cmp(smallIndex1, diffPsTopic) < 0
# receiverTime diff plays no role when senderTime set
cmp(eqIndex1, eqIndex3) == 0 cmp(eqIndex1, eqIndex3) == 0
# receiverTime diff plays no role when digest/pubsubTopic equal
cmp(noSenderTime1, noSenderTime2) == 0
# sort on receiverTime with no senderTimestamp and unequal pubsubTopic
cmp(noSenderTime1, noSenderTime3) < 0
# sort on receiverTime with no senderTimestamp and unequal digest
cmp(noSenderTime1, noSenderTime4) < 0
# sort on receiverTime if no senderTimestamp on only one side
cmp(smallIndex1, noSenderTime1) < 0
cmp(noSenderTime1, smallIndex1) > 0 # Test symmetry
cmp(noSenderTime2, eqIndex3) < 0
cmp(eqIndex3, noSenderTime2) > 0 # Test symmetry
asyncTest "Index equality": asyncTest "Index equality":
check: check:
# Exactly equal # Exactly equal
eqIndex1 == eqIndex2 eqIndex1 == eqIndex2
# Receiver time plays no role # Receiver time plays no role, even without sender time
eqIndex1 == eqIndex3 eqIndex1 == eqIndex3
noSenderTime1 == noSenderTime2 # only receiver time differs, indices are equal
noSenderTime1 != noSenderTime3 # pubsubTopics differ
noSenderTime1 != noSenderTime4 # digests differ
# Unequal sender time # Unequal sender time
smallIndex1 != largeIndex1 smallIndex1 != largeIndex1
@ -82,3 +122,6 @@ procSuite "Pagination utils":
# Unequal hash and digest # Unequal hash and digest
smallIndex1 != eqIndex1 smallIndex1 != eqIndex1
# Unequal pubsubTopic
smallIndex1 != diffPsTopic

View File

@ -34,7 +34,8 @@ procSuite "pagination":
len(index.digest.data) != 0 len(index.digest.data) != 0
len(index.digest.data) == 32 # sha2 output length in bytes len(index.digest.data) == 32 # sha2 output length in bytes
index.receiverTime != 0 # the receiver timestamp should be a non-zero value index.receiverTime != 0 # the receiver timestamp should be a non-zero value
index.senderTime == 2 index.senderTime == 2
index.pubsubTopic == DefaultTopic
let let
wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto")) wm1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("/waku/2/default-content/proto"))

View File

@ -31,13 +31,14 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T]
let prepare = db.prepareStmt(""" let prepare = db.prepareStmt("""
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ ( CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
id BLOB PRIMARY KEY, id BLOB,
receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL, receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
contentTopic BLOB NOT NULL, contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL, pubsubTopic BLOB NOT NULL,
payload BLOB, payload BLOB,
version INTEGER NOT NULL, version INTEGER NOT NULL,
senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)
) WITHOUT ROWID; ) WITHOUT ROWID;
""", NoParams, void) """, NoParams, void)
@ -133,5 +134,3 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non
proc close*(db: WakuMessageStore) = proc close*(db: WakuMessageStore) =
## Closes the database. ## Closes the database.
db.database.close() db.database.close()

View File

@ -7,7 +7,7 @@ const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message"
const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer"
const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts"
const USER_VERSION* = 2 # increase this when there is an update in the database schema const USER_VERSION* = 4 # increase this when there is an update in the database schema
type MigrationScriptsResult*[T] = Result[T, string] type MigrationScriptsResult*[T] = Result[T, string]
type type

View File

@ -22,9 +22,8 @@ CREATE TABLE IF NOT EXISTS Message(
senderTimestamp INTEGER NOT NULL senderTimestamp INTEGER NOT NULL
) WITHOUT ROWID; ) WITHOUT ROWID;
INSERT INTO Message (id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp) INSERT INTO Message (id, receiverTimestamp, contentTopic, pubsubTopic, payload, version, senderTimestamp)
SELECT id, FLOOR(receiverTimestamp*1000000000), contentTopic, pubsubTopic, payload, version, FLOOR(senderTimestamp*1000000000) SELECT id, CAST(receiverTimestamp*1000000000 AS INTEGER), contentTopic, pubsubTopic, payload, version, CAST(senderTimestamp*1000000000 AS INTEGER)
FROM Message_backup; FROM Message_backup;
DROP TABLE Message_backup; DROP TABLE Message_backup;

View File

@ -0,0 +1,18 @@
-- Schema migration: rebuild the Message table so its primary key is the
-- composite message index (senderTimestamp, id, pubsubTopic) rather than
-- id alone. SQLite cannot change a primary key in place, so the existing
-- table is renamed, a new table is created with the desired constraint,
-- and all rows are copied across before the backup is dropped.
ALTER TABLE Message RENAME TO Message_backup;
CREATE TABLE IF NOT EXISTS Message(
id BLOB,
receiverTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL,
-- Composite key mirroring the in-memory Index; with WITHOUT ROWID the
-- rows are stored clustered on this key.
CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)
) WITHOUT ROWID;
-- Copy every row back into the rebuilt table.
-- NOTE(review): `SELECT *` relies on Message_backup having exactly the same
-- column order as the new table, and the INSERT will abort the migration if
-- any two backup rows collide on (senderTimestamp, id, pubsubTopic) --
-- confirm such duplicates cannot exist in pre-migration databases.
INSERT INTO Message
SELECT *
FROM Message_backup;
DROP TABLE Message_backup;

View File

@ -56,7 +56,9 @@ const
# TODO Move serialization function to separate file, too noisy # TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic # TODO Move pagination to separate file, self-contained logic
proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat())): Index = proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic): Index =
## Takes a WakuMessage with received timestamp and returns its Index. ## Takes a WakuMessage with received timestamp and returns its Index.
## Received timestamp will default to system time if not provided. ## Received timestamp will default to system time if not provided.
var ctx: sha256 var ctx: sha256
@ -66,8 +68,13 @@ proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().
let digest = ctx.finish() # computes the hash let digest = ctx.finish() # computes the hash
ctx.clear() ctx.clear()
let receiverTime = receivedTime let
var index = Index(digest:digest, receiverTime: receiverTime, senderTime: msg.timestamp) receiverTime = receivedTime
index = Index(digest:digest,
receiverTime: receiverTime,
senderTime: msg.timestamp,
pubsubTopic: pubsubTopic)
return index return index
proc encode*(index: Index): ProtoBuffer = proc encode*(index: Index): ProtoBuffer =
@ -81,6 +88,7 @@ proc encode*(index: Index): ProtoBuffer =
output.write(1, index.digest.data) output.write(1, index.digest.data)
output.write(2, zint64(index.receiverTime)) output.write(2, zint64(index.receiverTime))
output.write(3, zint64(index.senderTime)) output.write(3, zint64(index.senderTime))
output.write(4, index.pubsubTopic)
return output return output
@ -121,6 +129,9 @@ proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getField(3, senderTime) discard ? pb.getField(3, senderTime)
index.senderTime = Timestamp(senderTime) index.senderTime = Timestamp(senderTime)
# read the pubsubTopic
discard ? pb.getField(4, index.pubsubTopic)
return ok(index) return ok(index)
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
@ -177,7 +188,6 @@ proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
discard ? pb.getField(6, endTime) discard ? pb.getField(6, endTime)
msg.endTime = Timestamp(endTime) msg.endTime = Timestamp(endTime)
return ok(msg) return ok(msg)
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
@ -344,7 +354,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) = proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated # TODO index should not be recalculated
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic)) discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic))
info "attempting to load messages from persistent storage" info "attempting to load messages from persistent storage"
@ -380,12 +390,13 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
# Handle WakuMessage according to store protocol # Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg trace "handle message in WakuStore", topic=topic, msg=msg
let index = msg.computeIndex() let index = msg.computeIndex(pubsubTopic = topic)
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr: if addRes.isErr:
trace "Attempt to add message with duplicate index to store", msg=msg, index=index trace "Attempt to add message with duplicate index to store", msg=msg, index=index
waku_store_errors.inc(labelValues = ["duplicate"]) waku_store_errors.inc(labelValues = ["duplicate"])
return # Do not attempt to store in persistent DB
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"]) waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
@ -557,7 +568,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
# exclude index from the comparison criteria # exclude index from the comparison criteria
for msg in msgList: for msg in msgList:
let index = msg.computeIndex() let index = msg.computeIndex(pubsubTopic = DefaultTopic)
# check for duplicate messages # check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if ws.messages.contains(index): if ws.messages.contains(index):

View File

@ -14,26 +14,48 @@ export hash
type type
Index* = object Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages ## This type contains the description of an Index used in the pagination of WakuMessages
digest*: MDigest[256] digest*: MDigest[256] # calculated over payload and content topic
receiverTime*: Timestamp receiverTime*: Timestamp
senderTime*: Timestamp # the time at which the message is generated senderTime*: Timestamp # the time at which the message is generated
pubsubTopic*: string
proc `==`*(x, y: Index): bool = proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index comparison ## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and (x.digest == y.digest) (x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
proc cmp*(x, y: Index): int = proc cmp*(x, y: Index): int =
## compares x and y ## compares x and y
## returns 0 if they are equal ## returns 0 if they are equal
## returns -1 if x < y ## returns -1 if x < y
## returns 1 if x > y ## returns 1 if x > y
## receiverTime plays no role in index comparison ##
## Default sorting order priority is:
## 1. senderTimestamp
## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal)
## 3. message digest
## 4. pubsubTopic
if x == y:
# Quick exit ensures receiver time does not affect index equality
return 0
# Timestamp has a higher priority for comparison # Timestamp has a higher priority for comparison
let timecmp = cmp(x.senderTime, y.senderTime) let
# Use receiverTime where senderTime is unset
xTimestamp = if x.senderTime == 0: x.receiverTime
else: x.senderTime
yTimestamp = if y.senderTime == 0: y.receiverTime
else: y.senderTime
let timecmp = cmp(xTimestamp, yTimestamp)
if timecmp != 0: if timecmp != 0:
return timecmp return timecmp
# Only when timestamps are equal # Continue only when timestamps are equal
let digestcm = cmp(x.digest.data, y.digest.data) let digestcmp = cmp(x.digest.data, y.digest.data)
return digestcm if digestcmp != 0:
return digestcmp
return cmp(x.pubsubTopic, y.pubsubTopic)