feat(subscriptions): use websocket to get instant updates

This commit is contained in:
Mark Spanbroek 2025-09-10 11:29:19 +02:00
parent 248965f7f6
commit 86b9a02054
3 changed files with 39 additions and 2 deletions

View File

@ -12,6 +12,7 @@ import ../signer
import ./jsonrpc/rpccalls
import ./jsonrpc/conversions
import ./jsonrpc/errors
import ./jsonrpc/websocket
export basics
export provider
@ -49,6 +50,7 @@ proc connect*(
await websocket.connect(url)
provider.client = websocket
provider.subscriptions = Subscriptions.new(provider, pollingInterval)
await provider.subscriptions.useWebsocketUpdates(websocket)
else:
let http = newRpcHttpClient(getHeaders = jsonHeaders)
await http.connect(url)

View File

@ -0,0 +1,34 @@
import std/json
import pkg/json_rpc/rpcclient
import ../../basics
import ../../subscriptions
import ./rpccalls
import ./errors
proc useWebsocketUpdates*(
subscriptions: Subscriptions,
websocket: RpcWebSocketClient
) {.async:(raises:[JsonRpcProviderError, CancelledError]).} =
var rpcSubscriptionId: JsonNode
proc processMessage(client: RpcClient, message: string): Result[bool, string] =
without message =? parseJson(message).catch:
return ok true
without rpcMethod =? message{"method"}:
return ok true
if rpcMethod.getStr() != "eth_subscription":
return ok true
without rpcParameter =? message{"params"}{"subscription"}:
return ok true
if rpcParameter != rpcSubscriptionId:
return ok true
subscriptions.update()
ok false # do not process further using json-rpc default handler
assert websocket.onProcessMessage.isNil
websocket.onProcessMessage = processMessage
convertError:
rpcSubscriptionId = await websocket.eth_subscribe("newHeads")

View File

@ -22,7 +22,8 @@ func new*(
BlockSubscriber(
provider: provider,
processor: processor,
pollingInterval: pollingInterval
pollingInterval: pollingInterval,
wake: newAsyncEvent()
)
proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} =
@ -51,7 +52,7 @@ proc start*(
if subscriber.looping.isNil:
subscriber.lastSeen = await subscriber.provider.getBlockNumber()
subscriber.lastProcessed = subscriber.lastSeen
subscriber.wake = newAsyncEvent()
subscriber.wake.clear()
subscriber.looping = subscriber.loop()
proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} =