Move JsonRpcSubscription type to jsonrpc module

Allows it to insert convertError to ensure that
any errors are re-raised as JsonRpcProviderError
This commit is contained in:
Mark Spanbroek 2023-06-28 11:02:21 +02:00 committed by markspanbroek
parent 738c6a87e2
commit 82f6449374
3 changed files with 34 additions and 30 deletions

View File

@ -24,7 +24,9 @@ type
provider: JsonRpcProvider
address: ?Address
JsonRpcProviderError* = object of ProviderError
SubscriptionHandler = proc(id, arguments: JsonNode): Future[void] {.gcsafe, upraises:[].}
JsonRpcSubscription* = ref object of Subscription
subscriptions: JsonRpcSubscriptions
id: JsonNode
proc raiseProviderError(message: string) {.upraises: [JsonRpcProviderError].} =
var message = message
@ -74,12 +76,14 @@ proc new*(_: type JsonRpcProvider,
pollingInterval = pollingInterval)
proc awaitClient: Future[RpcClient] {.async.} =
await initialized
return client
convertError:
await initialized
return client
proc awaitSubscriptions: Future[JsonRpcSubscriptions] {.async.} =
await initialized
return subscriptions
convertError:
await initialized
return subscriptions
initialized = initialize()
JsonRpcProvider(client: awaitClient(), subscriptions: awaitSubscriptions())
@ -168,14 +172,22 @@ method subscribe*(provider: JsonRpcProvider,
Future[Subscription] {.async.} =
convertError:
let subscriptions = await provider.subscriptions
return await subscriptions.subscribeLogs(filter, onLog)
let id = await subscriptions.subscribeLogs(filter, onLog)
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
method subscribe*(provider: JsonRpcProvider,
onBlock: BlockHandler):
Future[Subscription] {.async.} =
convertError:
let subscriptions = await provider.subscriptions
return await subscriptions.subscribeBlocks(onBlock)
let id = await subscriptions.subscribeBlocks(onBlock)
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
method unsubscribe(subscription: JsonRpcSubscription) {.async.} =
convertError:
let subscriptions = subscription.subscriptions
let id = subscription.id
await subscriptions.unsubscribe(id)
method close*(provider: JsonRpcProvider) {.async.} =
convertError:

View File

@ -12,27 +12,24 @@ type
JsonRpcSubscriptions* = ref object of RootObj
client: RpcClient
callbacks: Table[JsonNode, SubscriptionCallback]
JsonRpcSubscription = ref object of Subscription
subscriptions: JsonRpcSubscriptions
id: JsonNode
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, upraises:[].}
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async, base.} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
filter: Filter,
onLog: LogHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async, base.} =
raiseAssert "not implemented"
method unsubscribe(subscriptions: JsonRpcSubscriptions,
id: JsonNode)
{.async, base.} =
method unsubscribe*(subscriptions: JsonRpcSubscriptions,
id: JsonNode)
{.async, base.} =
raiseAssert "not implemented"
method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
@ -40,11 +37,6 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
for id in ids:
await subscriptions.unsubscribe(id)
method unsubscribe(subscription: JsonRpcSubscription) {.async.} =
let subscriptions = subscription.subscriptions
let id = subscription.id
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
try:
@ -72,26 +64,26 @@ proc new*(_: type JsonRpcSubscriptions,
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
if blck =? Block.fromJson(arguments["result"]).catch:
asyncSpawn onBlock(blck)
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
return id
method subscribeLogs(subscriptions: WebSocketSubscriptions,
filter: Filter,
onLog: LogHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
if log =? Log.fromJson(arguments["result"]).catch:
onLog(log)
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
return id
method unsubscribe(subscriptions: WebSocketSubscriptions,
id: JsonNode)
@ -137,7 +129,7 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async.} =
proc getBlock(hash: BlockHash) {.async.} =
@ -153,12 +145,12 @@ method subscribeBlocks(subscriptions: PollingSubscriptions,
let id = await subscriptions.client.eth_newBlockFilter()
subscriptions.callbacks[id] = callback
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
return id
method subscribeLogs(subscriptions: PollingSubscriptions,
filter: Filter,
onLog: LogHandler):
Future[JsonRpcSubscription]
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
@ -167,7 +159,7 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
return id
method unsubscribe(subscriptions: PollingSubscriptions,
id: JsonNode)

View File

@ -27,7 +27,7 @@ template subscriptionTests(subscriptions, client) =
check eventually latestBlock.number.isSome
check latestBlock.hash.isSome
check latestBlock.timestamp > 0.u256
await subscription.unsubscribe()
await subscriptions.unsubscribe(subscription)
test "stops listening to new blocks when unsubscribed":
var count = 0
@ -36,7 +36,7 @@ template subscriptionTests(subscriptions, client) =
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
await subscription.unsubscribe()
await subscriptions.unsubscribe(subscription)
count = 0
discard await client.call("evm_mine", newJArray())
await sleepAsync(100.millis)