diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 142c9cc..fe15748 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -7,13 +7,71 @@ import ../../provider import ./rpccalls import ./conversions import ./looping +import ./json type JsonRpcSubscriptions* = ref object of RootObj client: RpcClient callbacks: Table[JsonNode, SubscriptionCallback] + methodHandlers: Table[string, MethodHandler] + MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].} SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].} +{.push raises:[].} + +template `or`(a: JsonNode, b: typed): JsonNode = + if a.isNil: b else: a + +func init*(subscriptions: JsonRpcSubscriptions) = + subscriptions.client.onProcessMessage = + proc(client: RpcClient, + line: string): Result[bool, string] {.gcsafe, raises: [].} = + # if json =? JrpcConv.decode(line, JsonNode).catch: + if json =? JsonNode.fromJson(line): + if "method" in json: + let methodName = json{"method"}.getStr() + if methodName in subscriptions.methodHandlers: + let handler = subscriptions.methodHandlers.getOrDefault(methodName) + if not handler.isNil: + echo "[subscriptions] processing method handler with params ", json{"params"} + handler(json{"params"} or newJArray()) + return ok false # false = do not continue processing message using json_rpc's default processing handler + + return ok true # true = continue processing message using json_rpc's default message handler + + +proc setMethodHandler( + subscriptions: JsonRpcSubscriptions, + `method`: string, + handler: MethodHandler +) = + subscriptions.methodHandlers[`method`] = handler + +func hash*(n: OrderedTable[string, JsonNode]): Hash {.raises: [].} + +proc hash*(n: JsonNode): Hash {.noSideEffect, raises: [].} = + ## Compute the hash for a JSON node + case n.kind + of JArray: + result = hash(n.elems) + of JObject: + result = hash(n.fields) + of JInt: + result = hash(n.num) + of JFloat: + result = hash(n.fnum) + of JBool: + result = hash(n.bval.int) + of JString: + result = hash(n.str) + of JNull: + result = Hash(0) + +func hash*(n: OrderedTable[string, JsonNode]): Hash {.raises: [].} = + for key, val in n: + result = result xor (hash(key) !& hash(val)) + result = !$result + method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, onBlock: BlockHandler): Future[JsonNode] @@ -56,18 +114,20 @@ proc new*(_: type JsonRpcSubscriptions, client: RpcWebSocketClient): JsonRpcSubscriptions = let subscriptions = WebSocketSubscriptions(client: client) proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = - if id =? arguments["subscription"].catch and + if id =? arguments{"subscription"}.catch and callback =? subscriptions.getCallback(id): + echo "[subscription handler] calling callback for id ", id callback(id, arguments) - client.setMethodHandler("eth_subscription", subscriptionHandler) + subscriptions.setMethodHandler("eth_subscription", subscriptionHandler) subscriptions method subscribeBlocks(subscriptions: WebSocketSubscriptions, onBlock: BlockHandler): Future[JsonNode] {.async.} = - proc callback(id, arguments: JsonNode) = - if blck =? Block.fromJson(arguments["result"]).catch: + proc callback(id, arguments: JsonNode) {.raises: [].} = + if blck =? Block.fromJson(arguments{"result"}): + echo "[subscription.subscribeBlocks callback] calling onBlock callback with Block" onBlock(blck) let id = await subscriptions.client.eth_subscribe("newHeads") subscriptions.callbacks[id] = callback @@ -79,13 +139,13 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions, Future[JsonNode] {.async.} = proc callback(id, arguments: JsonNode) = - if log =? Log.fromJson(arguments["result"]).catch: + if log =? Log.fromJson(arguments{"result"}): onLog(log) let id = await subscriptions.client.eth_subscribe("logs", filter) subscriptions.callbacks[id] = callback return id -method unsubscribe(subscriptions: WebSocketSubscriptions, +method unsubscribe*(subscriptions: WebSocketSubscriptions, id: JsonNode) {.async.} = subscriptions.callbacks.del(id) @@ -161,7 +221,7 @@ method subscribeLogs(subscriptions: PollingSubscriptions, subscriptions.callbacks[id] = callback return id -method unsubscribe(subscriptions: PollingSubscriptions, +method unsubscribe*(subscriptions: PollingSubscriptions, id: JsonNode) {.async.} = subscriptions.callbacks.del(id)