mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-22 11:58:36 +00:00
chore: sending msg hash as string on libwaku message event (#3234)
This commit is contained in:
parent
aaf3c00706
commit
9c209b4c3b
@ -59,7 +59,7 @@ proc `%`*(value: Base64String): JsonNode =
|
||||
|
||||
type JsonMessageEvent* = ref object of JsonEvent
|
||||
pubsubTopic*: string
|
||||
messageHash*: WakuMessageHash
|
||||
messageHash*: string
|
||||
wakuMessage*: JsonMessage
|
||||
|
||||
proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T =
|
||||
@ -83,7 +83,7 @@ proc new*(T: type JsonMessageEvent, pubSubTopic: string, msg: WakuMessage): T =
|
||||
return JsonMessageEvent(
|
||||
eventType: "message",
|
||||
pubSubTopic: pubSubTopic,
|
||||
messageHash: msgHash,
|
||||
messageHash: msgHash.to0xHex(),
|
||||
wakuMessage: JsonMessage(
|
||||
payload: base64.encode(payload),
|
||||
contentTopic: msg.contentTopic,
|
||||
|
@ -1,5 +1,5 @@
|
||||
import std/[json, sugar, strutils, options]
|
||||
import chronos, chronicles, results
|
||||
import chronos, chronicles, results, stew/byteutils
|
||||
import
|
||||
../../../../../waku/factory/waku,
|
||||
../../../../alloc,
|
||||
@ -27,16 +27,12 @@ func fromJsonNode(
|
||||
for cTopic in jsonContent["content_topics"].getElems():
|
||||
cTopic.getStr()
|
||||
|
||||
let msgHashes = collect(newSeq):
|
||||
if jsonContent.contains("message_hashes"):
|
||||
for hashJsonObj in jsonContent["message_hashes"].getElems():
|
||||
var hash: WakuMessageHash
|
||||
var count: int = 0
|
||||
for byteValue in hashJsonObj.getElems():
|
||||
hash[count] = byteValue.getInt().byte
|
||||
count.inc()
|
||||
|
||||
hash
|
||||
var msgHashes: seq[WakuMessageHash]
|
||||
if jsonContent.contains("message_hashes"):
|
||||
for hashJsonObj in jsonContent["message_hashes"].getElems():
|
||||
let hash = hashJsonObj.getStr().hexToHash().valueOr:
|
||||
return err("Failed converting message hash hex string to bytes: " & error)
|
||||
msgHashes.add(hash)
|
||||
|
||||
let pubsubTopic =
|
||||
if jsonContent.contains("pubsub_topic"):
|
||||
@ -46,12 +42,9 @@ func fromJsonNode(
|
||||
|
||||
let paginationCursor =
|
||||
if jsonContent.contains("pagination_cursor"):
|
||||
var hash: WakuMessageHash
|
||||
var count: int = 0
|
||||
for byteValue in jsonContent["pagination_cursor"].getElems():
|
||||
hash[count] = byteValue.getInt().byte
|
||||
count.inc()
|
||||
|
||||
let hash = jsonContent["pagination_cursor"].getStr().hexToHash().valueOr:
|
||||
return
|
||||
err("Failed converting pagination_cursor hex string to bytes: " & error)
|
||||
some(hash)
|
||||
else:
|
||||
none(WakuMessageHash)
|
||||
@ -120,7 +113,8 @@ proc process_remote_query(
|
||||
let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr:
|
||||
return err("StoreRequest failed store query: " & $error)
|
||||
|
||||
return ok($(%*queryResponse)) ## returning the response in json format
|
||||
let res = $(%*(queryResponse.toHex()))
|
||||
return ok(res) ## returning the response in json format
|
||||
|
||||
proc process*(
|
||||
self: ptr StoreRequest, waku: ptr Waku
|
||||
|
@ -74,7 +74,8 @@ procSuite "Waku Rest API - Store v3":
|
||||
messageHash == parsedMsgHashRes.get().get()
|
||||
|
||||
# Random validation. Obtained the raw values manually
|
||||
let expected = some("f6za9OzG1xSiEZagZc2b3litRbkd3zRl61rezDd3pgQ%3D")
|
||||
let expected =
|
||||
some("0x9e0ea917677a3d2b8610b0126986d89824b6acf76008b5fb9aa8b99ac906c1a7")
|
||||
|
||||
let msgHashRes = parseHash(expected)
|
||||
assert msgHashRes.isOk(), $msgHashRes.error
|
||||
@ -147,7 +148,7 @@ procSuite "Waku Rest API - Store v3":
|
||||
"", # start time
|
||||
"", # end time
|
||||
"", # hashes
|
||||
encodedCursor, # base64-encoded hash
|
||||
encodedCursor, # hex-encoded hash
|
||||
"true", # ascending
|
||||
"5", # empty implies default page size
|
||||
)
|
||||
@ -217,7 +218,7 @@ procSuite "Waku Rest API - Store v3":
|
||||
"3", # start time
|
||||
"6", # end time
|
||||
"", # hashes
|
||||
"", # base64-encoded hash
|
||||
"", # hex-encoded hash
|
||||
"true", # ascending
|
||||
"", # empty implies default page size
|
||||
)
|
||||
@ -283,7 +284,7 @@ procSuite "Waku Rest API - Store v3":
|
||||
|
||||
var pages = newSeq[seq[WakuMessage]](2)
|
||||
|
||||
var reqHash = none(WakuMessageHash)
|
||||
var reqHash = none(string)
|
||||
|
||||
for i in 0 ..< 2:
|
||||
let response = await client.getStoreMessagesV3(
|
||||
@ -295,9 +296,9 @@ procSuite "Waku Rest API - Store v3":
|
||||
"", # end time. Empty ignores the field.
|
||||
"", # hashes
|
||||
if reqHash.isSome():
|
||||
reqHash.get().toRestStringWakuMessageHash()
|
||||
reqHash.get()
|
||||
else:
|
||||
"", # base64-encoded digest. Empty ignores the field.
|
||||
"", # hex-encoded digest. Empty ignores the field.
|
||||
"true", # ascending
|
||||
"7", # page size. Empty implies default page size.
|
||||
)
|
||||
@ -775,7 +776,7 @@ procSuite "Waku Rest API - Store v3":
|
||||
var pages = newSeq[seq[WakuMessage]](2)
|
||||
|
||||
var reqPubsubTopic = DefaultPubsubTopic
|
||||
var reqHash = none(WakuMessageHash)
|
||||
var reqHash = none(string)
|
||||
|
||||
for i in 0 ..< 2:
|
||||
let response = await client.getStoreMessagesV3(
|
||||
@ -787,9 +788,9 @@ procSuite "Waku Rest API - Store v3":
|
||||
"", # end time. Empty ignores the field.
|
||||
"", # hashes
|
||||
if reqHash.isSome():
|
||||
reqHash.get().toRestStringWakuMessageHash()
|
||||
reqHash.get()
|
||||
else:
|
||||
"", # base64-encoded digest. Empty ignores the field.
|
||||
"", # hex-encoded digest. Empty ignores the field.
|
||||
"true", # ascending
|
||||
"3", # page size. Empty implies default page size.
|
||||
)
|
||||
@ -823,9 +824,9 @@ procSuite "Waku Rest API - Store v3":
|
||||
"", # end time. Empty ignores the field.
|
||||
"", # hashes
|
||||
if reqHash.isSome():
|
||||
reqHash.get().toRestStringWakuMessageHash()
|
||||
reqHash.get()
|
||||
else:
|
||||
"", # base64-encoded digest. Empty ignores the field.
|
||||
"", # hex-encoded digest. Empty ignores the field.
|
||||
)
|
||||
|
||||
check:
|
||||
@ -845,9 +846,9 @@ procSuite "Waku Rest API - Store v3":
|
||||
"", # end time. Empty ignores the field.
|
||||
"", # hashes
|
||||
if reqHash.isSome():
|
||||
reqHash.get().toRestStringWakuMessageHash()
|
||||
reqHash.get()
|
||||
else:
|
||||
"", # base64-encoded digest. Empty ignores the field.
|
||||
"", # hex-encoded digest. Empty ignores the field.
|
||||
"true", # ascending
|
||||
"5", # page size. Empty implies default page size.
|
||||
)
|
||||
|
@ -15,12 +15,12 @@ logScope:
|
||||
topics = "waku node rest store_api"
|
||||
|
||||
proc decodeBytes*(
|
||||
t: typedesc[StoreQueryResponse],
|
||||
t: typedesc[StoreQueryResponseHex],
|
||||
data: openArray[byte],
|
||||
contentType: Opt[ContentTypeData],
|
||||
): RestResult[StoreQueryResponse] =
|
||||
): RestResult[StoreQueryResponseHex] =
|
||||
if MediaType.init($contentType) == MIMETYPE_JSON:
|
||||
let decoded = ?decodeFromJsonBytes(StoreQueryResponse, data)
|
||||
let decoded = ?decodeFromJsonBytes(StoreQueryResponseHex, data)
|
||||
return ok(decoded)
|
||||
|
||||
if MediaType.init($contentType) == MIMETYPE_TEXT:
|
||||
@ -30,11 +30,11 @@ proc decodeBytes*(
|
||||
copyMem(addr res[0], unsafeAddr data[0], len(data))
|
||||
|
||||
return ok(
|
||||
StoreQueryResponse(
|
||||
StoreQueryResponseHex(
|
||||
statusCode: uint32(ErrorCode.BAD_RESPONSE),
|
||||
statusDesc: res,
|
||||
messages: newSeq[WakuMessageKeyValue](0),
|
||||
paginationCursor: none(WakuMessageHash),
|
||||
messages: newSeq[WakuMessageKeyValueHex](0),
|
||||
paginationCursor: none(string),
|
||||
)
|
||||
)
|
||||
|
||||
@ -58,6 +58,6 @@ proc getStoreMessagesV3*(
|
||||
cursor: string = "", # base64-encoded hash
|
||||
ascending: string = "",
|
||||
pageSize: string = "",
|
||||
): RestResponse[StoreQueryResponse] {.
|
||||
): RestResponse[StoreQueryResponseHex] {.
|
||||
rest, endpoint: "/store/v3/messages", meth: HttpMethod.MethodGet
|
||||
.}
|
||||
|
@ -42,7 +42,7 @@ proc performStoreQuery(
|
||||
error msg, error = futRes.error
|
||||
return RestApiResponse.internalServerError(fmt("{msg} [{futRes.error}]"))
|
||||
|
||||
let res = futRes.get()
|
||||
let res = futRes.get().toHex()
|
||||
|
||||
if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS):
|
||||
debug "Request rate limit reached on peer ", storePeer
|
||||
@ -165,7 +165,7 @@ proc retrieveMsgsFromSelfNode(
|
||||
let storeResp = (await self.wakuStore.handleSelfStoreRequest(storeQuery)).valueOr:
|
||||
return RestApiResponse.internalServerError($error)
|
||||
|
||||
let resp = RestApiResponse.jsonResponse(storeResp, status = Http200).valueOr:
|
||||
let resp = RestApiResponse.jsonResponse(storeResp.toHex(), status = Http200).valueOr:
|
||||
const msg = "Error building the json respose"
|
||||
let e = $error
|
||||
error msg, error = e
|
||||
|
@ -18,19 +18,22 @@ Json.setWriter JsonWriter, PreferredOutput = string
|
||||
#### Type conversion
|
||||
|
||||
proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string] =
|
||||
let base64UrlEncoded =
|
||||
let hexUrlEncoded =
|
||||
if input.isSome():
|
||||
input.get()
|
||||
else:
|
||||
return ok(none(WakuMessageHash))
|
||||
|
||||
if base64UrlEncoded == "":
|
||||
if hexUrlEncoded == "":
|
||||
return ok(none(WakuMessageHash))
|
||||
|
||||
let base64Encoded = decodeUrl(base64UrlEncoded, false)
|
||||
let hexDecoded = decodeUrl(hexUrlEncoded, false)
|
||||
|
||||
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
|
||||
return err("waku message hash parsing error: " & error)
|
||||
var decodedBytes: seq[byte]
|
||||
try:
|
||||
decodedBytes = hexToSeqByte(hexDecoded)
|
||||
except ValueError as e:
|
||||
return err("Exception converting hex string to bytes: " & e.msg)
|
||||
|
||||
if decodedBytes.len != 32:
|
||||
return
|
||||
@ -58,12 +61,12 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] =
|
||||
return ok(hashes)
|
||||
|
||||
# Converts a given MessageDigest object into a suitable
|
||||
# Base64-URL-encoded string suitable to be transmitted in a Rest
|
||||
# request-response. The MessageDigest is first base64 encoded
|
||||
# Hex-URL-encoded string suitable to be transmitted in a Rest
|
||||
# request-response. The MessageDigest is first hex encoded
|
||||
# and this result is URL-encoded.
|
||||
proc toRestStringWakuMessageHash*(self: WakuMessageHash): string =
|
||||
let base64Encoded = base64.encode(self)
|
||||
encodeUrl($base64Encoded, false)
|
||||
let hexEncoded = self.to0xHex()
|
||||
encodeUrl(hexEncoded, false)
|
||||
|
||||
## WakuMessage serde
|
||||
|
||||
@ -147,14 +150,14 @@ proc readValue*(
|
||||
proof: proof,
|
||||
)
|
||||
|
||||
## WakuMessageKeyValue serde
|
||||
## WakuMessageKeyValueHex serde
|
||||
|
||||
proc writeValue*(
|
||||
writer: var JsonWriter, value: WakuMessageKeyValue
|
||||
writer: var JsonWriter, value: WakuMessageKeyValueHex
|
||||
) {.gcsafe, raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
|
||||
writer.writeField("messageHash", base64.encode(value.messageHash))
|
||||
writer.writeField("messageHash", value.messageHash)
|
||||
|
||||
if value.message.isSome():
|
||||
writer.writeField("message", value.message.get())
|
||||
@ -165,10 +168,10 @@ proc writeValue*(
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(
|
||||
reader: var JsonReader, value: var WakuMessageKeyValue
|
||||
reader: var JsonReader, value: var WakuMessageKeyValueHex
|
||||
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||
var
|
||||
messageHash = none(WakuMessageHash)
|
||||
messageHash = none(string)
|
||||
message = none(WakuMessage)
|
||||
pubsubTopic = none(PubsubTopic)
|
||||
|
||||
@ -177,22 +180,19 @@ proc readValue*(
|
||||
of "messageHash":
|
||||
if messageHash.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `messageHash` fields found", "WakuMessageKeyValue"
|
||||
"Multiple `messageHash` fields found", "WakuMessageKeyValueHex"
|
||||
)
|
||||
let base64String = reader.readValue(Base64String)
|
||||
let bytes = base64.decode(base64String).valueOr:
|
||||
reader.raiseUnexpectedField("Failed decoding data", "messageHash")
|
||||
messageHash = some(fromBytes(bytes))
|
||||
messageHash = some(reader.readValue(string))
|
||||
of "message":
|
||||
if message.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `message` fields found", "WakuMessageKeyValue"
|
||||
"Multiple `message` fields found", "WakuMessageKeyValueHex"
|
||||
)
|
||||
message = some(reader.readValue(WakuMessage))
|
||||
of "pubsubTopic":
|
||||
if pubsubTopic.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `pubsubTopic` fields found", "WakuMessageKeyValue"
|
||||
"Multiple `pubsubTopic` fields found", "WakuMessageKeyValueHex"
|
||||
)
|
||||
pubsubTopic = some(reader.readValue(string))
|
||||
else:
|
||||
@ -201,14 +201,14 @@ proc readValue*(
|
||||
if messageHash.isNone():
|
||||
reader.raiseUnexpectedValue("Field `messageHash` is missing")
|
||||
|
||||
value = WakuMessageKeyValue(
|
||||
value = WakuMessageKeyValueHex(
|
||||
messageHash: messageHash.get(), message: message, pubsubTopic: pubsubTopic
|
||||
)
|
||||
|
||||
## StoreQueryResponse serde
|
||||
|
||||
proc writeValue*(
|
||||
writer: var JsonWriter, value: StoreQueryResponse
|
||||
writer: var JsonWriter, value: StoreQueryResponseHex
|
||||
) {.gcsafe, raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
|
||||
@ -218,55 +218,52 @@ proc writeValue*(
|
||||
writer.writeField("messages", value.messages)
|
||||
|
||||
if value.paginationCursor.isSome():
|
||||
writer.writeField("paginationCursor", base64.encode(value.paginationCursor.get()))
|
||||
writer.writeField("paginationCursor", value.paginationCursor.get())
|
||||
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(
|
||||
reader: var JsonReader, value: var StoreQueryResponse
|
||||
reader: var JsonReader, value: var StoreQueryResponseHex
|
||||
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||
var
|
||||
requestId = none(string)
|
||||
code = none(uint32)
|
||||
desc = none(string)
|
||||
messages = none(seq[WakuMessageKeyValue])
|
||||
cursor = none(WakuMessageHash)
|
||||
messages = none(seq[WakuMessageKeyValueHex])
|
||||
cursor = none(string)
|
||||
|
||||
for fieldName in readObjectFields(reader):
|
||||
case fieldName
|
||||
of "requestId":
|
||||
if requestId.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `requestId` fields found", "StoreQueryResponse"
|
||||
"Multiple `requestId` fields found", "StoreQueryResponseHex"
|
||||
)
|
||||
requestId = some(reader.readValue(string))
|
||||
of "statusCode":
|
||||
if code.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `statusCode` fields found", "StoreQueryResponse"
|
||||
"Multiple `statusCode` fields found", "StoreQueryResponseHex"
|
||||
)
|
||||
code = some(reader.readValue(uint32))
|
||||
of "statusDesc":
|
||||
if desc.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `statusDesc` fields found", "StoreQueryResponse"
|
||||
"Multiple `statusDesc` fields found", "StoreQueryResponseHex"
|
||||
)
|
||||
desc = some(reader.readValue(string))
|
||||
of "messages":
|
||||
if messages.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `messages` fields found", "StoreQueryResponse"
|
||||
"Multiple `messages` fields found", "StoreQueryResponseHex"
|
||||
)
|
||||
messages = some(reader.readValue(seq[WakuMessageKeyValue]))
|
||||
messages = some(reader.readValue(seq[WakuMessageKeyValueHex]))
|
||||
of "paginationCursor":
|
||||
if cursor.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `paginationCursor` fields found", "StoreQueryResponse"
|
||||
"Multiple `paginationCursor` fields found", "StoreQueryResponseHex"
|
||||
)
|
||||
let base64String = reader.readValue(Base64String)
|
||||
let bytes = base64.decode(base64String).valueOr:
|
||||
reader.raiseUnexpectedField("Failed decoding data", "paginationCursor")
|
||||
cursor = some(fromBytes(bytes))
|
||||
cursor = some(reader.readValue(string))
|
||||
else:
|
||||
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
|
||||
|
||||
@ -282,7 +279,7 @@ proc readValue*(
|
||||
if messages.isNone():
|
||||
reader.raiseUnexpectedValue("Field `messages` is missing")
|
||||
|
||||
value = StoreQueryResponse(
|
||||
value = StoreQueryResponseHex(
|
||||
requestId: requestId.get(),
|
||||
statusCode: code.get(),
|
||||
statusDesc: desc.get(),
|
||||
|
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/sequtils, stew/[byteutils, endians2, arrayops], nimcrypto/sha2
|
||||
import std/sequtils, stew/[byteutils, endians2, arrayops], nimcrypto/sha2, results
|
||||
import ../topics, ./message
|
||||
|
||||
## 14/WAKU2-MESSAGE: Deterministic message hashing
|
||||
@ -35,6 +35,16 @@ converter toBytesArray*(digest: MDigest[256]): WakuMessageHash =
|
||||
converter toBytes*(digest: MDigest[256]): seq[byte] =
|
||||
toSeq(digest.data)
|
||||
|
||||
proc hexToHash*(hexString: string): Result[WakuMessageHash, string] =
|
||||
var hash: WakuMessageHash
|
||||
|
||||
try:
|
||||
hash = hexString.hexToSeqByte().fromBytes()
|
||||
except ValueError as e:
|
||||
return err("Exception converting hex string to hash: " & e.msg)
|
||||
|
||||
return ok(hash)
|
||||
|
||||
proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageHash =
|
||||
var ctx: sha256
|
||||
ctx.init()
|
||||
|
@ -1,6 +1,6 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options], results
|
||||
import std/[options, sequtils], results, stew/byteutils
|
||||
import ../waku_core, ../common/paging
|
||||
|
||||
from ../waku_core/codecs import WakuStoreCodec
|
||||
@ -48,6 +48,22 @@ type
|
||||
|
||||
paginationCursor*: Option[WakuMessageHash]
|
||||
|
||||
# Types to be used by clients that use the hash in hex
|
||||
WakuMessageKeyValueHex* = object
|
||||
messageHash*: string
|
||||
message*: Option[WakuMessage]
|
||||
pubsubTopic*: Option[PubsubTopic]
|
||||
|
||||
StoreQueryResponseHex* = object
|
||||
requestId*: string
|
||||
|
||||
statusCode*: uint32
|
||||
statusDesc*: string
|
||||
|
||||
messages*: seq[WakuMessageKeyValueHex]
|
||||
|
||||
paginationCursor*: Option[string]
|
||||
|
||||
StatusCode* {.pure.} = enum
|
||||
UNKNOWN = uint32(000)
|
||||
SUCCESS = uint32(200)
|
||||
@ -117,3 +133,24 @@ proc `$`*(err: StoreError): string =
|
||||
"SERVICE_UNAVAILABLE"
|
||||
of ErrorCode.UNKNOWN:
|
||||
"UNKNOWN"
|
||||
|
||||
proc toHex*(messageData: WakuMessageKeyValue): WakuMessageKeyValueHex =
|
||||
WakuMessageKeyValueHex(
|
||||
messageHash: messageData.messageHash.to0xHex(),
|
||||
# Assuming WakuMessageHash has a toHex method
|
||||
message: messageData.message,
|
||||
pubsubTopic: messageData.pubsubTopic,
|
||||
)
|
||||
|
||||
proc toHex*(response: StoreQueryResponse): StoreQueryResponseHex =
|
||||
StoreQueryResponseHex(
|
||||
requestId: response.requestId,
|
||||
statusCode: response.statusCode,
|
||||
statusDesc: response.statusDesc,
|
||||
messages: response.messages.mapIt(it.toHex()), # Convert each message to hex
|
||||
paginationCursor:
|
||||
if response.paginationCursor.isSome:
|
||||
some(response.paginationCursor.get().to0xHex())
|
||||
else:
|
||||
none[string](),
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user