Use new subscription handling in JSON RPC provider

This commit is contained in:
Mark Spanbroek 2023-06-27 11:47:59 +02:00 committed by markspanbroek
parent a7dc0ac9eb
commit 16fa0cfcf8
1 changed files with 36 additions and 76 deletions

View File

@ -8,6 +8,7 @@ import ../provider
import ../signer import ../signer
import ./jsonrpc/rpccalls import ./jsonrpc/rpccalls
import ./jsonrpc/conversions import ./jsonrpc/conversions
import ./jsonrpc/subscriptions
export json export json
export basics export basics
@ -18,10 +19,7 @@ push: {.upraises: [].}
type type
JsonRpcProvider* = ref object of Provider JsonRpcProvider* = ref object of Provider
client: Future[RpcClient] client: Future[RpcClient]
subscriptions: Table[JsonNode, SubscriptionHandler] subscriptions: Future[JsonRpcSubscriptions]
JsonRpcSubscription = ref object of Subscription
provider: JsonRpcProvider
id: JsonNode
JsonRpcSigner* = ref object of Signer JsonRpcSigner* = ref object of Signer
provider: JsonRpcProvider provider: JsonRpcProvider
address: ?Address address: ?Address
@ -53,45 +51,34 @@ const defaultUrl = "http://localhost:8545"
proc jsonHeaders: seq[(string, string)] = proc jsonHeaders: seq[(string, string)] =
@[("Content-Type", "application/json")] @[("Content-Type", "application/json")]
proc connect(_: type RpcClient, url: string): Future[RpcClient] {.async.} =
case parseUri(url).scheme
of "ws", "wss":
let client = newRpcWebSocketClient(getHeaders = jsonHeaders)
await client.connect(url)
return client
else:
let client = newRpcHttpClient(getHeaders = jsonHeaders)
await client.connect(url)
return client
proc connect(provider: JsonRpcProvider, url: string) =
proc getSubscriptionHandler(id: JsonNode): ?SubscriptionHandler =
try:
if provider.subscriptions.hasKey(id):
provider.subscriptions[id].some
else:
SubscriptionHandler.none
except Exception:
SubscriptionHandler.none
proc handleSubscription(arguments: JsonNode) {.upraises: [].} =
if id =? arguments["subscription"].catch and
handler =? getSubscriptionHandler(id):
# fire and forget
discard handler(id, arguments)
proc subscribe: Future[RpcClient] {.async.} =
let client = await RpcClient.connect(url)
client.setMethodHandler("eth_subscription", handleSubscription)
return client
provider.client = subscribe()
proc new*(_: type JsonRpcProvider, url=defaultUrl): JsonRpcProvider = proc new*(_: type JsonRpcProvider, url=defaultUrl): JsonRpcProvider =
let provider = JsonRpcProvider() var initialized: Future[void]
provider.connect(url) var client: RpcClient
provider var subscriptions: JsonRpcSubscriptions
proc initialize {.async.} =
case parseUri(url).scheme
of "ws", "wss":
let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders)
await websocket.connect(url)
client = websocket
subscriptions = JsonRpcSubscriptions.new(websocket)
else:
let http = newRpcHttpClient(getHeaders = jsonHeaders)
await http.connect(url)
client = http
subscriptions = JsonRpcSubscriptions.new(http)
proc awaitClient: Future[RpcClient] {.async.} =
await initialized
return client
proc awaitSubscriptions: Future[JsonRpcSubscriptions] {.async.} =
await initialized
return subscriptions
initialized = initialize()
JsonRpcProvider(client: awaitClient(), subscriptions: awaitSubscriptions())
proc send*(provider: JsonRpcProvider, proc send*(provider: JsonRpcProvider,
call: string, call: string,
@ -171,47 +158,20 @@ method sendTransaction*(provider: JsonRpcProvider, rawTransaction: seq[byte]): F
return TransactionResponse(hash: hash, provider: provider) return TransactionResponse(hash: hash, provider: provider)
proc subscribe(provider: JsonRpcProvider,
name: string,
filter: ?Filter,
handler: SubscriptionHandler): Future[Subscription] {.async.} =
convertError:
let client = await provider.client
doAssert client of RpcWebSocketClient, "subscriptions require websockets"
var id: JsonNode
if filter =? filter:
id = await client.eth_subscribe(name, filter)
else:
id = await client.eth_subscribe(name)
provider.subscriptions[id] = handler
return JsonRpcSubscription(id: id, provider: provider)
method subscribe*(provider: JsonRpcProvider, method subscribe*(provider: JsonRpcProvider,
filter: Filter, filter: Filter,
callback: LogHandler): onLog: LogHandler):
Future[Subscription] {.async.} = Future[Subscription] {.async.} =
proc handler(id, arguments: JsonNode) {.async.} = convertError:
if log =? Log.fromJson(arguments["result"]).catch: let subscriptions = await provider.subscriptions
callback(log) return await subscriptions.subscribeLogs(filter, onLog)
return await provider.subscribe("logs", filter.some, handler)
method subscribe*(provider: JsonRpcProvider, method subscribe*(provider: JsonRpcProvider,
callback: BlockHandler): onBlock: BlockHandler):
Future[Subscription] {.async.} = Future[Subscription] {.async.} =
proc handler(id, arguments: JsonNode) {.async.} =
if blck =? Block.fromJson(arguments["result"]).catch:
await callback(blck)
return await provider.subscribe("newHeads", Filter.none, handler)
method unsubscribe*(subscription: JsonRpcSubscription) {.async.} =
convertError: convertError:
let provider = subscription.provider let subscriptions = await provider.subscriptions
provider.subscriptions.del(subscription.id) return await subscriptions.subscribeBlocks(onBlock)
let client = await provider.client
discard await client.eth_unsubscribe(subscription.id)
# Signer # Signer