From 16fa0cfcf8b7acd7c47157acb19a75e049622741 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 27 Jun 2023 11:47:59 +0200 Subject: [PATCH] Use new subscription handling in JSON RPC provider --- ethers/providers/jsonrpc.nim | 112 +++++++++++------------------------ 1 file changed, 36 insertions(+), 76 deletions(-) diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index d80f5fc..c33956a 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -8,6 +8,7 @@ import ../provider import ../signer import ./jsonrpc/rpccalls import ./jsonrpc/conversions +import ./jsonrpc/subscriptions export json export basics @@ -18,10 +19,7 @@ push: {.upraises: [].} type JsonRpcProvider* = ref object of Provider client: Future[RpcClient] - subscriptions: Table[JsonNode, SubscriptionHandler] - JsonRpcSubscription = ref object of Subscription - provider: JsonRpcProvider - id: JsonNode + subscriptions: Future[JsonRpcSubscriptions] JsonRpcSigner* = ref object of Signer provider: JsonRpcProvider address: ?Address @@ -53,45 +51,34 @@ const defaultUrl = "http://localhost:8545" proc jsonHeaders: seq[(string, string)] = @[("Content-Type", "application/json")] -proc connect(_: type RpcClient, url: string): Future[RpcClient] {.async.} = - case parseUri(url).scheme - of "ws", "wss": - let client = newRpcWebSocketClient(getHeaders = jsonHeaders) - await client.connect(url) - return client - else: - let client = newRpcHttpClient(getHeaders = jsonHeaders) - await client.connect(url) - return client - -proc connect(provider: JsonRpcProvider, url: string) = - - proc getSubscriptionHandler(id: JsonNode): ?SubscriptionHandler = - try: - if provider.subscriptions.hasKey(id): - provider.subscriptions[id].some - else: - SubscriptionHandler.none - except Exception: - SubscriptionHandler.none - - proc handleSubscription(arguments: JsonNode) {.upraises: [].} = - if id =? arguments["subscription"].catch and - handler =? getSubscriptionHandler(id): - # fire and forget - discard handler(id, arguments) - - proc subscribe: Future[RpcClient] {.async.} = - let client = await RpcClient.connect(url) - client.setMethodHandler("eth_subscription", handleSubscription) - return client - - provider.client = subscribe() - proc new*(_: type JsonRpcProvider, url=defaultUrl): JsonRpcProvider = - let provider = JsonRpcProvider() - provider.connect(url) - provider + var initialized: Future[void] + var client: RpcClient + var subscriptions: JsonRpcSubscriptions + + proc initialize {.async.} = + case parseUri(url).scheme + of "ws", "wss": + let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders) + await websocket.connect(url) + client = websocket + subscriptions = JsonRpcSubscriptions.new(websocket) + else: + let http = newRpcHttpClient(getHeaders = jsonHeaders) + await http.connect(url) + client = http + subscriptions = JsonRpcSubscriptions.new(http) + + proc awaitClient: Future[RpcClient] {.async.} = + await initialized + return client + + proc awaitSubscriptions: Future[JsonRpcSubscriptions] {.async.} = + await initialized + return subscriptions + + initialized = initialize() + JsonRpcProvider(client: awaitClient(), subscriptions: awaitSubscriptions()) proc send*(provider: JsonRpcProvider, call: string, @@ -171,47 +158,20 @@ method sendTransaction*(provider: JsonRpcProvider, rawTransaction: seq[byte]): F return TransactionResponse(hash: hash, provider: provider) -proc subscribe(provider: JsonRpcProvider, - name: string, - filter: ?Filter, - handler: SubscriptionHandler): Future[Subscription] {.async.} = - convertError: - let client = await provider.client - doAssert client of RpcWebSocketClient, "subscriptions require websockets" - - var id: JsonNode - if filter =? filter: - id = await client.eth_subscribe(name, filter) - else: - id = await client.eth_subscribe(name) - - provider.subscriptions[id] = handler - - return JsonRpcSubscription(id: id, provider: provider) - method subscribe*(provider: JsonRpcProvider, filter: Filter, - callback: LogHandler): + onLog: LogHandler): Future[Subscription] {.async.} = - proc handler(id, arguments: JsonNode) {.async.} = - if log =? Log.fromJson(arguments["result"]).catch: - callback(log) - return await provider.subscribe("logs", filter.some, handler) + convertError: + let subscriptions = await provider.subscriptions + return await subscriptions.subscribeLogs(filter, onLog) method subscribe*(provider: JsonRpcProvider, - callback: BlockHandler): + onBlock: BlockHandler): Future[Subscription] {.async.} = - proc handler(id, arguments: JsonNode) {.async.} = - if blck =? Block.fromJson(arguments["result"]).catch: - await callback(blck) - return await provider.subscribe("newHeads", Filter.none, handler) - -method unsubscribe*(subscription: JsonRpcSubscription) {.async.} = convertError: - let provider = subscription.provider - provider.subscriptions.del(subscription.id) - let client = await provider.client - discard await client.eth_unsubscribe(subscription.id) + let subscriptions = await provider.subscriptions + return await subscriptions.subscribeBlocks(onBlock) # Signer