mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-12 07:44:57 +00:00
feat: reviews updated
This commit is contained in:
parent
1b0e0655b0
commit
ff8622f877
@ -23,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
|
||||
cursor = Index(
|
||||
receiverTime: Timestamp(i),
|
||||
senderTime: Timestamp(i),
|
||||
messageHash: MessageDigest(data: data),
|
||||
messageHash: MessageHash(data: data),
|
||||
pubsubTopic: "test-pubsub-topic"
|
||||
)
|
||||
|
||||
|
@ -25,7 +25,7 @@ proc getTestQueueDriver(numMessages: int): QueueDriver =
|
||||
index: Index(
|
||||
receiverTime: Timestamp(i),
|
||||
senderTime: Timestamp(i),
|
||||
messageHash: MessageDigest(data: data)
|
||||
messageHash: MessageHash(data: data)
|
||||
)
|
||||
)
|
||||
discard testQueueDriver.add(msg)
|
||||
|
@ -11,7 +11,7 @@ import
|
||||
../../waku/node/message_store/sqlite_store,
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_store
|
||||
../../waku/waku_store,
|
||||
./testlib/common,
|
||||
./testlib/switch
|
||||
|
||||
|
@ -53,19 +53,19 @@ proc testWakuNode(): WakuNode =
|
||||
################################################################################
|
||||
procSuite "Waku v2 Rest API - Store":
|
||||
|
||||
asyncTest "MessageDigest <-> string conversions":
|
||||
# Validate MessageDigest conversion from a WakuMessage obj
|
||||
asyncTest "MessageHash <-> string conversions":
|
||||
# Validate MessageHash conversion from a WakuMessage obj
|
||||
let wakuMsg = WakuMessage(
|
||||
contentTopic: "Test content topic",
|
||||
payload: @[byte('H'), byte('i'), byte('!')]
|
||||
)
|
||||
|
||||
let messageDigest = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic)
|
||||
let restMsgDigest = some(messageDigest.toRestStringMessageDigest())
|
||||
let messageHash = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic)
|
||||
let restMsgDigest = some(messageHash.toRestStringMessageDigest())
|
||||
let parsedMsgDigest = restMsgDigest.parseMsgDigest().value
|
||||
|
||||
check:
|
||||
messageDigest == parsedMsgDigest.get()
|
||||
messageHash == parsedMsgDigest.get()
|
||||
|
||||
# Random validation. Obtained the raw values manually
|
||||
let expected = some("ZjNhM2Q2NDkwMTE0MjMzNDg0MzJlMDdiZGI3NzIwYTc%3D")
|
||||
@ -198,7 +198,7 @@ procSuite "Waku v2 Rest API - Store":
|
||||
var reqPubsubTopic = DefaultPubsubTopic
|
||||
var reqSenderTime = Timestamp(0)
|
||||
var reqStoreTime = Timestamp(0)
|
||||
var reqDigest = waku_store.MessageDigest()
|
||||
var reqHash = waku_store.MessageHash()
|
||||
|
||||
for i in 0..<2:
|
||||
let response =
|
||||
@ -210,7 +210,7 @@ procSuite "Waku v2 Rest API - Store":
|
||||
"", # end time. Empty ignores the field.
|
||||
encodeUrl($reqSenderTime), # sender time
|
||||
encodeUrl($reqStoreTime), # store time
|
||||
reqDigest.toRestStringMessageDigest(), # base64-encoded digest. Empty ignores the field.
|
||||
reqHash.toRestStringMessageDigest(), # base64-encoded hash. Empty ignores the field.
|
||||
"7", # page size. Empty implies default page size.
|
||||
"true" # ascending
|
||||
)
|
||||
@ -224,7 +224,7 @@ procSuite "Waku v2 Rest API - Store":
|
||||
# populate the cursor for next page
|
||||
if response.data.cursor.isSome():
|
||||
reqPubsubTopic = response.data.cursor.get().pubsubTopic
|
||||
reqDigest = response.data.cursor.get().messageHash
|
||||
reqHash = response.data.cursor.get().messageHash
|
||||
reqSenderTime = response.data.cursor.get().senderTime
|
||||
reqStoreTime = response.data.cursor.get().storeTime
|
||||
|
||||
|
@ -60,7 +60,7 @@ proc getStoreMessagesV1*(
|
||||
# Optional cursor fields
|
||||
senderTime: string = "",
|
||||
storeTime: string = "",
|
||||
messageHash: string = "", # base64-encoded digest
|
||||
messageHash: string = "", # base64-encoded hash
|
||||
|
||||
pageSize: string = "",
|
||||
ascending: string = ""
|
||||
|
@ -89,23 +89,23 @@ proc parseCursor(parsedPubsubTopic: Option[string],
|
||||
if not parsedStoreTime.isOk():
|
||||
return err(parsedStoreTime.error)
|
||||
|
||||
# Parse message digest
|
||||
let parsedMsgDigest = parseMsgDigest(messageHash)
|
||||
if not parsedMsgDigest.isOk():
|
||||
return err(parsedMsgDigest.error)
|
||||
# Parse message hash
|
||||
let parsedMsgHash = parseMsgDigest(messageHash)
|
||||
if not parsedMsgHash.isOk():
|
||||
return err(parsedMsgHash.error)
|
||||
|
||||
# Parse cursor information
|
||||
if parsedPubsubTopic.isSome() and
|
||||
parsedSenderTime.value.isSome() and
|
||||
parsedStoreTime.value.isSome() and
|
||||
parsedMsgDigest.value.isSome():
|
||||
parsedMsgHash.value.isSome():
|
||||
|
||||
return ok(some(
|
||||
HistoryCursor(
|
||||
pubsubTopic: parsedPubsubTopic.get(),
|
||||
senderTime: parsedSenderTime.value.get(),
|
||||
storeTime: parsedStoreTime.value.get(),
|
||||
messageHash: parsedMsgDigest.value.get())
|
||||
messageHash: parsedMsgHash.value.get())
|
||||
))
|
||||
else:
|
||||
return ok(none(HistoryCursor))
|
||||
|
@ -24,7 +24,7 @@ type
|
||||
pubsubTopic*: PubsubTopic
|
||||
senderTime*: Timestamp
|
||||
storeTime*: Timestamp
|
||||
messageHash*: MessageDigest
|
||||
messageHash*: MessageHash
|
||||
|
||||
StoreRequestRest* = object
|
||||
# inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2//waku_store/common.nim#L52
|
||||
@ -53,37 +53,37 @@ type
|
||||
|
||||
#### Type conversion
|
||||
|
||||
# Converts a URL-encoded-base64 string into a 'MessageDigest'
|
||||
# Converts a URL-encoded-base64 string into a 'MessageHash'
|
||||
proc parseMsgDigest*(input: Option[string]):
|
||||
Result[Option[MessageDigest], string] =
|
||||
Result[Option[MessageHash], string] =
|
||||
|
||||
if not input.isSome() or input.get() == "":
|
||||
return ok(none(MessageDigest))
|
||||
return ok(none(MessageHash))
|
||||
|
||||
let decodedUrl = decodeUrl(input.get())
|
||||
let base64Decoded = base64.decode(Base64String(decodedUrl))
|
||||
var messageDigest = MessageDigest()
|
||||
var messageHash = MessageHash()
|
||||
|
||||
if not base64Decoded.isOk():
|
||||
return err(base64Decoded.error)
|
||||
|
||||
let base64DecodedArr = base64Decoded.get()
|
||||
# Next snippet inspired by "nwaku/waku/waku_archive/archive.nim"
|
||||
# TODO: Improve coherence of MessageDigest type
|
||||
messageDigest = block:
|
||||
# TODO: Improve coherence of MessageHash type
|
||||
messageHash = block:
|
||||
var data: array[32, byte]
|
||||
for i in 0..<min(base64DecodedArr.len, 32):
|
||||
data[i] = base64DecodedArr[i]
|
||||
|
||||
MessageDigest(data: data)
|
||||
MessageHash(data: data)
|
||||
|
||||
return ok(some(messageDigest))
|
||||
return ok(some(messageHash))
|
||||
|
||||
# Converts a given MessageDigest object into a suitable
|
||||
# Converts a given MessageHash object into a suitable
|
||||
# Base64-URL-encoded string suitable to be transmitted in a Rest
|
||||
# request-response. The MessageDigest is first base64 encoded
|
||||
# request-response. The MessageHash is first base64 encoded
|
||||
# and this result is URL-encoded.
|
||||
proc toRestStringMessageDigest*(self: MessageDigest): string =
|
||||
proc toRestStringMessageDigest*(self: MessageHash): string =
|
||||
let base64Encoded = base64.encode(self.data)
|
||||
encodeUrl($base64Encoded)
|
||||
|
||||
@ -203,17 +203,17 @@ proc readValue*(reader: var JsonReader[RestJson],
|
||||
|
||||
## End of StoreWakuMessage serde
|
||||
|
||||
## Beginning of MessageDigest serde
|
||||
## Beginning of MessageHash serde
|
||||
|
||||
proc writeValue*(writer: var JsonWriter[RestJson],
|
||||
value: MessageDigest)
|
||||
value: MessageHash)
|
||||
{.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField("data", base64.encode(value.data))
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(reader: var JsonReader[RestJson],
|
||||
value: var MessageDigest)
|
||||
value: var MessageHash)
|
||||
{.raises: [SerializationError, IOError].} =
|
||||
var
|
||||
data = none(seq[byte])
|
||||
@ -222,10 +222,10 @@ proc readValue*(reader: var JsonReader[RestJson],
|
||||
case fieldName
|
||||
of "data":
|
||||
if data.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `data` fields found", "MessageDigest")
|
||||
reader.raiseUnexpectedField("Multiple `data` fields found", "MessageHash")
|
||||
let decoded = base64.decode(reader.readValue(Base64String))
|
||||
if not decoded.isOk():
|
||||
reader.raiseUnexpectedField("Failed decoding data", "MessageDigest")
|
||||
reader.raiseUnexpectedField("Failed decoding data", "MessageHash")
|
||||
data = some(decoded.get())
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
@ -236,7 +236,7 @@ proc readValue*(reader: var JsonReader[RestJson],
|
||||
for i in 0..<32:
|
||||
value.data[i] = data.get()[i]
|
||||
|
||||
## End of MessageDigest serde
|
||||
## End of MessageHash serde
|
||||
|
||||
## Beginning of HistoryCursorRest serde
|
||||
|
||||
@ -257,7 +257,7 @@ proc readValue*(reader: var JsonReader[RestJson],
|
||||
pubsubTopic = none(PubsubTopic)
|
||||
senderTime = none(Timestamp)
|
||||
storeTime = none(Timestamp)
|
||||
messageHash = none(MessageDigest)
|
||||
messageHash = none(MessageHash)
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
@ -276,7 +276,7 @@ proc readValue*(reader: var JsonReader[RestJson],
|
||||
of "messageHash":
|
||||
if messageHash.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `messageHash` fields found", "HistoryCursorRest")
|
||||
messageHash = some(reader.readValue(MessageDigest))
|
||||
messageHash = some(reader.readValue(MessageHash))
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
|
@ -165,19 +165,19 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
|
||||
## (i.e. the second last message in the rows list)
|
||||
let (pubsubTopic, message, messageHash, storeTimestamp) = rows[^2]
|
||||
|
||||
# TODO: Improve coherence of MessageDigest type
|
||||
let messageDigest = block:
|
||||
# TODO: Improve coherence of MessageHash type
|
||||
let msgHash = block:
|
||||
var data: array[32, byte]
|
||||
for i in 0..<min(messageHash.len, 32):
|
||||
data[i] = messageHash[i]
|
||||
|
||||
MessageDigest(data: data)
|
||||
MessageHash(data: data)
|
||||
|
||||
cursor = some(ArchiveCursor(
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: storeTimestamp,
|
||||
messageHash: messageDigest
|
||||
messageHash: msgHash
|
||||
))
|
||||
|
||||
# All messages MUST be returned in chronological order
|
||||
|
@ -16,9 +16,9 @@ import
|
||||
# TODO: Move this into the driver implementations. We should be passing
|
||||
# here a buffer containing a serialized implementation dependent cursor.
|
||||
|
||||
type MessageDigest* = MDigest[256]
|
||||
type MessageHash* = MDigest[256]
|
||||
|
||||
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageDigest =
|
||||
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageHash =
|
||||
var ctx: sha256
|
||||
ctx.init()
|
||||
defer: ctx.clear()
|
||||
@ -38,7 +38,7 @@ type DbCursor = object
|
||||
pubsubTopic*: PubsubTopic
|
||||
senderTime*: Timestamp
|
||||
storeTime*: Timestamp
|
||||
messageHash*: MessageDigest
|
||||
messageHash*: MessageHash
|
||||
|
||||
|
||||
## Public API types
|
||||
|
@ -25,7 +25,7 @@ type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
|
||||
method put*(driver: ArchiveDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
digest: MessageDigest,
|
||||
digest: MessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.base, async.} = discard
|
||||
|
||||
|
@ -93,7 +93,7 @@ proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
||||
method put*(s: PostgresDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
messageHash: MessageDigest,
|
||||
messageHash: MessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
|
||||
|
@ -16,7 +16,7 @@ type Index* = object
|
||||
pubsubTopic*: string
|
||||
senderTime*: Timestamp # the time at which the message is generated
|
||||
receiverTime*: Timestamp
|
||||
messageHash*: MessageDigest # calculated over payload and content topic
|
||||
messageHash*: MessageHash # calculated over payload and content topic
|
||||
|
||||
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
|
||||
## Takes a WakuMessage with received timestamp and returns its Index.
|
||||
|
@ -227,7 +227,7 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi
|
||||
method put*(driver: QueueDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
messageHash: MessageDigest,
|
||||
messageHash: MessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, messageHash: messageHash)
|
||||
|
@ -19,7 +19,7 @@ const SchemaVersion* = 8 # increase this when there is an update in the database
|
||||
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."
|
||||
const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store"
|
||||
|
||||
proc isSchemaVersion8*(db: SqliteDatabase): DatabaseResult[bool] =
|
||||
proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] =
|
||||
## Temporary proc created to analyse when the table actually belongs to the SchemaVersion 7.
|
||||
##
|
||||
## During many nwaku versions, 0.14.0 until 0.18.0, the SchemaVersion wasn't set or checked.
|
||||
@ -63,12 +63,12 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult
|
||||
debug "starting message store's sqlite database migration"
|
||||
|
||||
let userVersion = ? db.getUserVersion()
|
||||
let isSchemaVersion8 = ? db.isSchemaVersion8()
|
||||
let isSchemaVersion7 = ? db.isSchemaVersion7()
|
||||
|
||||
if userVersion == 0'i64 and isSchemaVersion8:
|
||||
if userVersion == 0'i64 and isSchemaVersion7:
|
||||
info "We found user_version 0 but the database schema reflects the user_version 7"
|
||||
## Force the correct schema version
|
||||
? db.setUserVersion( 8 )
|
||||
? db.setUserVersion( 7 )
|
||||
|
||||
let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath)
|
||||
if migrationRes.isErr():
|
||||
|
@ -60,12 +60,12 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =
|
||||
method put*(s: SqliteDriver,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage,
|
||||
digest: MessageDigest,
|
||||
messageHash: MessageHash,
|
||||
receivedTime: Timestamp):
|
||||
Future[ArchiveDriverResult[void]] {.async.} =
|
||||
## Inserts a message into the store
|
||||
let res = s.insertStmt.exec((
|
||||
@(digest.data), # messageHash
|
||||
@(messageHash.data), # messageHash
|
||||
receivedTime, # storedAt
|
||||
toBytes(message.contentTopic), # contentTopic
|
||||
message.payload, # payload
|
||||
|
@ -23,11 +23,11 @@ const
|
||||
type WakuStoreResult*[T] = Result[T, string]
|
||||
|
||||
|
||||
## Waku message digest
|
||||
## Waku message hash
|
||||
|
||||
type MessageDigest* = MDigest[256]
|
||||
type MessageHash* = MDigest[256]
|
||||
|
||||
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageDigest =
|
||||
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageHash =
|
||||
var ctx: sha256
|
||||
ctx.init()
|
||||
defer: ctx.clear()
|
||||
@ -48,7 +48,7 @@ type
|
||||
pubsubTopic*: PubsubTopic
|
||||
senderTime*: Timestamp
|
||||
storeTime*: Timestamp
|
||||
messageHash*: MessageDigest
|
||||
messageHash*: MessageHash
|
||||
|
||||
HistoryQuery* = object
|
||||
pubsubTopic*: Option[PubsubTopic]
|
||||
|
@ -18,7 +18,7 @@ type PagingIndexRPC* = object
|
||||
pubsubTopic*: PubsubTopic
|
||||
senderTime*: Timestamp # the time at which the message is generated
|
||||
receiverTime*: Timestamp
|
||||
messageHash*: MessageDigest # calculated over payload and content topic
|
||||
messageHash*: MessageHash # calculated over payload and content topic
|
||||
|
||||
proc `==`*(x, y: PagingIndexRPC): bool =
|
||||
## receiverTime plays no role in index equality
|
||||
@ -29,14 +29,14 @@ proc `==`*(x, y: PagingIndexRPC): bool =
|
||||
proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
|
||||
## Takes a WakuMessage with received timestamp and returns its Index.
|
||||
let
|
||||
digest = computeDigest(msg, pubsubTopic)
|
||||
msgHash = computeDigest(msg, pubsubTopic)
|
||||
senderTime = msg.timestamp
|
||||
|
||||
PagingIndexRPC(
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: senderTime,
|
||||
receiverTime: receivedTime,
|
||||
messageHash: digest
|
||||
messageHash: msgHash
|
||||
)
|
||||
|
||||
|
||||
|
@ -40,7 +40,7 @@ proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtobufResult[T] =
|
||||
if not ?pb.getField(1, data):
|
||||
return err(ProtobufError.missingRequiredField("messageHash"))
|
||||
else:
|
||||
var messageHash = MessageDigest()
|
||||
var messageHash = MessageHash()
|
||||
for count, b in data:
|
||||
messageHash.data[count] = b
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user