diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 406965a..cbd1f52 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -24,7 +24,9 @@ type provider: JsonRpcProvider address: ?Address JsonRpcProviderError* = object of ProviderError - SubscriptionHandler = proc(id, arguments: JsonNode): Future[void] {.gcsafe, upraises:[].} + JsonRpcSubscription* = ref object of Subscription + subscriptions: JsonRpcSubscriptions + id: JsonNode proc raiseProviderError(message: string) {.upraises: [JsonRpcProviderError].} = var message = message @@ -74,12 +76,14 @@ proc new*(_: type JsonRpcProvider, pollingInterval = pollingInterval) proc awaitClient: Future[RpcClient] {.async.} = - await initialized - return client + convertError: + await initialized + return client proc awaitSubscriptions: Future[JsonRpcSubscriptions] {.async.} = - await initialized - return subscriptions + convertError: + await initialized + return subscriptions initialized = initialize() JsonRpcProvider(client: awaitClient(), subscriptions: awaitSubscriptions()) @@ -168,14 +172,22 @@ method subscribe*(provider: JsonRpcProvider, Future[Subscription] {.async.} = convertError: let subscriptions = await provider.subscriptions - return await subscriptions.subscribeLogs(filter, onLog) + let id = await subscriptions.subscribeLogs(filter, onLog) + return JsonRpcSubscription(subscriptions: subscriptions, id: id) method subscribe*(provider: JsonRpcProvider, onBlock: BlockHandler): Future[Subscription] {.async.} = convertError: let subscriptions = await provider.subscriptions - return await subscriptions.subscribeBlocks(onBlock) + let id = await subscriptions.subscribeBlocks(onBlock) + return JsonRpcSubscription(subscriptions: subscriptions, id: id) + +method unsubscribe(subscription: JsonRpcSubscription) {.async.} = + convertError: + let subscriptions = subscription.subscriptions + let id = subscription.id + await subscriptions.unsubscribe(id) method close*(provider: JsonRpcProvider) {.async.} = convertError: diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index f74e3e3..8d0d6d6 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -12,27 +12,24 @@ type JsonRpcSubscriptions* = ref object of RootObj client: RpcClient callbacks: Table[JsonNode, SubscriptionCallback] - JsonRpcSubscription = ref object of Subscription - subscriptions: JsonRpcSubscriptions - id: JsonNode SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, upraises:[].} method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, onBlock: BlockHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async, base.} = raiseAssert "not implemented" method subscribeLogs*(subscriptions: JsonRpcSubscriptions, filter: Filter, onLog: LogHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async, base.} = raiseAssert "not implemented" -method unsubscribe(subscriptions: JsonRpcSubscriptions, - id: JsonNode) - {.async, base.} = +method unsubscribe*(subscriptions: JsonRpcSubscriptions, + id: JsonNode) + {.async, base.} = raiseAssert "not implemented" method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} = @@ -40,11 +37,6 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} = for id in ids: await subscriptions.unsubscribe(id) -method unsubscribe(subscription: JsonRpcSubscription) {.async.} = - let subscriptions = subscription.subscriptions - let id = subscription.id - await subscriptions.unsubscribe(id) - proc getCallback(subscriptions: JsonRpcSubscriptions, id: JsonNode): ?SubscriptionCallback = try: @@ -72,26 +64,26 @@ proc new*(_: type JsonRpcSubscriptions, method subscribeBlocks(subscriptions: WebSocketSubscriptions, onBlock: BlockHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async.} = proc callback(id, arguments: JsonNode) = if blck =? Block.fromJson(arguments["result"]).catch: asyncSpawn onBlock(blck) let id = await subscriptions.client.eth_subscribe("newHeads") subscriptions.callbacks[id] = callback - return JsonRpcSubscription(subscriptions: subscriptions, id: id) + return id method subscribeLogs(subscriptions: WebSocketSubscriptions, filter: Filter, onLog: LogHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async.} = proc callback(id, arguments: JsonNode) = if log =? Log.fromJson(arguments["result"]).catch: onLog(log) let id = await subscriptions.client.eth_subscribe("logs", filter) subscriptions.callbacks[id] = callback - return JsonRpcSubscription(subscriptions: subscriptions, id: id) + return id method unsubscribe(subscriptions: WebSocketSubscriptions, id: JsonNode) @@ -137,7 +129,7 @@ method close*(subscriptions: PollingSubscriptions) {.async.} = method subscribeBlocks(subscriptions: PollingSubscriptions, onBlock: BlockHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async.} = proc getBlock(hash: BlockHash) {.async.} = @@ -153,12 +145,12 @@ method subscribeBlocks(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newBlockFilter() subscriptions.callbacks[id] = callback - return JsonRpcSubscription(subscriptions: subscriptions, id: id) + return id method subscribeLogs(subscriptions: PollingSubscriptions, filter: Filter, onLog: LogHandler): - Future[JsonRpcSubscription] + Future[JsonNode] {.async.} = proc callback(id, change: JsonNode) = @@ -167,7 +159,7 @@ method subscribeLogs(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newFilter(filter) subscriptions.callbacks[id] = callback - return JsonRpcSubscription(subscriptions: subscriptions, id: id) + return id method unsubscribe(subscriptions: PollingSubscriptions, id: JsonNode) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 2a053f4..2540556 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -27,7 +27,7 @@ template subscriptionTests(subscriptions, client) = check eventually latestBlock.number.isSome check latestBlock.hash.isSome check latestBlock.timestamp > 0.u256 - await subscription.unsubscribe() + await subscriptions.unsubscribe(subscription) test "stops listening to new blocks when unsubscribed": var count = 0 @@ -36,7 +36,7 @@ template subscriptionTests(subscriptions, client) = let subscription = await subscriptions.subscribeBlocks(callback) discard await client.call("evm_mine", newJArray()) check eventually count > 0 - await subscription.unsubscribe() + await subscriptions.unsubscribe(subscription) count = 0 discard await client.call("evm_mine", newJArray()) await sleepAsync(100.millis)