refactor!(subscriptions): replace old implementation

This commit is contained in:
Mark Spanbroek 2025-09-09 17:25:17 +02:00
parent 017245826f
commit ff667cb8f0
19 changed files with 131 additions and 855 deletions

View File

@ -33,7 +33,7 @@ JSON-RPC provider is supported:
import ethers
import chronos
let provider = JsonRpcProvider.new("ws://localhost:8545")
let provider = await JsonRpcProvider.connect("ws://localhost:8545")
let accounts = await provider.listAccounts()
```
@ -204,13 +204,6 @@ This library ships with some optional modules that provides convenience utilitie
- `ethers/erc20` module provides you with ERC20 token implementation and its events
Hardhat websockets workaround
---------
If you're working with Hardhat, you might encounter an issue where [websocket subscriptions stop working after 5 minutes](https://github.com/NomicFoundation/hardhat/issues/2053).
This library provides a workaround using the compile time `ws_resubscribe` symbol. When this symbol is defined and set to a value greater than 0, websocket subscriptions will automatically resubscribe after the amount of time (in seconds) specified. The recommended value is 240 seconds (4 minutes), eg `--define:ws_resubscribe=240`.
Contribution
------------

View File

@ -4,7 +4,6 @@ type
SolidityError* = object of EthersError
ContractError* = object of EthersError
SignerError* = object of EthersError
SubscriptionError* = object of EthersError
ProviderError* = object of EthersError
data*: ?seq[byte]

View File

@ -225,7 +225,7 @@ proc confirm*(
tx: TransactionResponse,
confirmations = EthersDefaultConfirmations,
timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt]
{.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} =
{.async: (raises: [CancelledError, ProviderError, EthersError]).} =
## Waits for a transaction to be mined and for the specified number of blocks
## to pass since it was mined (confirmations). The number of confirmations

View File

@ -7,40 +7,26 @@ import pkg/json_rpc/errors
import pkg/serde
import ../basics
import ../provider
import ../subscriptions
import ../signer
import ./jsonrpc/rpccalls
import ./jsonrpc/conversions
import ./jsonrpc/subscriptions
import ./jsonrpc/errors
export basics
export provider
export chronicles
export errors.JsonRpcProviderError
export subscriptions
{.push raises: [].}
logScope:
topics = "ethers jsonrpc"
type
JsonRpcProvider* = ref object of Provider
client: Future[RpcClient]
subscriptions: Future[JsonRpcSubscriptions]
maxPriorityFeePerGas: UInt256
JsonRpcSubscription* = ref object of Subscription
subscriptions: JsonRpcSubscriptions
id: JsonNode
# Signer
JsonRpcSigner* = ref object of Signer
provider: JsonRpcProvider
address: ?Address
JsonRpcSignerError* = object of SignerError
# Provider
type JsonRpcProvider* = ref object of Provider
client: RpcClient
subscriptions: Subscriptions
maxPriorityFeePerGas: UInt256
const defaultUrl = "http://localhost:8545"
const defaultPollingInterval = 4.seconds
@ -49,55 +35,26 @@ const defaultMaxPriorityFeePerGas = 1_000_000_000.u256
proc jsonHeaders: seq[(string, string)] =
@[("Content-Type", "application/json")]
proc new*(
proc connect*(
_: type JsonRpcProvider,
url=defaultUrl,
pollingInterval=defaultPollingInterval,
maxPriorityFeePerGas=defaultMaxPriorityFeePerGas
): JsonRpcProvider {.raises: [].} =
var initialized: Future[void]
var client: RpcClient
var subscriptions: JsonRpcSubscriptions
proc initialize() {.async: (raises: [JsonRpcProviderError, CancelledError]).} =
convertError:
case parseUri(url).scheme
of "ws", "wss":
let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders)
await websocket.connect(url)
client = websocket
subscriptions = JsonRpcSubscriptions.new(websocket)
else:
let http = newRpcHttpClient(getHeaders = jsonHeaders)
await http.connect(url)
client = http
subscriptions = JsonRpcSubscriptions.new(
http,
pollingInterval = pollingInterval
)
subscriptions.start()
proc awaitClient(): Future[RpcClient] {.
async: (raises: [JsonRpcProviderError, CancelledError])
.} =
convertError:
await initialized
return client
proc awaitSubscriptions(): Future[JsonRpcSubscriptions] {.
async: (raises: [JsonRpcProviderError, CancelledError])
.} =
convertError:
await initialized
return subscriptions
initialized = initialize()
return JsonRpcProvider(
client: awaitClient(),
subscriptions: awaitSubscriptions(),
maxPriorityFeePerGas: maxPriorityFeePerGas
)
): Future[JsonRpcProvider] {.async:(raises: [JsonRpcProviderError, CancelledError]).} =
convertError:
let provider = JsonRpcProvider(maxPriorityFeePerGas: maxPriorityFeePerGas)
case parseUri(url).scheme
of "ws", "wss":
let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders)
await websocket.connect(url)
provider.client = websocket
provider.subscriptions = Subscriptions.new(provider, pollingInterval)
else:
let http = newRpcHttpClient(getHeaders = jsonHeaders)
await http.connect(url)
provider.client = http
provider.subscriptions = Subscriptions.new(provider, pollingInterval)
return provider
proc callImpl(
client: RpcClient, call: string, args: JsonNode
@ -118,57 +75,44 @@ proc send*(
provider: JsonRpcProvider, call: string, arguments: seq[JsonNode] = @[]
): Future[JsonNode] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.callImpl(call, %arguments)
return await provider.client.callImpl(call, %arguments)
proc listAccounts*(
provider: JsonRpcProvider
): Future[seq[Address]] {.async: (raises: [JsonRpcProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_accounts()
proc getSigner*(provider: JsonRpcProvider): JsonRpcSigner =
JsonRpcSigner(provider: provider)
proc getSigner*(provider: JsonRpcProvider, address: Address): JsonRpcSigner =
JsonRpcSigner(provider: provider, address: some address)
return await provider.client.eth_accounts()
method getBlockNumber*(
provider: JsonRpcProvider
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_blockNumber()
return await provider.client.eth_blockNumber()
method getBlock*(
provider: JsonRpcProvider, tag: BlockTag
): Future[?Block] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_getBlockByNumber(tag, false)
return await provider.client.eth_getBlockByNumber(tag, false)
method call*(
provider: JsonRpcProvider, tx: Transaction, blockTag = BlockTag.latest
): Future[seq[byte]] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_call(tx, blockTag)
return await provider.client.eth_call(tx, blockTag)
method getGasPrice*(
provider: JsonRpcProvider
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_gasPrice()
return await provider.client.eth_gasPrice()
method getMaxPriorityFeePerGas*(
provider: JsonRpcProvider
): Future[UInt256] {.async: (raises: [CancelledError]).} =
try:
convertError:
let client = await provider.client
return await client.eth_maxPriorityFeePerGas()
return await provider.client.eth_maxPriorityFeePerGas()
except JsonRpcProviderError:
# If the provider does not provide the implementation
# let's just remove the manual value
@ -178,35 +122,31 @@ method getTransactionCount*(
provider: JsonRpcProvider, address: Address, blockTag = BlockTag.latest
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_getTransactionCount(address, blockTag)
return await provider.client.eth_getTransactionCount(address, blockTag)
method getTransaction*(
provider: JsonRpcProvider, txHash: TransactionHash
): Future[?PastTransaction] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_getTransactionByHash(txHash)
return await provider.client.eth_getTransactionByHash(txHash)
method getTransactionReceipt*(
provider: JsonRpcProvider, txHash: TransactionHash
): Future[?TransactionReceipt] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
return await client.eth_getTransactionReceipt(txHash)
return await provider.client.eth_getTransactionReceipt(txHash)
method getLogs*(
provider: JsonRpcProvider, filter: EventFilter
): Future[seq[Log]] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
let logsJson =
if filter of Filter:
await client.eth_getLogs(Filter(filter))
await provider.client.eth_getLogs(Filter(filter))
elif filter of FilterByBlockHash:
await client.eth_getLogs(FilterByBlockHash(filter))
await provider.client.eth_getLogs(FilterByBlockHash(filter))
else:
await client.eth_getLogs(filter)
await provider.client.eth_getLogs(filter)
var logs: seq[Log] = @[]
for logJson in logsJson.getElems:
@ -222,8 +162,7 @@ method estimateGas*(
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
try:
convertError:
let client = await provider.client
return await client.eth_estimateGas(transaction, blockTag)
return await provider.client.eth_estimateGas(transaction, blockTag)
except ProviderError as error:
raise (ref EstimateGasError)(
msg: "Estimate gas failed: " & error.msg,
@ -236,47 +175,29 @@ method getChainId*(
provider: JsonRpcProvider
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
try:
return await client.eth_chainId()
return await provider.client.eth_chainId()
except CancelledError as error:
raise error
except CatchableError:
return parse(await client.net_version(), UInt256)
return parse(await provider.client.net_version(), UInt256)
method sendTransaction*(
provider: JsonRpcProvider, rawTransaction: seq[byte]
): Future[TransactionResponse] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let
client = await provider.client
hash = await client.eth_sendRawTransaction(rawTransaction)
let hash = await provider.client.eth_sendRawTransaction(rawTransaction)
return TransactionResponse(hash: hash, provider: provider)
method subscribe*(
provider: JsonRpcProvider, filter: EventFilter, onLog: LogHandler
): Future[Subscription] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let subscriptions = await provider.subscriptions
let id = await subscriptions.subscribeLogs(filter, onLog)
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
await provider.subscriptions.subscribe(filter, onLog)
method subscribe*(
provider: JsonRpcProvider, onBlock: BlockHandler
): Future[Subscription] {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let subscriptions = await provider.subscriptions
let id = await subscriptions.subscribeBlocks(onBlock)
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
method unsubscribe*(
subscription: JsonRpcSubscription
) {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let subscriptions = subscription.subscriptions
let id = subscription.id
await subscriptions.unsubscribe(id)
await provider.subscriptions.subscribe(onBlock)
method isSyncing*(
provider: JsonRpcProvider
@ -290,12 +211,14 @@ method close*(
provider: JsonRpcProvider
) {.async: (raises: [ProviderError, CancelledError]).} =
convertError:
let client = await provider.client
let subscriptions = await provider.subscriptions
await subscriptions.close()
await client.close()
await provider.subscriptions.close()
await provider.client.close()
# Signer
type
JsonRpcSigner* = ref object of Signer
provider: JsonRpcProvider
address: ?Address
JsonRpcSignerError* = object of SignerError
proc raiseJsonRpcSignerError(
message: string) {.raises: [JsonRpcSignerError].} =
@ -316,6 +239,12 @@ template convertSignerError(body) =
except CatchableError as error:
raise newException(JsonRpcSignerError, error.msg)
proc getSigner*(provider: JsonRpcProvider): JsonRpcSigner =
JsonRpcSigner(provider: provider)
proc getSigner*(provider: JsonRpcProvider, address: Address): JsonRpcSigner =
JsonRpcSigner(provider: provider, address: some address)
method provider*(signer: JsonRpcSigner): Provider
{.gcsafe, raises: [SignerError].} =
@ -337,9 +266,8 @@ method signMessage*(
signer: JsonRpcSigner, message: seq[byte]
): Future[seq[byte]] {.async: (raises: [SignerError, CancelledError]).} =
convertSignerError:
let client = await signer.provider.client
let address = await signer.getAddress()
return await client.personal_sign(message, address)
return await signer.provider.client.personal_sign(message, address)
method sendTransaction*(
signer: JsonRpcSigner, transaction: Transaction
@ -347,8 +275,5 @@ method sendTransaction*(
async: (raises: [SignerError, ProviderError, CancelledError])
.} =
convertError:
let
client = await signer.provider.client
hash = await client.eth_sendTransaction(transaction)
let hash = await signer.provider.client.eth_sendTransaction(transaction)
return TransactionResponse(hash: hash, provider: signer.provider)

View File

@ -1,364 +0,0 @@
import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/questionable
import pkg/json_rpc/rpcclient
import pkg/serde
import ../../basics
import ../../errors
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
import ./conversions
export serde
type
JsonRpcSubscriptions* = ref object of RootObj
client: RpcClient
callbacks: Table[JsonNode, SubscriptionCallback]
subscriptionHandler: MethodHandler
# Used by both PollingSubscriptions and WebsocketSubscriptions to store
# subscription filters so the subscriptions can be recreated. With
# PollingSubscriptions, the RPC node might prune/forget about them, and with
# WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5
# minutes.
logFilters: Table[JsonNode, EventFilter]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].}
template convertErrorsToSubscriptionError(body) =
try:
body
except CancelledError as error:
raise error
except CatchableError as error:
raise error.toErr(SubscriptionError)
template `or`(a: JsonNode, b: typed): JsonNode =
if a.isNil: b else: a
func start*(subscriptions: JsonRpcSubscriptions) =
subscriptions.client.onProcessMessage =
proc(client: RpcClient, line: string): Result[bool, string] =
if json =? JsonNode.fromJson(line):
if "method" in json and json{"method"}.getStr() == "eth_subscription":
if handler =? subscriptions.subscriptionHandler:
handler(json{"params"} or newJArray())
return ok false # do not process using json-rpc default handler
return ok true # continue processing using json-rpc default handler
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]), base,.} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]), base.} =
raiseAssert "not implemented"
method unsubscribe*(subscriptions: JsonRpcSubscriptions,
id: JsonNode)
{.async: (raises: [CancelledError]), base.} =
raiseAssert "not implemented "
method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.} =
let ids = toSeq subscriptions.callbacks.keys
for id in ids:
try:
await subscriptions.unsubscribe(id)
except CatchableError as e:
error "JsonRpc unsubscription failed", error = e.msg, id = id
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
if not id.isNil and id in subscriptions.callbacks:
return subscriptions.callbacks[id].some
except: discard
# Web sockets
# Default re-subscription period is seconds
const WsResubscribe {.intdefine.}: int = 0
type
WebSocketSubscriptions = ref object of JsonRpcSubscriptions
logFiltersLock: AsyncLock
resubscribeFut: Future[void]
resubscribeInterval: int
template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) =
if subscriptions.logFiltersLock.isNil:
subscriptions.logFiltersLock = newAsyncLock()
await subscriptions.logFiltersLock.acquire()
try:
body
finally:
subscriptions.logFiltersLock.release()
# This is a workaround to manage the 5 minutes limit due to hardhat.
# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064
proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} =
while true:
await sleepAsync(subscriptions.resubscribeInterval.seconds)
try:
withLock(subscriptions):
for id, callback in subscriptions.callbacks:
var newId: JsonNode
if id in subscriptions.logFilters:
let filter = subscriptions.logFilters[id]
newId = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.logFilters[newId] = filter
subscriptions.logFilters.del(id)
else:
newId = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[newId] = callback
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
except CancelledError as e:
raise e
except CatchableError as e:
error "WS resubscription failed" , error = e.msg
proc new*(_: type JsonRpcSubscriptions,
client: RpcWebSocketClient,
resubscribeInterval = WsResubscribe): JsonRpcSubscriptions =
let subscriptions = WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval)
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, success(arguments))
subscriptions.subscriptionHandler = subscriptionHandler
if resubscribeInterval > 0:
if resubscribeInterval >= 300:
warn "Resubscription interval greater than 300 seconds is useless for hardhat workaround", resubscribeInterval = resubscribeInterval
subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]).} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
let res = Block.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onBlock(res)
convertErrorsToSubscriptionError:
withLock(subscriptions):
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
method subscribeLogs(subscriptions: WebSocketSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]).} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onLog(res)
convertErrorsToSubscriptionError:
withLock(subscriptions):
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
subscriptions.logFilters[id] = filter
return id
method unsubscribe*(subscriptions: WebSocketSubscriptions,
id: JsonNode)
{.async: (raises: [CancelledError]).} =
try:
withLock(subscriptions):
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
except CancelledError as e:
raise e
except CatchableError:
# Ignore if uninstallation of the subscribiton fails.
discard
method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: []).} =
await procCall JsonRpcSubscriptions(subscriptions).close()
if not subscriptions.resubscribeFut.isNil:
await subscriptions.resubscribeFut.cancelAndWait()
# Polling
type
PollingSubscriptions* = ref object of JsonRpcSubscriptions
polling: Future[void]
# Used when filters are recreated to translate from the id that user
# originally got returned to new filter id
subscriptionMapping: Table[JsonNode, JsonNode]
proc new*(_: type JsonRpcSubscriptions,
client: RpcHttpClient,
pollingInterval = 4.seconds): JsonRpcSubscriptions =
let subscriptions = PollingSubscriptions(client: client)
proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
var newId: JsonNode
# Log filters are stored in logFilters, block filters are not persisted
# there is they do not need any specific data for their recreation.
# We use this to determine if the filter was log or block filter here.
if subscriptions.logFilters.hasKey(id):
let filter = subscriptions.logFilters[id]
newId = await subscriptions.client.eth_newFilter(filter)
else:
newId = await subscriptions.client.eth_newBlockFilter()
subscriptions.subscriptionMapping[id] = newId
except CancelledError as e:
raise e
except CatchableError as e:
return failure(void, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
return success()
proc getChanges(id: JsonNode): Future[?!JsonNode] {.async: (raises: [CancelledError]).} =
if mappedId =? subscriptions.subscriptionMapping.?[id]:
try:
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JArray:
return success(changes)
except JsonRpcError as e:
if error =? (await resubscribe(id)).errorOption:
return failure(JsonNode, error)
# TODO: we could still miss some events between losing the subscription
# and resubscribing. We should probably adopt a strategy like ethers.js,
# whereby we keep track of the latest block number that we've seen
# filter changes for:
# https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977
if not ("filter not found" in e.msg):
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
except CancelledError as e:
raise e
except SubscriptionError as e:
return failure(JsonNode, e)
except CatchableError as e:
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
return success(newJArray())
proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} =
without callback =? subscriptions.getCallback(id):
return
without changes =? (await getChanges(id)), error:
callback(id, failure(JsonNode, error))
return
for change in changes:
callback(id, success(change))
proc poll {.async: (raises: []).} =
try:
while true:
for id in toSeq subscriptions.callbacks.keys:
await poll(id)
await sleepAsync(pollingInterval)
except CancelledError:
discard
subscriptions.polling = poll()
asyncSpawn subscriptions.polling
subscriptions
method close*(subscriptions: PollingSubscriptions) {.async: (raises: []).} =
await subscriptions.polling.cancelAndWait()
await procCall JsonRpcSubscriptions(subscriptions).close()
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]).} =
proc getBlock(hash: BlockHash) {.async: (raises:[]).} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(success(blck))
except CancelledError:
discard
except CatchableError as e:
let error = e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg)
onBlock(failure(Block, error))
proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} =
without change =? changeResult, e:
onBlock(failure(Block, e.toErr(SubscriptionError)))
return
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
convertErrorsToSubscriptionError:
let id = await subscriptions.client.eth_newBlockFilter()
subscriptions.callbacks[id] = callback
subscriptions.subscriptionMapping[id] = id
return id
method subscribeLogs(subscriptions: PollingSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]).} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
let res = Log.fromJson(arguments).mapFailure(SubscriptionError)
onLog(res)
convertErrorsToSubscriptionError:
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback
subscriptions.logFilters[id] = filter
subscriptions.subscriptionMapping[id] = id
return id
method unsubscribe*(subscriptions: PollingSubscriptions,
id: JsonNode)
{.async: (raises: [CancelledError]).} =
try:
subscriptions.logFilters.del(id)
subscriptions.callbacks.del(id)
if sub =? subscriptions.subscriptionMapping.?[id]:
subscriptions.subscriptionMapping.del(id)
discard await subscriptions.client.eth_uninstallFilter(sub)
except CancelledError as e:
raise e
except CatchableError:
# Ignore if uninstallation of the filter fails. If it's the last step in our
# cleanup, then filter changes for this filter will no longer be polled so
# if the filter continues to live on in geth for whatever reason then it
# doesn't matter.
discard

View File

@ -1,56 +0,0 @@
import ../../examples
import ../../../ethers/provider
import ../../../ethers/providers/jsonrpc/conversions
import std/sequtils
import pkg/stew/byteutils
import pkg/json_rpc/rpcserver except `%`, `%*`
import pkg/json_rpc/errors
type MockRpcHttpServer* = ref object
filters*: seq[string]
nextGetChangesReturnsError*: bool
srv: RpcHttpServer
proc new*(_: type MockRpcHttpServer): MockRpcHttpServer =
let srv = newRpcHttpServer(["127.0.0.1:0"])
MockRpcHttpServer(filters: @[], srv: srv, nextGetChangesReturnsError: false)
proc invalidateFilter*(server: MockRpcHttpServer, jsonId: JsonNode) =
server.filters.keepItIf it != jsonId.getStr
proc start*(server: MockRpcHttpServer) =
server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string:
let filterId = "0x" & (array[16, byte].example).toHex
server.filters.add filterId
return filterId
server.srv.router.rpc("eth_newBlockFilter") do() -> string:
let filterId = "0x" & (array[16, byte].example).toHex
server.filters.add filterId
return filterId
server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]:
if server.nextGetChangesReturnsError:
raise (ref ApplicationError)(code: -32000, msg: "unknown error")
if id notin server.filters:
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
return @[]
server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool:
if id notin server.filters:
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
server.invalidateFilter(%id)
return true
server.srv.start()
proc stop*(server: MockRpcHttpServer) {.async.} =
await server.srv.stop()
await server.srv.closeWait()
proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] =
return server.srv.localAddress()

View File

@ -15,14 +15,14 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
var provider: JsonRpcProvider
setup:
provider = JsonRpcProvider.new(url, pollingInterval = 100.millis)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
teardown:
await provider.close()
test "can be instantiated with a default URL":
discard JsonRpcProvider.new()
let provider = await JsonRpcProvider.connect()
await provider.close()
test "lists all accounts":
let accounts = await provider.listAccounts()
@ -87,20 +87,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
expect EthersError:
discard await confirming
test "raises JsonRpcProviderError when something goes wrong":
let provider = JsonRpcProvider.new("http://invalid.")
test "raises JsonRpcProviderError when connect fails":
expect JsonRpcProviderError:
discard await provider.listAccounts()
expect JsonRpcProviderError:
discard await provider.send("evm_mine")
expect JsonRpcProviderError:
discard await provider.getBlockNumber()
expect JsonRpcProviderError:
discard await provider.getBlock(BlockTag.latest)
expect JsonRpcProviderError:
discard await provider.subscribe(proc(_: ?!Block) = discard)
expect JsonRpcProviderError:
discard await provider.getSigner().sendTransaction(Transaction.example)
discard await JsonRpcProvider.connect("http://invalid.")
test "syncing":
let isSyncing = await provider.isSyncing()

View File

@ -8,10 +8,10 @@ suite "JsonRpcSigner":
var provider: JsonRpcProvider
var accounts: seq[Address]
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
accounts = await provider.listAccounts()
teardown:
@ -75,7 +75,7 @@ suite "JsonRpcSigner":
let blk = !(await signer.provider.getBlock(BlockTag.latest))
transaction.maxFeePerGas = some(!blk.baseFeePerGas * 2.u256 + !populated.maxPriorityFeePerGas)
check populated == transaction
test "populate fails when sender does not match signer address":

View File

@ -1,218 +1,66 @@
import std/os
import std/importutils
import pkg/asynctest/chronos/unittest
import pkg/serde
import pkg/json_rpc/rpcclient
import pkg/json_rpc/rpcserver
import ethers/provider
import ethers/providers/jsonrpc/subscriptions
import ethers/providers/jsonrpc
import ../../examples
import ./rpc_mock
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
for url in ["ws://" & providerUrl, "http://" & providerUrl]:
suite "JsonRpcSubscriptions":
suite "JSON-RPC Subscriptions (" & url & ")":
test "can be instantiated with an http client":
let client = newRpcHttpClient()
let subscriptions = JsonRpcSubscriptions.new(client)
check not isNil subscriptions
var provider: JsonRpcProvider
test "can be instantiated with a websocket client":
let client = newRpcWebSocketClient()
let subscriptions = JsonRpcSubscriptions.new(client)
check not isNil subscriptions
setup:
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
template subscriptionTests(subscriptions, client) =
teardown:
await provider.close()
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: ?!Block) =
latestBlock = blck.value
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually latestBlock.number.isSome
check latestBlock.hash.isSome
check latestBlock.timestamp > 0.u256
await subscriptions.unsubscribe(subscription)
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: ?!Block) =
latestBlock = blck.value
let subscription = await provider.subscribe(callback)
discard await provider.send("evm_mine")
check eventually latestBlock.number.isSome
check latestBlock.hash.isSome
check latestBlock.timestamp > 0.u256
await subscription.unsubscribe()
test "stops listening to new blocks when unsubscribed":
var count = 0
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
await subscriptions.unsubscribe(subscription)
count = 0
discard await client.call("evm_mine", newJArray())
await sleepAsync(100.millis)
check count == 0
test "stops listening to new blocks when unsubscribed":
var count = 0
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
let subscription = await provider.subscribe(callback)
discard await provider.send("evm_mine")
check eventually count > 0
await subscription.unsubscribe()
count = 0
discard await provider.send("evm_mine")
await sleepAsync(200.millis)
check count == 0
test "unsubscribing from a non-existent subscription does not do any harm":
await subscriptions.unsubscribe(newJInt(0))
test "duplicate unsubscribe is harmless":
proc callback(blck: ?!Block) = discard
let subscription = await provider.subscribe(callback)
await subscription.unsubscribe()
await subscription.unsubscribe()
test "duplicate unsubscribe is harmless":
proc callback(blck: ?!Block) = discard
let subscription = await subscriptions.subscribeBlocks(callback)
await subscriptions.unsubscribe(subscription)
await subscriptions.unsubscribe(subscription)
test "stops listening to new blocks when provider is closed":
var count = 0
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
discard await provider.subscribe(callback)
discard await provider.send("evm_mine")
check eventually count > 0
await provider.close()
count = 0
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
discard await provider.send("evm_mine")
await sleepAsync(200.millis)
check count == 0
test "stops listening to new blocks when provider is closed":
var count = 0
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
discard await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
await subscriptions.close()
count = 0
discard await client.call("evm_mine", newJArray())
await sleepAsync(100.millis)
check count == 0
suite "Web socket subscriptions":
var subscriptions: JsonRpcSubscriptions
var client: RpcWebSocketClient
setup:
client = newRpcWebSocketClient()
await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545"))
subscriptions = JsonRpcSubscriptions.new(client)
subscriptions.start()
teardown:
await subscriptions.close()
await client.close()
subscriptionTests(subscriptions, client)
suite "HTTP polling subscriptions":
var subscriptions: JsonRpcSubscriptions
var client: RpcHttpClient
setup:
client = newRpcHttpClient()
await client.connect("http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545"))
subscriptions = JsonRpcSubscriptions.new(client,
pollingInterval = 100.millis)
subscriptions.start()
teardown:
await subscriptions.close()
await client.close()
subscriptionTests(subscriptions, client)
suite "HTTP polling subscriptions - mock tests":
var subscriptions: PollingSubscriptions
var client: RpcHttpClient
var mockServer: MockRpcHttpServer
privateAccess(PollingSubscriptions)
privateAccess(JsonRpcSubscriptions)
proc startServer() {.async.} =
mockServer = MockRpcHttpServer.new()
mockServer.start()
await client.connect("http://" & $mockServer.localAddress()[0])
proc stopServer() {.async.} =
await mockServer.stop()
setup:
client = newRpcHttpClient()
await startServer()
subscriptions = PollingSubscriptions(
JsonRpcSubscriptions.new(
client,
pollingInterval = 1.millis))
subscriptions.start()
teardown:
await subscriptions.close()
await client.close()
await mockServer.stop()
test "filter not found error recreates log filter":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
check subscriptions.logFilters.len == 0
check subscriptions.subscriptionMapping.len == 0
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
check subscriptions.logFilters[id] == filter
check subscriptions.subscriptionMapping[id] == id
check subscriptions.logFilters.len == 1
check subscriptions.subscriptionMapping.len == 1
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
test "recreated log filter can be still unsubscribed using the original id":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
await subscriptions.unsubscribe(id)
check not subscriptions.logFilters.hasKey id
check not subscriptions.subscriptionMapping.hasKey id
test "filter not found error recreates block filter":
let emptyHandler = proc(blck: ?!Block) = discard
check subscriptions.subscriptionMapping.len == 0
let id = await subscriptions.subscribeBlocks(emptyHandler)
check subscriptions.subscriptionMapping[id] == id
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
test "recreated block filter can be still unsubscribed using the original id":
let emptyHandler = proc(blck: ?!Block) = discard
let id = await subscriptions.subscribeBlocks(emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
await subscriptions.unsubscribe(id)
check not subscriptions.subscriptionMapping.hasKey id
test "polling continues with new filter after temporary error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
await stopServer()
mockServer.invalidateFilter(id)
await sleepAsync(50.milliseconds)
await startServer()
check eventually subscriptions.subscriptionMapping[id] != id
test "calls callback with failed result on error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
var failedResultReceived = false
proc handler(log: ?!Log) =
if log.isErr:
failedResultReceived = true
discard await subscriptions.subscribeLogs(filter, handler)
await sleepAsync(50.milliseconds)
mockServer.nextGetChangesReturnsError = true
check eventually failedResultReceived

View File

@ -1,56 +0,0 @@
import std/os
import std/importutils
import pkg/asynctest/chronos/unittest
import pkg/json_rpc/rpcclient
import ethers/provider
import ethers/providers/jsonrpc/subscriptions
import ../../examples
suite "Websocket re-subscriptions":
privateAccess(JsonRpcSubscriptions)
var subscriptions: JsonRpcSubscriptions
var client: RpcWebSocketClient
var resubscribeInterval: int
setup:
resubscribeInterval = 3
client = newRpcWebSocketClient()
await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545"))
subscriptions = JsonRpcSubscriptions.new(client, resubscribeInterval = resubscribeInterval)
subscriptions.start()
teardown:
await subscriptions.close()
await client.close()
test "unsubscribing from a log filter while subscriptions are being resubscribed does not cause a concurrency error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
for i in 1..10:
discard await subscriptions.subscribeLogs(filter, emptyHandler)
# Wait until the re-subscription starts
await sleepAsync(resubscribeInterval.seconds)
# Attempt to modify callbacks while its being iterated
discard await subscriptions.subscribeLogs(filter, emptyHandler)
test "resubscribe events take effect with new subscription IDs in the log filters":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
check id in subscriptions.logFilters
check subscriptions.logFilters.len == 1
# Make sure the subscription is done
await sleepAsync((resubscribeInterval + 1).seconds)
# The previous subscription should not be in the log filters
check id notin subscriptions.logFilters
# There is still one subscription which is the new one
check subscriptions.logFilters.len == 1

View File

@ -1,7 +1,6 @@
import ./jsonrpc/testJsonRpcProvider
import ./jsonrpc/testJsonRpcSigner
import ./jsonrpc/testJsonRpcSubscriptions
import ./jsonrpc/testWsResubscription
import ./jsonrpc/testConversions
import ./jsonrpc/testErrors

View File

@ -28,7 +28,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
var accounts: seq[Address]
setup:
provider = JsonRpcProvider.new(url, pollingInterval = 100.millis)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
accounts = await provider.listAccounts()
let deployment = readDeployment()

View File

@ -23,10 +23,10 @@ suite "Contract custom errors":
var contract: TestCustomErrors
var provider: JsonRpcProvider
var snapshot: JsonNode
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
let deployment = readDeployment()
let address = !deployment.address(TestCustomErrors)

View File

@ -15,10 +15,10 @@ suite "Contract enum parameters and return values":
var contract: TestEnums
var provider: JsonRpcProvider
var snapshot: JsonNode
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
let deployment = readDeployment()
contract = TestEnums.new(!deployment.address(TestEnums), provider)

View File

@ -24,7 +24,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
var accounts: seq[Address]
setup:
provider = JsonRpcProvider.new(url, pollingInterval = 100.millis)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
accounts = await provider.listAccounts()
let deployment = readDeployment()

View File

@ -15,10 +15,10 @@ suite "gas estimation":
var contract: TestGasEstimation
var provider: JsonRpcProvider
var snapshot: JsonNode
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
let deployment = readDeployment()
let signer = provider.getSigner()

View File

@ -14,10 +14,10 @@ suite "Contract return values":
var contract: TestReturns
var provider: JsonRpcProvider
var snapshot: JsonNode
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
let deployment = readDeployment()
contract = TestReturns.new(!deployment.address(TestReturns), provider)

View File

@ -93,10 +93,10 @@ suite "Testing helpers - contracts":
var snapshot: JsonNode
var accounts: seq[Address]
let revertReason = "revert reason"
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("ws://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
accounts = await provider.listAccounts()
helpersContract = TestHelpers.new(provider.getSigner())

View File

@ -13,10 +13,10 @@ proc transfer*(erc20: Erc20, recipient: Address, amount: UInt256) {.contract.}
suite "Wallet":
var provider: JsonRpcProvider
var snapshot: JsonNode
let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")
setup:
provider = JsonRpcProvider.new("http://" & providerUrl)
provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis)
snapshot = await provider.send("evm_snapshot")
teardown:
@ -31,13 +31,12 @@ suite "Wallet":
check isSuccess Wallet.new("0x" & pk1)
test "Can create Wallet with provider":
let provider = JsonRpcProvider.new()
check isSuccess Wallet.new(pk1, provider)
discard Wallet.new(PrivateKey.fromHex(pk1).get, provider)
test "Cannot create wallet with invalid key string":
check isFailure Wallet.new("0xInvalidKey")
check isFailure Wallet.new("0xInvalidKey", JsonRpcProvider.new())
check isFailure Wallet.new("0xInvalidKey", provider)
test "Can connect Wallet to provider":
let wallet = !Wallet.new(pk1)