From b3ab9ed474e85b44dcd25c84c996cb06b2508693 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 20 Feb 2024 16:00:03 -0400 Subject: [PATCH] fix(bindings): base64 payload and key for content topic (#2435) * fix(bindings): base64 payload and key for content topic * fix(bindings): store userData for event callback * fix(bindings): json message serialization * fix(bindings): add messageHash to the event callback * fix(bindings): add meta field * refactor(bindings): simplify error handling * fix: handle undefined keys --- library/events/json_error_event.nim | 18 ---- library/events/json_message_event.nim | 86 ++++++++++++++++--- library/libwaku.nim | 57 ++++++------ library/waku_thread/config.nim | 79 +++++++++-------- .../requests/node_lifecycle_request.nim | 14 +-- library/waku_thread/waku_thread.nim | 3 +- 6 files changed, 149 insertions(+), 108 deletions(-) delete mode 100644 library/events/json_error_event.nim diff --git a/library/events/json_error_event.nim b/library/events/json_error_event.nim deleted file mode 100644 index 639a5b670..000000000 --- a/library/events/json_error_event.nim +++ /dev/null @@ -1,18 +0,0 @@ - -import - std/json -import - ./json_base_event - -type JsonErrorEvent* = ref object of JsonEvent - message*: string - -proc new*(T: type JsonErrorEvent, - message: string): T = - - return JsonErrorEvent( - eventType: "error", - message: message) - -method `$`*(jsonError: JsonErrorEvent): string {.raises: [].}= - $( %* jsonError ) \ No newline at end of file diff --git a/library/events/json_message_event.nim b/library/events/json_message_event.nim index 826291df0..459a44213 100644 --- a/library/events/json_message_event.nim +++ b/library/events/json_message_event.nim @@ -1,16 +1,63 @@ import - std/json + system, + std/[json,sequtils] import + stew/[byteutils,results] +import + ../../waku/common/base64, + ../../waku/waku_core/message, ../../waku/waku_core/message/message, ./json_base_event -type JsonMessage = ref object - # https://rfc.vac.dev/spec/36/#jsonmessage-type - payload: string - contentTopic: string - version: uint - timestamp: int64 +type + JsonMessage* = ref object + # https://rfc.vac.dev/spec/36/#jsonmessage-type + payload*: Base64String + contentTopic*: string + version*: uint + timestamp*: int64 + ephemeral*: bool + meta*: Base64String + proof*: Base64String + +func fromJsonNode*(T: type JsonMessage, jsonContent: JsonNode): JsonMessage = + # Visit https://rfc.vac.dev/spec/14/ for further details + JsonMessage( + payload: Base64String(jsonContent["payload"].getStr()), + contentTopic: jsonContent["contentTopic"].getStr(), + version: uint32(jsonContent{"version"}.getInt()), + timestamp: int64(jsonContent{"timestamp"}.getBiggestInt()), + ephemeral: jsonContent{"ephemeral"}.getBool(), + meta: Base64String(jsonContent{"meta"}.getStr()), + proof: Base64String(jsonContent{"proof"}.getStr()) + ) + +proc toWakuMessage*(self: JsonMessage): Result[WakuMessage, string] = + let payload = base64.decode(self.payload).valueOr: + return err("invalid payload format: " & error) + + let meta = base64.decode(self.meta).valueOr: + return err("invalid meta format: " & error) + + let proof = base64.decode(self.proof).valueOr: + return err("invalid proof format: " & error) + + ok(WakuMessage( + payload: payload, + meta: meta, + contentTopic: self.contentTopic, + version: uint32(self.version), + timestamp: self.timestamp, + ephemeral: self.ephemeral, + proof: proof, + )) + +proc `%`*(value: Base64String): JsonNode = + %(value.string) + +proc `%`*(value: WakuMessageHash): JsonNode = + %(to0xHex(value)) type JsonMessageEvent* = ref object of JsonEvent pubsubTopic*: string @@ -23,18 +70,33 @@ proc new*(T: type JsonMessageEvent, # Returns a WakuMessage event as indicated in # https://rfc.vac.dev/spec/36/#jsonmessageevent-type - var payload = newString(len(msg.payload)) - copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload)) + var payload = newSeq[byte](len(msg.payload)) + if len(msg.payload) != 0: + copyMem(addr payload[0], unsafeAddr msg.payload[0], len(msg.payload)) + + var meta = newSeq[byte](len(msg.meta)) + if len(msg.meta) != 0: + copyMem(addr meta[0], unsafeAddr msg.meta[0], len(msg.meta)) + + var proof = newSeq[byte](len(msg.proof)) + if len(msg.proof) != 0: + copyMem(addr proof[0], unsafeAddr msg.proof[0], len(msg.proof)) + + let msgHash = computeMessageHash(pubSubTopic, msg) + let msgHashHex = to0xHex(msgHash) return JsonMessageEvent( eventType: "message", pubSubTopic: pubSubTopic, - messageId: "TODO", + messageId: msgHashHex, wakuMessage: JsonMessage( - payload: payload, + payload: base64.encode(payload), contentTopic: msg.contentTopic, version: msg.version, - timestamp: int64(msg.timestamp) + timestamp: int64(msg.timestamp), + ephemeral: msg.ephemeral, + meta: base64.encode(meta), + proof: base64.encode(proof), ) ) diff --git a/library/libwaku.nim b/library/libwaku.nim index 976948bff..3ef27f0d3 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -9,10 +9,12 @@ import chronicles, chronos import + ../../waku/common/base64, ../../waku/waku_core/message/message, ../../waku/node/waku_node, ../../waku/waku_core/topics/pubsub_topic, ../../../waku/waku_relay/protocol, + ./events/json_base_event, ./events/json_message_event, ./waku_thread/waku_thread, ./waku_thread/inter_thread_communication/requests/node_lifecycle_request, @@ -43,16 +45,22 @@ const RET_MISSING_CALLBACK: cint = 2 proc relayEventCallback(ctx: ptr Context): WakuRelayHandler = return proc (pubsubTopic: PubsubTopic, msg: WakuMessage): Future[system.void]{.async.} = # Callback that hadles the Waku Relay events. i.e. messages or errors. - if not isNil(ctx[].eventCallback): - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), nil) - except Exception,CatchableError: - let msg = "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() - cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), nil) - else: + if isNil(ctx[].eventCallback): error "eventCallback is nil" + return + + if isNil(ctx[].eventUserData): + error "eventUserData is nil" + return + + try: + let event = $JsonMessageEvent.new(pubsubTopic, msg) + cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData) + except Exception,CatchableError: + let msg = "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData) + ### End of not-exported components ################################################################################ @@ -106,8 +114,10 @@ proc waku_version(ctx: ptr Context, return RET_OK proc waku_set_event_callback(ctx: ptr Context, - callback: WakuCallBack) {.dynlib, exportc.} = + callback: WakuCallBack, + userData: pointer) {.dynlib, exportc.} = ctx[].eventCallback = cast[pointer](callback) + ctx[].eventUserData = userData proc waku_content_topic(ctx: ptr Context, appName: cstring, @@ -186,33 +196,20 @@ proc waku_relay_publish(ctx: ptr Context, return RET_MISSING_CALLBACK let jwm = jsonWakuMessage.alloc() - var jsonContent:JsonNode + var jsonMessage:JsonMessage try: - jsonContent = parseJson($jwm) + let jsonContent = parseJson($jwm) + jsonMessage = JsonMessage.fromJsonNode(jsonContent) except JsonParsingError: deallocShared(jwm) let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR + finally: + deallocShared(jwm) - deallocShared(jwm) - - var wakuMessage: WakuMessage - try: - var version = 0'u32 - if jsonContent.hasKey("version"): - version = (uint32) jsonContent["version"].getInt() - - wakuMessage = WakuMessage( - # Visit https://rfc.vac.dev/spec/14/ for further details - payload: jsonContent["payload"].getStr().toSeq().mapIt(byte (it)), - contentTopic: $jsonContent["content_topic"].getStr(), - version: version, - timestamp: getTime().toUnix(), - ephemeral: false - ) - except KeyError: - let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}" + let wakuMessage = jsonMessage.toWakuMessage().valueOr: + let msg = fmt"Problem building the WakuMessage: {error}" callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR diff --git a/library/waku_thread/config.nim b/library/waku_thread/config.nim index b0565ef5f..ad182aeaa 100644 --- a/library/waku_thread/config.nim +++ b/library/waku_thread/config.nim @@ -10,18 +10,18 @@ import ../../waku/common/utils/nat, ../../waku/node/waku_node, ../../waku/node/config, - ../events/[json_error_event,json_base_event] + ../events/json_base_event proc parsePrivateKey(jsonNode: JsonNode, privateKey: var PrivateKey, - jsonResp: var JsonEvent): bool = + errorResp: var string): bool = - if not jsonNode.contains("key"): + if not jsonNode.contains("key") or jsonNode["key"].kind == JsonNodeKind.JNull: privateKey = PrivateKey.random(Secp256k1, newRng()[]).tryGet() return true if jsonNode["key"].kind != JsonNodeKind.JString: - jsonResp = JsonErrorEvent.new("The node key should be a string."); + errorResp = "The node key should be a string." return false let key = jsonNode["key"].getStr() @@ -31,21 +31,21 @@ proc parsePrivateKey(jsonNode: JsonNode, privateKey = crypto.PrivateKey(scheme: Secp256k1, skkey: skPrivKey) except CatchableError: let msg = "Invalid node key: " & getCurrentExceptionMsg() - jsonResp = JsonErrorEvent.new(msg) + errorResp = msg return false return true proc parseListenAddr(jsonNode: JsonNode, listenAddr: var IpAddress, - jsonResp: var JsonEvent): bool = + errorResp: var string): bool = if not jsonNode.contains("host"): - jsonResp = JsonErrorEvent.new("host attribute is required") + errorResp = "host attribute is required" return false if jsonNode["host"].kind != JsonNodeKind.JString: - jsonResp = JsonErrorEvent.new("The node host should be a string."); + errorResp = "The node host should be a string." return false let host = jsonNode["host"].getStr() @@ -54,21 +54,21 @@ proc parseListenAddr(jsonNode: JsonNode, listenAddr = parseIpAddress(host) except CatchableError: let msg = "Invalid host IP address: " & getCurrentExceptionMsg() - jsonResp = JsonErrorEvent.new(msg) + errorResp = msg return false return true proc parsePort(jsonNode: JsonNode, port: var int, - jsonResp: var JsonEvent): bool = + errorResp: var string): bool = if not jsonNode.contains("port"): - jsonResp = JsonErrorEvent.new("port attribute is required") + errorResp = "port attribute is required" return false if jsonNode["port"].kind != JsonNodeKind.JInt: - jsonResp = JsonErrorEvent.new("The node port should be an integer."); + errorResp = "The node port should be an integer." return false port = jsonNode["port"].getInt() @@ -77,14 +77,14 @@ proc parsePort(jsonNode: JsonNode, proc parseRelay(jsonNode: JsonNode, relay: var bool, - jsonResp: var JsonEvent): bool = + errorResp: var string): bool = if not jsonNode.contains("relay"): - jsonResp = JsonErrorEvent.new("relay attribute is required") + errorResp = "relay attribute is required" return false if jsonNode["relay"].kind != JsonNodeKind.JBool: - jsonResp = JsonErrorEvent.new("The relay config param should be a boolean"); + errorResp = "The relay config param should be a boolean" return false relay = jsonNode["relay"].getBool() @@ -99,7 +99,7 @@ proc parseStore(jsonNode: JsonNode, storeVacuum: var bool, storeDbMigration: var bool, storeMaxNumDbConnections: var int, - jsonResp: var JsonEvent): bool = + errorResp: var string): bool = if not jsonNode.contains("store"): ## the store parameter is not required. By default is is disabled @@ -107,49 +107,49 @@ proc parseStore(jsonNode: JsonNode, return true if jsonNode["store"].kind != JsonNodeKind.JBool: - jsonResp = JsonErrorEvent.new("The store config param should be a boolean"); + errorResp = "The store config param should be a boolean" return false store = jsonNode["store"].getBool() if jsonNode.contains("storeNode"): if jsonNode["storeNode"].kind != JsonNodeKind.JString: - jsonResp = JsonErrorEvent.new("The storeNode config param should be a string"); + errorResp = "The storeNode config param should be a string" return false storeNode = jsonNode["storeNode"].getStr() if jsonNode.contains("storeRetentionPolicy"): if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString: - jsonResp = JsonErrorEvent.new("The storeRetentionPolicy config param should be a string"); + errorResp = "The storeRetentionPolicy config param should be a string" return false storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr() if jsonNode.contains("storeDbUrl"): if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString: - jsonResp = JsonErrorEvent.new("The storeDbUrl config param should be a string"); + errorResp = "The storeDbUrl config param should be a string" return false storeDbUrl = jsonNode["storeDbUrl"].getStr() if jsonNode.contains("storeVacuum"): if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool: - jsonResp = JsonErrorEvent.new("The storeVacuum config param should be a bool"); + errorResp = "The storeVacuum config param should be a bool" return false storeVacuum = jsonNode["storeVacuum"].getBool() if jsonNode.contains("storeDbMigration"): if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool: - jsonResp = JsonErrorEvent.new("The storeDbMigration config param should be a bool"); + errorResp = "The storeDbMigration config param should be a bool" return false storeDbMigration = jsonNode["storeDbMigration"].getBool() if jsonNode.contains("storeMaxNumDbConnections"): if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt: - jsonResp = JsonErrorEvent.new("The storeMaxNumDbConnections config param should be an int"); + errorResp = "The storeMaxNumDbConnections config param should be an int" return false storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt() @@ -175,51 +175,51 @@ proc parseConfig*(configNodeJson: string, storeVacuum: var bool, storeDbMigration: var bool, storeMaxNumDbConnections: var int, - jsonResp: var JsonEvent): bool {.raises: [].} = + errorResp: var string): bool {.raises: [].} = if configNodeJson.len == 0: - jsonResp = JsonErrorEvent.new("The configNodeJson is empty") + errorResp = "The configNodeJson is empty" return false var jsonNode: JsonNode try: jsonNode = parseJson(configNodeJson) except Exception, IOError, JsonParsingError: - jsonResp = JsonErrorEvent.new("Exception: " & getCurrentExceptionMsg()) + errorResp = "Exception: " & getCurrentExceptionMsg() return false # key try: - if not parsePrivateKey(jsonNode, privateKey, jsonResp): + if not parsePrivateKey(jsonNode, privateKey, errorResp): return false except Exception, KeyError: - jsonResp = JsonErrorEvent.new("Exception calling parsePrivateKey: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parsePrivateKey: " & getCurrentExceptionMsg() return false # listenAddr var listenAddr: IpAddress try: listenAddr = parseIpAddress("127.0.0.1") - if not parseListenAddr(jsonNode, listenAddr, jsonResp): + if not parseListenAddr(jsonNode, listenAddr, errorResp): return false except Exception, ValueError: - jsonResp = JsonErrorEvent.new("Exception calling parseIpAddress: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parseIpAddress: " & getCurrentExceptionMsg() return false # port var port = 0 try: - if not parsePort(jsonNode, port, jsonResp): + if not parsePort(jsonNode, port, errorResp): return false except Exception, ValueError: - jsonResp = JsonErrorEvent.new("Exception calling parsePort: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parsePort: " & getCurrentExceptionMsg() return false let natRes = setupNat("any", clientId, Port(uint16(port)), Port(uint16(port))) if natRes.isErr(): - jsonResp = JsonErrorEvent.new("failed to setup NAT: " & $natRes.error) + errorResp = "failed to setup NAT: " & $natRes.error return false let (extIp, extTcpPort, _) = natRes.get() @@ -231,26 +231,26 @@ proc parseConfig*(configNodeJson: string, # relay try: - if not parseRelay(jsonNode, relay, jsonResp): + if not parseRelay(jsonNode, relay, errorResp): return false except Exception, KeyError: - jsonResp = JsonErrorEvent.new("Exception calling parseRelay: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parseRelay: " & getCurrentExceptionMsg() return false # topics try: parseTopics(jsonNode, topics) except Exception, KeyError: - jsonResp = JsonErrorEvent.new("Exception calling parseTopics: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parseTopics: " & getCurrentExceptionMsg() return false # store try: if not parseStore(jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl, - storeVacuum, storeDbMigration, storeMaxNumDbConnections, jsonResp): + storeVacuum, storeDbMigration, storeMaxNumDbConnections, errorResp): return false except Exception, KeyError: - jsonResp = JsonErrorEvent.new("Exception calling parseStore: " & getCurrentExceptionMsg()) + errorResp = "Exception calling parseStore: " & getCurrentExceptionMsg() return false let wakuFlags = CapabilitiesBitfield.init( @@ -268,8 +268,7 @@ proc parseConfig*(configNodeJson: string, wakuFlags = some(wakuFlags)) if netConfigRes.isErr(): - let msg = "Error creating NetConfig: " & $netConfigRes.error - jsonResp = JsonErrorEvent.new(msg) + errorResp = "Error creating NetConfig: " & $netConfigRes.error return false netConfig = netConfigRes.value diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 3fa1dd92f..d6d3750da 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -24,7 +24,7 @@ import ../../../../waku/waku_archive/retention_policy, ../../../../waku/waku_relay/protocol, ../../../../waku/waku_store, - ../../../events/[json_error_event,json_message_event,json_base_event], + ../../../events/[json_message_event,json_base_event], ../../../alloc, ../../config @@ -122,7 +122,7 @@ proc createNode(configJson: cstring): var storeDbMigration: bool var storeMaxNumDbConnections: int - var jsonResp: JsonEvent + var errorResp: string try: if not parseConfig($configJson, @@ -137,8 +137,8 @@ proc createNode(configJson: cstring): storeVacuum, storeDbMigration, storeMaxNumDbConnections, - jsonResp): - return err($jsonResp) + errorResp): + return err(errorResp) except Exception: return err("exception calling parseConfig: " & getCurrentExceptionMsg()) @@ -158,13 +158,13 @@ proc createNode(configJson: cstring): let addShardedTopics = enrBuilder.withShardedTopics(topics) if addShardedTopics.isErr(): let msg = "Error setting shared topics: " & $addShardedTopics.error - return err($JsonErrorEvent.new(msg)) + return err(msg) let recordRes = enrBuilder.build() let record = if recordRes.isErr(): let msg = "Error building enr record: " & $recordRes.error - return err($JsonErrorEvent.new(msg)) + return err(msg) else: recordRes.get() @@ -183,7 +183,7 @@ proc createNode(configJson: cstring): let wakuNodeRes = builder.build() if wakuNodeRes.isErr(): let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error - return err($JsonErrorEvent.new(errorMsg)) + return err(errorMsg) var newNode = wakuNodeRes.get() diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index d69c7099e..14680b572 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -14,7 +14,7 @@ import stew/shims/net import ../../../waku/node/waku_node, - ../events/[json_error_event,json_message_event,json_base_event], + ../events/[json_message_event,json_base_event], ./inter_thread_communication/waku_thread_request, ./inter_thread_communication/waku_thread_response @@ -27,6 +27,7 @@ type respSignal: ThreadSignalPtr userData*: pointer eventCallback*: pointer + eventUserdata*: pointer # To control when the thread is running var running: Atomic[bool]