diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 004c931..6f2f538 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -12,6 +12,7 @@ import ../signer import ./jsonrpc/rpccalls import ./jsonrpc/conversions import ./jsonrpc/errors +import ./jsonrpc/websocket export basics export provider @@ -49,6 +50,7 @@ proc connect*( await websocket.connect(url) provider.client = websocket provider.subscriptions = Subscriptions.new(provider, pollingInterval) + await provider.subscriptions.useWebsocketUpdates(websocket) else: let http = newRpcHttpClient(getHeaders = jsonHeaders) await http.connect(url) diff --git a/ethers/providers/jsonrpc/websocket.nim b/ethers/providers/jsonrpc/websocket.nim new file mode 100644 index 0000000..cbf4299 --- /dev/null +++ b/ethers/providers/jsonrpc/websocket.nim @@ -0,0 +1,34 @@ +import std/json +import pkg/json_rpc/rpcclient +import ../../basics +import ../../subscriptions +import ./rpccalls +import ./errors + +proc useWebsocketUpdates*( + subscriptions: Subscriptions, + websocket: RpcWebSocketClient +) {.async:(raises:[JsonRpcProviderError, CancelledError]).} = + var rpcSubscriptionId: JsonNode + + proc processMessage(client: RpcClient, message: string): Result[bool, string] = + without message =? parseJson(message).catch: + return ok true + without rpcMethod =? message{"method"}: + return ok true + if rpcMethod.getStr() != "eth_subscription": + return ok true + without rpcParameter =? message{"params"}{"subscription"}: + return ok true + if rpcParameter != rpcSubscriptionId: + return ok true + + subscriptions.update() + + ok false # do not process further using json-rpc default handler + + assert websocket.onProcessMessage.isNil + websocket.onProcessMessage = processMessage + + convertError: + rpcSubscriptionId = await websocket.eth_subscribe("newHeads") diff --git a/ethers/subscriptions/blocksubscriber.nim b/ethers/subscriptions/blocksubscriber.nim index 305cd93..f1e6cc3 100644 --- a/ethers/subscriptions/blocksubscriber.nim +++ b/ethers/subscriptions/blocksubscriber.nim @@ -22,7 +22,8 @@ func new*( BlockSubscriber( provider: provider, processor: processor, - pollingInterval: pollingInterval + pollingInterval: pollingInterval, + wake: newAsyncEvent() ) proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} = @@ -51,7 +52,7 @@ proc start*( if subscriber.looping.isNil: subscriber.lastSeen = await subscriber.provider.getBlockNumber() subscriber.lastProcessed = subscriber.lastSeen - subscriber.wake = newAsyncEvent() + subscriber.wake.clear() subscriber.looping = subscriber.loop() proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} =