Refactor error bubbling

Ensure that all errors bubble to the main `convertError` proc.

Add websockets mocks and tests.

Formatting updates via nph.
This commit is contained in:
Eric 2025-07-08 12:10:02 +10:00
parent 7034bceaaf
commit 933103ca6d
No known key found for this signature in database
9 changed files with 617 additions and 511 deletions

View File

@ -13,6 +13,8 @@ requires "serde >= 1.2.1 & < 1.3.0"
requires "stint >= 0.8.1 & < 0.9.0" requires "stint >= 0.8.1 & < 0.9.0"
requires "stew >= 0.2.0 & < 0.3.0" requires "stew >= 0.2.0 & < 0.3.0"
requires "eth >= 0.5.0 & < 0.6.0" requires "eth >= 0.5.0 & < 0.6.0"
requires "websock >= 0.2.0 & < 0.3.0"
requires "httputils >= 0.2.0"
task test, "Run the test suite": task test, "Run the test suite":
# exec "nimble install -d -y" # exec "nimble install -d -y"

View File

@ -1,3 +1,5 @@
import pkg/websock/websock
import pkg/json_rpc/errors
import ./basics import ./basics
type type
@ -7,20 +9,53 @@ type
SubscriptionError* = object of EthersError SubscriptionError* = object of EthersError
ProviderError* = object of EthersError ProviderError* = object of EthersError
data*: ?seq[byte] data*: ?seq[byte]
RpcNetworkError* = object of EthersError RpcNetworkError* = object of EthersError
RpcHttpErrorResponse* = object of RpcNetworkError RpcHttpErrorResponse* = object of RpcNetworkError
HttpRequestLimitError* = object of RpcHttpErrorResponse HttpRequestLimitError* = object of RpcHttpErrorResponse
HttpRequestTimeoutError* = object of RpcHttpErrorResponse HttpRequestTimeoutError* = object of RpcHttpErrorResponse
WebsocketConnectionError* = object of RpcNetworkError
{.push raises:[].} {.push raises: [].}
template convertErrorsTo*(newErrorType: type, body) =
try:
body
except CancelledError as error:
raise error
except RpcNetworkError as error:
raise error
except RpcPostError as error:
raiseNetworkError(error)
except FailedHttpResponse as error:
raiseNetworkError(error)
except HttpError as error: # from websock.common
# eg Timeout expired while receiving headers
# eg Unable to connect to host on any address!
# eg No connection to host!
raise newException(WebsocketConnectionError, error.msg, error)
except WSClosedError as error:
raise newException(WebsocketConnectionError, error.msg, error)
except ErrorResponse as error:
if error.status == 429:
raise newException(HttpRequestLimitError, error.msg, error)
elif error.status == 408:
raise newException(HttpRequestTimeoutError, error.msg, error)
else:
raise newException(newErrorType, error.msg, error)
except JsonRpcError as error:
var message = error.msg
if jsn =? JsonNode.fromJson(message):
if "message" in jsn:
message = jsn{"message"}.getStr
raise newException(newErrorType, message, error)
except CatchableError as error:
raise newException(newErrorType, error.msg, error)
proc toErr*[E1: ref CatchableError, E2: EthersError]( proc toErr*[E1: ref CatchableError, E2: EthersError](
e1: E1, e1: E1, _: type E2, msg: string = e1.msg
_: type E2, ): ref E2 =
msg: string = e1.msg): ref E2 =
return newException(E2, msg, e1) return newException(E2, msg, e1)
proc raiseNetworkError*( proc raiseNetworkError*(error: ref CatchableError) {.raises: [RpcNetworkError].} =
error: ref CatchableError) {.raises: [RpcNetworkError].} =
raise newException(RpcNetworkError, error.msg, error) raise newException(RpcNetworkError, error.msg, error)

View File

@ -16,7 +16,7 @@ import ./jsonrpc/errors
export basics export basics
export provider export provider
export chronicles export chronicles
export errors.JsonRpcProviderError export errors
export subscriptions export subscriptions
{.push raises: [].} {.push raises: [].}
@ -37,6 +37,7 @@ type
JsonRpcSigner* = ref object of Signer JsonRpcSigner* = ref object of Signer
provider: JsonRpcProvider provider: JsonRpcProvider
address: ?Address address: ?Address
JsonRpcSignerError* = object of SignerError JsonRpcSignerError* = object of SignerError
# Provider # Provider
@ -44,19 +45,19 @@ type
const defaultUrl = "http://localhost:8545" const defaultUrl = "http://localhost:8545"
const defaultPollingInterval = 4.seconds const defaultPollingInterval = 4.seconds
proc jsonHeaders: seq[(string, string)] = proc jsonHeaders(): seq[(string, string)] =
@[("Content-Type", "application/json")] @[("Content-Type", "application/json")]
proc new*( proc new*(
_: type JsonRpcProvider, _: type JsonRpcProvider, url = defaultUrl, pollingInterval = defaultPollingInterval
url=defaultUrl, ): JsonRpcProvider {.raises: [JsonRpcProviderError].} =
pollingInterval=defaultPollingInterval): JsonRpcProvider {.raises: [JsonRpcProviderError].} =
var initialized: Future[void] var initialized: Future[void]
var client: RpcClient var client: RpcClient
var subscriptions: JsonRpcSubscriptions var subscriptions: JsonRpcSubscriptions
proc initialize() {.async: (raises: [JsonRpcProviderError, CancelledError, RpcNetworkError]).} = proc initialize() {.
async: (raises: [JsonRpcProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
case parseUri(url).scheme case parseUri(url).scheme
of "ws", "wss": of "ws", "wss":
@ -68,8 +69,8 @@ proc new*(
let http = newRpcHttpClient() let http = newRpcHttpClient()
await http.connect(url) await http.connect(url)
client = http client = http
subscriptions = JsonRpcSubscriptions.new(http, subscriptions =
pollingInterval = pollingInterval) JsonRpcSubscriptions.new(http, pollingInterval = pollingInterval)
subscriptions.start() subscriptions.start()
proc awaitClient(): Future[RpcClient] {. proc awaitClient(): Future[RpcClient] {.
@ -91,30 +92,30 @@ proc new*(
proc callImpl( proc callImpl(
client: RpcClient, call: string, args: JsonNode client: RpcClient, call: string, args: JsonNode
): Future[JsonNode] {.async: (raises: [JsonRpcProviderError, CancelledError, JsonRpcError]).} = ): Future[JsonNode] {.
try: async: (raises: [JsonRpcProviderError, CancelledError, RpcNetworkError])
.} =
convertError:
let response = await client.call(call, %args) let response = await client.call(call, %args)
without json =? JsonNode.fromJson(response.string), error: without json =? JsonNode.fromJson(response.string), error:
raiseJsonRpcProviderError error, "Failed to parse response '" & response.string & "': " & raiseJsonRpcProviderError error,
error.msg "Failed to parse response '" & response.string & "': " & error.msg
return json return json
except CancelledError as error:
raise error
except JsonRpcError as error:
raise error
except CatchableError as error:
raiseJsonRpcProviderError error
proc send*( proc send*(
provider: JsonRpcProvider, call: string, arguments: seq[JsonNode] = @[] provider: JsonRpcProvider, call: string, arguments: seq[JsonNode] = @[]
): Future[JsonNode] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[JsonNode] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
return await client.callImpl(call, %arguments) return await client.callImpl(call, %arguments)
proc listAccounts*( proc listAccounts*(
provider: JsonRpcProvider provider: JsonRpcProvider
): Future[seq[Address]] {.async: (raises: [JsonRpcProviderError, CancelledError, RpcNetworkError]).} = ): Future[seq[Address]] {.
async: (raises: [JsonRpcProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
return await client.eth_accounts() return await client.eth_accounts()
@ -141,7 +142,9 @@ method getBlock*(
method call*( method call*(
provider: JsonRpcProvider, tx: Transaction, blockTag = BlockTag.latest provider: JsonRpcProvider, tx: Transaction, blockTag = BlockTag.latest
): Future[seq[byte]] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[seq[byte]] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
return await client.eth_call(tx, blockTag) return await client.eth_call(tx, blockTag)
@ -162,21 +165,27 @@ method getTransactionCount*(
method getTransaction*( method getTransaction*(
provider: JsonRpcProvider, txHash: TransactionHash provider: JsonRpcProvider, txHash: TransactionHash
): Future[?PastTransaction] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[?PastTransaction] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
return await client.eth_getTransactionByHash(txHash) return await client.eth_getTransactionByHash(txHash)
method getTransactionReceipt*( method getTransactionReceipt*(
provider: JsonRpcProvider, txHash: TransactionHash provider: JsonRpcProvider, txHash: TransactionHash
): Future[?TransactionReceipt] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[?TransactionReceipt] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
return await client.eth_getTransactionReceipt(txHash) return await client.eth_getTransactionReceipt(txHash)
method getLogs*( method getLogs*(
provider: JsonRpcProvider, filter: EventFilter provider: JsonRpcProvider, filter: EventFilter
): Future[seq[Log]] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[seq[Log]] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let client = await provider.client let client = await provider.client
let logsJson = let logsJson =
@ -195,9 +204,7 @@ method getLogs*(
return logs return logs
method estimateGas*( method estimateGas*(
provider: JsonRpcProvider, provider: JsonRpcProvider, transaction: Transaction, blockTag = BlockTag.latest
transaction: Transaction,
blockTag = BlockTag.latest,
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} =
try: try:
convertError: convertError:
@ -225,7 +232,9 @@ method getChainId*(
method sendTransaction*( method sendTransaction*(
provider: JsonRpcProvider, rawTransaction: seq[byte] provider: JsonRpcProvider, rawTransaction: seq[byte]
): Future[TransactionResponse] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[TransactionResponse] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let let
client = await provider.client client = await provider.client
@ -235,7 +244,9 @@ method sendTransaction*(
method subscribe*( method subscribe*(
provider: JsonRpcProvider, filter: EventFilter, onLog: LogHandler provider: JsonRpcProvider, filter: EventFilter, onLog: LogHandler
): Future[Subscription] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[Subscription] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let subscriptions = await provider.subscriptions let subscriptions = await provider.subscriptions
let id = await subscriptions.subscribeLogs(filter, onLog) let id = await subscriptions.subscribeLogs(filter, onLog)
@ -243,7 +254,9 @@ method subscribe*(
method subscribe*( method subscribe*(
provider: JsonRpcProvider, onBlock: BlockHandler provider: JsonRpcProvider, onBlock: BlockHandler
): Future[Subscription] {.async: (raises: [ProviderError, CancelledError, RpcNetworkError]).} = ): Future[Subscription] {.
async: (raises: [ProviderError, CancelledError, RpcNetworkError])
.} =
convertError: convertError:
let subscriptions = await provider.subscriptions let subscriptions = await provider.subscriptions
let id = await subscriptions.subscribeBlocks(onBlock) let id = await subscriptions.subscribeBlocks(onBlock)
@ -277,21 +290,17 @@ method close*(
# Signer # Signer
template convertSignerError(body) = template convertSignerError(body) =
try: convertErrorsTo(JsonRpcSignerError):
body body
except CancelledError as error:
raise error
except CatchableError as error:
raise newException(JsonRpcSignerError, error.msg, error)
method provider*(signer: JsonRpcSigner): Provider
{.gcsafe, raises: [SignerError].} =
method provider*(signer: JsonRpcSigner): Provider {.gcsafe, raises: [SignerError].} =
signer.provider signer.provider
method getAddress*( method getAddress*(
signer: JsonRpcSigner signer: JsonRpcSigner
): Future[Address] {.async: (raises: [ProviderError, SignerError, CancelledError, RpcNetworkError]).} = ): Future[Address] {.
async: (raises: [ProviderError, SignerError, CancelledError, RpcNetworkError])
.} =
if address =? signer.address: if address =? signer.address:
return address return address

View File

@ -4,10 +4,11 @@ import ../../basics
import ../../errors import ../../errors
import ../../provider import ../../provider
import ./conversions import ./conversions
import pkg/websock/websock
export errors export errors
{.push raises:[].} {.push raises: [].}
type JsonRpcProviderError* = object of ProviderError type JsonRpcProviderError* = object of ProviderError
@ -31,53 +32,13 @@ func new*(_: type JsonRpcProviderError, json: JsonNode): ref JsonRpcProviderErro
error error
proc raiseJsonRpcProviderError*( proc raiseJsonRpcProviderError*(
error: ref CatchableError, message = error.msg) {.raises: [JsonRpcProviderError].} = error: ref CatchableError, message = error.msg
) {.raises: [JsonRpcProviderError].} =
if json =? JsonNode.fromJson(error.msg): if json =? JsonNode.fromJson(error.msg):
raise JsonRpcProviderError.new(json) raise JsonRpcProviderError.new(json)
else: else:
raise newException(JsonRpcProviderError, message) raise newException(JsonRpcProviderError, message)
proc underlyingErrorOf(e: ref Exception, T: type CatchableError): (ref CatchableError) =
if e of (ref T):
return (ref T)(e)
elif not e.parent.isNil:
return e.parent.underlyingErrorOf T
else:
return nil
template convertError*(body) = template convertError*(body) =
try: convertErrorsTo(JsonRpcProviderError):
try: body
body
# Inspect SubscriptionErrors and re-raise underlying JsonRpcErrors so that
# exception inspection and resolution only needs to happen once. All
# CatchableErrors that occur in the Subscription module are converted to
# SubscriptionError, with the original error preserved as the exception's
# parent.
except SubscriptionError, SignerError:
let e = getCurrentException()
let parent = e.underlyingErrorOf(JsonRpcError)
if not parent.isNil:
raise parent
except CancelledError as error:
raise error
except RpcPostError as error:
raiseNetworkError(error)
except FailedHttpResponse as error:
raiseNetworkError(error)
except ErrorResponse as error:
if error.status == 429:
raise newException(HttpRequestLimitError, error.msg, error)
elif error.status == 408:
raise newException(HttpRequestTimeoutError, error.msg, error)
else:
raiseJsonRpcProviderError(error)
except JsonRpcError as error:
var message = error.msg
if jsn =? JsonNode.fromJson(message):
if "message" in jsn:
message = jsn{"message"}.getStr
raiseJsonRpcProviderError(error, message)
except CatchableError as error:
raiseJsonRpcProviderError(error)

View File

@ -25,63 +25,60 @@ type
# WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5 # WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5
# minutes. # minutes.
logFilters: Table[JsonNode, EventFilter] logFilters: Table[JsonNode, EventFilter]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].} MethodHandler* = proc(j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback =
proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises: [].}
{.push raises: [].}
template convertErrorsToSubscriptionError(body) = template convertErrorsToSubscriptionError(body) =
try: convertErrorsTo(SubscriptionError):
body body
except CancelledError as error:
raise error
except CatchableError as error:
raise error.toErr(SubscriptionError)
template `or`(a: JsonNode, b: typed): JsonNode = template `or`(a: JsonNode, b: typed): JsonNode =
if a.isNil: b else: a if a.isNil: b else: a
func start*(subscriptions: JsonRpcSubscriptions) = func start*(subscriptions: JsonRpcSubscriptions) =
subscriptions.client.onProcessMessage = subscriptions.client.onProcessMessage = proc(
proc(client: RpcClient, client: RpcClient, line: string
line: string): Result[bool, string] {.gcsafe, raises: [].} = ): Result[bool, string] {.gcsafe, raises: [].} =
if json =? JsonNode.fromJson(line): if json =? JsonNode.fromJson(line):
if "method" in json: if "method" in json:
let methodName = json{"method"}.getStr() let methodName = json{"method"}.getStr()
if methodName in subscriptions.methodHandlers: if methodName in subscriptions.methodHandlers:
let handler = subscriptions.methodHandlers.getOrDefault(methodName) let handler = subscriptions.methodHandlers.getOrDefault(methodName)
if not handler.isNil: if not handler.isNil:
handler(json{"params"} or newJArray()) handler(json{"params"} or newJArray())
# false = do not continue processing message using json_rpc's # false = do not continue processing message using json_rpc's
# default processing handler # default processing handler
return ok false return ok false
# true = continue processing message using json_rpc's default message handler # true = continue processing message using json_rpc's default message handler
return ok true return ok true
proc setMethodHandler( proc setMethodHandler(
subscriptions: JsonRpcSubscriptions, subscriptions: JsonRpcSubscriptions, `method`: string, handler: MethodHandler
`method`: string,
handler: MethodHandler
) = ) =
subscriptions.methodHandlers[`method`] = handler subscriptions.methodHandlers[`method`] = handler
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, method subscribeBlocks*(
onBlock: BlockHandler): subscriptions: JsonRpcSubscriptions, onBlock: BlockHandler
Future[JsonNode] ): Future[JsonNode] {.
{.async: (raises: [SubscriptionError, CancelledError]), base,.} = async: (raises: [SubscriptionError, CancelledError, RpcNetworkError]), base
.} =
raiseAssert "not implemented" raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions, method subscribeLogs*(
filter: EventFilter, subscriptions: JsonRpcSubscriptions, filter: EventFilter, onLog: LogHandler
onLog: LogHandler): ): Future[JsonNode] {.
Future[JsonNode] async: (raises: [SubscriptionError, CancelledError, RpcNetworkError]), base
{.async: (raises: [SubscriptionError, CancelledError]), base.} = .} =
raiseAssert "not implemented" raiseAssert "not implemented"
method unsubscribe*(subscriptions: JsonRpcSubscriptions, method unsubscribe*(
id: JsonNode) subscriptions: JsonRpcSubscriptions, id: JsonNode
{.async: (raises: [CancelledError]), base.} = ) {.async: (raises: [CancelledError]), base.} =
raiseAssert "not implemented " raiseAssert "not implemented "
method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.} = method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.} =
@ -92,23 +89,24 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.}
except CatchableError as e: except CatchableError as e:
error "JsonRpc unsubscription failed", error = e.msg, id = id error "JsonRpc unsubscription failed", error = e.msg, id = id
proc getCallback(subscriptions: JsonRpcSubscriptions, proc getCallback(
id: JsonNode): ?SubscriptionCallback {. raises:[].} = subscriptions: JsonRpcSubscriptions, id: JsonNode
): ?SubscriptionCallback {.raises: [].} =
try: try:
if not id.isNil and id in subscriptions.callbacks: if not id.isNil and id in subscriptions.callbacks:
return subscriptions.callbacks[id].some return subscriptions.callbacks[id].some
except: discard except:
discard
# Web sockets # Web sockets
# Default re-subscription period is seconds # Default re-subscription period is seconds
const WsResubscribe {.intdefine.}: int = 0 const WsResubscribe {.intdefine.}: int = 0
type type WebSocketSubscriptions = ref object of JsonRpcSubscriptions
WebSocketSubscriptions = ref object of JsonRpcSubscriptions logFiltersLock: AsyncLock
logFiltersLock: AsyncLock resubscribeFut: Future[void]
resubscribeFut: Future[void] resubscribeInterval: int
resubscribeInterval: int
template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) = template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) =
if subscriptions.logFiltersLock.isNil: if subscriptions.logFiltersLock.isNil:
@ -122,13 +120,14 @@ template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) =
# This is a workaround to manage the 5 minutes limit due to hardhat. # This is a workaround to manage the 5 minutes limit due to hardhat.
# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 # See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064
proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebSocketSubscriptions) {.async: (raises: [CancelledError]).} = proc resubscribeWebsocketEventsOnTimeout*(
subscriptions: WebSocketSubscriptions
) {.async: (raises: [CancelledError]).} =
while true: while true:
await sleepAsync(subscriptions.resubscribeInterval.seconds) await sleepAsync(subscriptions.resubscribeInterval.seconds)
try: try:
withLock(subscriptions): withLock(subscriptions):
for id, callback in subscriptions.callbacks: for id, callback in subscriptions.callbacks:
var newId: JsonNode var newId: JsonNode
if id in subscriptions.logFilters: if id in subscriptions.logFilters:
let filter = subscriptions.logFilters[id] let filter = subscriptions.logFilters[id]
@ -144,31 +143,37 @@ proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebSocketSubscriptions)
except CancelledError as e: except CancelledError as e:
raise e raise e
except CatchableError as e: except CatchableError as e:
error "WS resubscription failed" , error = e.msg error "WS resubscription failed", error = e.msg
proc new*(_: type JsonRpcSubscriptions, proc new*(
client: RpcWebSocketClient, _: type JsonRpcSubscriptions,
resubscribeInterval = WsResubscribe): JsonRpcSubscriptions = client: RpcWebSocketClient,
let subscriptions = WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval) resubscribeInterval = WsResubscribe,
): JsonRpcSubscriptions =
let subscriptions =
WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval)
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = proc subscriptionHandler(arguments: JsonNode) {.raises: [].} =
let id = arguments{"subscription"} or newJString("") let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id): if callback =? subscriptions.getCallback(id):
callback(id, success(arguments)) callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler) subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
if resubscribeInterval > 0: if resubscribeInterval > 0:
if resubscribeInterval >= 300: if resubscribeInterval >= 300:
warn "Resubscription interval greater than 300 seconds is useless for hardhat workaround", resubscribeInterval = resubscribeInterval warn "Resubscription interval greater than 300 seconds is useless for hardhat workaround",
resubscribeInterval = resubscribeInterval
subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions) subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions)
subscriptions subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions, method subscribeBlocks*(
onBlock: BlockHandler): subscriptions: WebSocketSubscriptions, onBlock: BlockHandler
Future[JsonNode] ): Future[JsonNode] {.
{.async: (raises: [SubscriptionError, CancelledError]).} = async: (raises: [SubscriptionError, CancelledError, RpcNetworkError])
.} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} = proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error: without arguments =? argumentsResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError))) onBlock(failure(Block, error.toErr(SubscriptionError)))
@ -183,11 +188,11 @@ method subscribeBlocks(subscriptions: WebSocketSubscriptions,
subscriptions.callbacks[id] = callback subscriptions.callbacks[id] = callback
return id return id
method subscribeLogs(subscriptions: WebSocketSubscriptions, method subscribeLogs*(
filter: EventFilter, subscriptions: WebSocketSubscriptions, filter: EventFilter, onLog: LogHandler
onLog: LogHandler): ): Future[JsonNode] {.
Future[JsonNode] async: (raises: [SubscriptionError, CancelledError, RpcNetworkError])
{.async: (raises: [SubscriptionError, CancelledError]).} = .} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error: without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError))) onLog(failure(Log, error.toErr(SubscriptionError)))
@ -203,9 +208,9 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
subscriptions.logFilters[id] = filter subscriptions.logFilters[id] = filter
return id return id
method unsubscribe*(subscriptions: WebSocketSubscriptions, method unsubscribe*(
id: JsonNode) subscriptions: WebSocketSubscriptions, id: JsonNode
{.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
try: try:
withLock(subscriptions): withLock(subscriptions):
subscriptions.callbacks.del(id) subscriptions.callbacks.del(id)
@ -219,22 +224,20 @@ method unsubscribe*(subscriptions: WebSocketSubscriptions,
method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: []).} = method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: []).} =
await procCall JsonRpcSubscriptions(subscriptions).close() await procCall JsonRpcSubscriptions(subscriptions).close()
if not subscriptions.resubscribeFut.isNil: if not subscriptions.resubscribeFut.isNil:
await subscriptions.resubscribeFut.cancelAndWait() await subscriptions.resubscribeFut.cancelAndWait()
# Polling # Polling
type type PollingSubscriptions* = ref object of JsonRpcSubscriptions
PollingSubscriptions* = ref object of JsonRpcSubscriptions polling: Future[void]
polling: Future[void]
# Used when filters are recreated to translate from the id that user # Used when filters are recreated to translate from the id that user
# originally got returned to new filter id # originally got returned to new filter id
subscriptionMapping: Table[JsonNode, JsonNode] subscriptionMapping: Table[JsonNode, JsonNode]
proc new*(_: type JsonRpcSubscriptions,
client: RpcHttpClient,
pollingInterval = 4.seconds): JsonRpcSubscriptions =
proc new*(
_: type JsonRpcSubscriptions, client: RpcHttpClient, pollingInterval = 4.seconds
): JsonRpcSubscriptions =
let subscriptions = PollingSubscriptions(client: client) let subscriptions = PollingSubscriptions(client: client)
proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} = proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} =
@ -252,12 +255,21 @@ proc new*(_: type JsonRpcSubscriptions,
except CancelledError as e: except CancelledError as e:
raise e raise e
except CatchableError as e: except CatchableError as e:
return failure(void, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) return failure(
void,
e.toErr(
SubscriptionError,
"HTTP polling: There was an exception while getting subscription changes: " &
e.msg,
),
)
return success() return success()
proc getChanges(id: JsonNode): Future[?!JsonNode] {.async: (raises: [CancelledError]).} = proc getChanges(
if mappedId =? subscriptions.subscriptionMapping.?[id]: id: JsonNode
): Future[?!JsonNode] {.async: (raises: [CancelledError]).} =
if mappedId =? subscriptions.subscriptionMapping .? [id]:
try: try:
let changes = await subscriptions.client.eth_getFilterChanges(mappedId) let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JArray: if changes.kind == JArray:
@ -273,13 +285,27 @@ proc new*(_: type JsonRpcSubscriptions,
# https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977 # https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977
if not ("filter not found" in e.msg): if not ("filter not found" in e.msg):
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) return failure(
JsonNode,
e.toErr(
SubscriptionError,
"HTTP polling: There was an exception while getting subscription changes: " &
e.msg,
),
)
except CancelledError as e: except CancelledError as e:
raise e raise e
except SubscriptionError as e: except SubscriptionError as e:
return failure(JsonNode, e) return failure(JsonNode, e)
except CatchableError as e: except CatchableError as e:
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) return failure(
JsonNode,
e.toErr(
SubscriptionError,
"HTTP polling: There was an exception while getting subscription changes: " &
e.msg,
),
)
return success(newJArray()) return success(newJArray())
proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} = proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} =
@ -293,7 +319,7 @@ proc new*(_: type JsonRpcSubscriptions,
for change in changes: for change in changes:
callback(id, success(change)) callback(id, success(change))
proc poll {.async: (raises: []).} = proc poll() {.async: (raises: []).} =
try: try:
while true: while true:
for id in toSeq subscriptions.callbacks.keys: for id in toSeq subscriptions.callbacks.keys:
@ -310,22 +336,26 @@ method close*(subscriptions: PollingSubscriptions) {.async: (raises: []).} =
await subscriptions.polling.cancelAndWait() await subscriptions.polling.cancelAndWait()
await procCall JsonRpcSubscriptions(subscriptions).close() await procCall JsonRpcSubscriptions(subscriptions).close()
method subscribeBlocks(subscriptions: PollingSubscriptions, method subscribeBlocks*(
onBlock: BlockHandler): subscriptions: PollingSubscriptions, onBlock: BlockHandler
Future[JsonNode] ): Future[JsonNode] {.
{.async: (raises: [SubscriptionError, CancelledError]).} = async: (raises: [SubscriptionError, CancelledError, RpcNetworkError])
.} =
proc getBlock(hash: BlockHash) {.async: (raises:[]).} = proc getBlock(hash: BlockHash) {.async: (raises: []).} =
try: try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)): if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(success(blck)) onBlock(success(blck))
except CancelledError: except CancelledError:
discard discard
except CatchableError as e: except CatchableError as e:
let error = e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg) let error = e.toErr(
SubscriptionError,
"HTTP polling: There was an exception while getting subscription's block: " &
e.msg,
)
onBlock(failure(Block, error)) onBlock(failure(Block, error))
proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} = proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises: [].} =
without change =? changeResult, e: without change =? changeResult, e:
onBlock(failure(Block, e.toErr(SubscriptionError))) onBlock(failure(Block, e.toErr(SubscriptionError)))
return return
@ -339,12 +369,11 @@ method subscribeBlocks(subscriptions: PollingSubscriptions,
subscriptions.subscriptionMapping[id] = id subscriptions.subscriptionMapping[id] = id
return id return id
method subscribeLogs(subscriptions: PollingSubscriptions, method subscribeLogs*(
filter: EventFilter, subscriptions: PollingSubscriptions, filter: EventFilter, onLog: LogHandler
onLog: LogHandler): ): Future[JsonNode] {.
Future[JsonNode] async: (raises: [SubscriptionError, CancelledError, RpcNetworkError])
{.async: (raises: [SubscriptionError, CancelledError]).} = .} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error: without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError))) onLog(failure(Log, error.toErr(SubscriptionError)))
@ -360,13 +389,13 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
subscriptions.subscriptionMapping[id] = id subscriptions.subscriptionMapping[id] = id
return id return id
method unsubscribe*(subscriptions: PollingSubscriptions, method unsubscribe*(
id: JsonNode) subscriptions: PollingSubscriptions, id: JsonNode
{.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
try: try:
subscriptions.logFilters.del(id) subscriptions.logFilters.del(id)
subscriptions.callbacks.del(id) subscriptions.callbacks.del(id)
if sub =? subscriptions.subscriptionMapping.?[id]: if sub =? subscriptions.subscriptionMapping .? [id]:
subscriptions.subscriptionMapping.del(id) subscriptions.subscriptionMapping.del(id)
discard await subscriptions.client.eth_uninstallFilter(sub) discard await subscriptions.client.eth_uninstallFilter(sub)
except CancelledError as e: except CancelledError as e:

View File

@ -10,7 +10,7 @@ export httpserver
{.push raises: [].} {.push raises: [].}
type type
RpcResponse* = proc(request: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CancelledError]), raises: [].} RpcResponse* = proc(request: HttpRequestRef): Future[HttpResponseRef] {.async: (raises: [CancelledError]).}
MockHttpServer* = object MockHttpServer* = object
server: HttpServerRef server: HttpServerRef
@ -32,12 +32,9 @@ proc init*(_: type MockHttpServer, address: TransportAddress): MockHttpServer =
let request = r.get() let request = r.get()
try: try:
let body = string.fromBytes(await request.getBody()) let body = string.fromBytes(await request.getBody())
echo "mockHttpServer.processRequest request: ", body
without req =? RequestRx.fromJson(body), error: without req =? RequestRx.fromJson(body), error:
echo "failed to deserialize, error: ", error.msg
return await request.respond(Http400, "Invalid request, must be valid json rpc request") return await request.respond(Http400, "Invalid request, must be valid json rpc request")
echo "Received request with method: ", req.method
if not server.rpcResponses.contains(req.method): if not server.rpcResponses.contains(req.method):
return await request.respond(Http404, "Method not registered") return await request.respond(Http404, "Method not registered")
@ -48,10 +45,8 @@ proc init*(_: type MockHttpServer, address: TransportAddress): MockHttpServer =
return await request.respond(Http500, "Method lookup error with key, error: " & e.msg) return await request.respond(Http500, "Method lookup error with key, error: " & e.msg)
except HttpProtocolError as e: except HttpProtocolError as e:
echo "HttpProtocolError encountered, error: ", e.msg
return defaultResponse(e) return defaultResponse(e)
except HttpTransportError as e: except HttpTransportError as e:
echo "HttpTransportError encountered, error: ", e.msg
return defaultResponse(e) return defaultResponse(e)
except HttpWriteError as exc: except HttpWriteError as exc:
return defaultResponse(exc) return defaultResponse(exc)

View File

@ -0,0 +1,128 @@
import std/tables
import std/sequtils
import pkg/chronos
import pkg/chronicles except toJson, `%`, `%*`
import pkg/websock/websock except toJson, `%`, `%*`
import pkg/json_rpc/clients/websocketclient except toJson, `%`, `%*`
import pkg/json_rpc/client except toJson, `%`, `%*`
import pkg/json_rpc/server except toJson, `%`, `%*`
import pkg/questionable
import pkg/websock/http/common
import pkg/serde
import pkg/stew/byteutils
type
MockWebSocketServer* = ref object
httpServer: HttpServer
address*: TransportAddress
connections: seq[WSSession]
rpcResponses: Table[string, WebSocketResponse]
running: bool
WebSocketResponse* = proc(ws: WSSession) {.async.}
RequestRx {.deserialize.} = object
jsonrpc*: string
id*: int
`method`*: string
ResponseKind* = enum
rkResult
rkError
ResponseError* {.serialize.} = object
code*: int
message*: string
data*: ?string
ResponseTx* {.serialize.} = object
jsonrpc*: string
id*: int
case kind* {.serialize(ignore = true).}: ResponseKind
of rkResult:
result*: JsonNode
of rkError:
error*: ResponseError
proc init*(T: type MockWebSocketServer, address: TransportAddress): T =
T(
address: address,
connections: @[],
rpcResponses: initTable[string, WebSocketResponse](),
running: false,
)
proc registerRpcResponse*(
server: MockWebSocketServer, `method`: string, response: WebSocketResponse
) =
server.rpcResponses[`method`] = response
proc handleWebSocketConnection(server: MockWebSocketServer, ws: WSSession) {.async.} =
server.connections.add(ws)
try:
while ws.readyState == ReadyState.Open:
let data = await ws.recvMsg()
let message = string.fromBytes(data)
without request =? RequestRx.fromJson(message), error:
await ws.close(StatusProtocolError, "Invalid JSON")
break
if request.method notin server.rpcResponses:
let response = ResponseTx(
jsonrpc: "2.0",
id: request.id,
kind: rkError,
error: ResponseError(code: 404, message: "Method not registered"),
)
await ws.send(response.toJson())
break
let rpcResponseProc = server.rpcResponses[request.method]
await ws.rpcResponseProc()
except WSClosedError:
# Connection was closed
trace "WebSocket connection closed"
except CatchableError as exc:
trace "WebSocket connection error", error = exc.msg
proc processRequest(server: MockWebSocketServer, request: HttpRequest) {.async.} =
let wsServer = WSServer.new(protos = ["proto"])
# perform upgrade
let ws = await wsServer.handleRequest(request)
await server.handleWebSocketConnection(ws)
proc start*(server: MockWebSocketServer) {.async.} =
if server.running:
return
let handler = proc(request: HttpRequest): Future[void] {.async, raises: [].} =
await server.processRequest(request)
server.httpServer =
HttpServer.create(address = server.address, handler = handler, flags = {ReuseAddr})
server.httpServer.start()
server.running = true
proc stop*(server: MockWebSocketServer) {.async.} =
if not server.running:
return
server.running = false
# Close all active connections
for conn in server.connections:
if conn.readyState == ReadyState.Open:
await conn.close(StatusGoingAway, "Server shutting down")
server.connections.setLen(0)
if not server.httpServer.isNil:
server.httpServer.stop()
proc localAddress*(server: MockWebSocketServer): TransportAddress =
if server.httpServer.isNil:
return server.address
return server.httpServer.localAddress()

View File

@ -1,38 +0,0 @@
import std/tables
import std/strutils
import std/uri
# pkg/chronos,
import pkg/chronicles
# pkg/chronos/apps/http/httpserver,
import pkg/websock/websock
import pkg/websock/tests/helpers
import pkg/httputils
import pkg/asynctest/chronos/unittest
# json_rpc/clients/websocketclient,
# json_rpc/[client, server],
# json_serialization
import pkg/stew/byteutils
import pkg/ethers
const address = initTAddress("127.0.0.1:8888")
proc handle(request: HttpRequest) {.async.} =
check request.uri.path == WSPath
let server = WSServer.new(protos = ["proto"])
let ws = await server.handleRequest(request)
let servRes = await ws.recvMsg()
check string.fromBytes(servRes) == testString
await ws.waitForClose()
proc run() {.async.} =
let server = createServer(
address = address,
handler = handle,
flags = {ReuseAddr})
let provider = JsonRpcProvider.new("ws://" & $address)

View File

@ -3,16 +3,18 @@ import std/sequtils
import std/typetraits import std/typetraits
import std/net import std/net
import stew/byteutils import pkg/stew/byteutils
import pkg/asynctest/chronos/unittest import pkg/asynctest/chronos/unittest
import pkg/chronos/apps/http/httpclient import pkg/chronos/apps/http/httpclient
import pkg/serde import pkg/serde
import pkg/questionable import pkg/questionable
import pkg/ethers/providers/jsonrpc import pkg/ethers/providers/jsonrpc except toJson, `%`, `%*`
import pkg/ethers/providers/jsonrpc/errors import pkg/ethers/providers/jsonrpc/errors except toJson, `%`, `%*`
import pkg/ethers/erc20 import pkg/ethers/erc20 except toJson, `%`, `%*`
import pkg/json_rpc/clients/httpclient import pkg/json_rpc/clients/httpclient except toJson, `%`, `%*`
import pkg/json_rpc/clients/websocketclient except toJson, `%`, `%*`
import pkg/websock/websock import pkg/websock/websock
import pkg/websock/http/common
import ./mocks/mockHttpServer import ./mocks/mockHttpServer
import ./mocks/mockWebSocketServer import ./mocks/mockWebSocketServer
import ../../examples import ../../examples
@ -37,7 +39,14 @@ suite "JSON RPC errors":
} }
check JsonRpcProviderError.new(error).data == some @[0xab'u8, 0xcd'u8] check JsonRpcProviderError.new(error).data == some @[0xab'u8, 0xcd'u8]
type TestToken = ref object of Erc20Token type
TestToken = ref object of Erc20Token
Before = proc(): Future[void] {.gcsafe, raises: [].}
# A proc that runs before each test
proc runBefore(before: Before) {.async.} =
if before != nil:
await before()
method mint( method mint(
token: TestToken, holder: Address, amount: UInt256 token: TestToken, holder: Address, amount: UInt256
@ -47,6 +56,7 @@ suite "Network errors - HTTP":
var provider: JsonRpcProvider var provider: JsonRpcProvider
var mockServer: MockHttpServer var mockServer: MockHttpServer
var token: TestToken var token: TestToken
var blockingSocket: Socket
setup: setup:
mockServer = MockHttpServer.init(initTAddress("127.0.0.1:0")) mockServer = MockHttpServer.init(initTAddress("127.0.0.1:0"))
@ -59,6 +69,9 @@ suite "Network errors - HTTP":
teardown: teardown:
await provider.close() await provider.close()
await mockServer.stop() await mockServer.stop()
if not blockingSocket.isNil:
blockingSocket.close()
blockingSocket = nil
proc registerRpcMethods(response: RpcResponse) = proc registerRpcMethods(response: RpcResponse) =
mockServer.registerRpcResponse("eth_accounts", response) mockServer.registerRpcResponse("eth_accounts", response)
@ -67,43 +80,38 @@ suite "Network errors - HTTP":
mockServer.registerRpcResponse("eth_sendRawTransaction", response) mockServer.registerRpcResponse("eth_sendRawTransaction", response)
mockServer.registerRpcResponse("eth_newBlockFilter", response) mockServer.registerRpcResponse("eth_newBlockFilter", response)
mockServer.registerRpcResponse("eth_newFilter", response) mockServer.registerRpcResponse("eth_newFilter", response)
# mockServer.registerRpcResponse("eth_subscribe", response) # TODO: handle
# eth_subscribe for websockets
proc testCustomResponse( proc testCustomResponse(
errorName: string, testNamePrefix: string,
responseHttpCode: HttpCode, response: RpcResponse,
responseText: string,
errorType: type CatchableError, errorType: type CatchableError,
before: Before = nil,
) = ) =
let response = proc( let prefix = testNamePrefix & " when "
request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
try:
return await request.respond(responseHttpCode, responseText)
except HttpWriteError as exc:
return defaultResponse(exc)
let testNamePrefix = test prefix & "sending a manual RPC method request":
errorName & " error response is converted to " & errorType.name & " for "
test testNamePrefix & "sending a manual RPC method request":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
discard await provider.send("eth_accounts") discard await provider.send("eth_accounts")
test testNamePrefix & test prefix &
"calling a provider method that converts errors when calling a generated RPC request": "calling a provider method that converts errors when calling a generated RPC request":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
discard await provider.listAccounts() discard await provider.listAccounts()
test testNamePrefix & "calling a view method of a contract": test prefix & "calling a view method of a contract":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
token = TestToken.new(token.address, provider.getSigner())
discard await token.balanceOf(Address.example) discard await token.balanceOf(Address.example)
test testNamePrefix & "calling a contract method that executes a transaction": test prefix & "calling a contract method that executes a transaction":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
token = TestToken.new(token.address, provider.getSigner()) token = TestToken.new(token.address, provider.getSigner())
discard await token.mint( discard await token.mint(
@ -112,14 +120,16 @@ suite "Network errors - HTTP":
TransactionOverrides(gasLimit: 100.u256.some, chainId: 1.u256.some), TransactionOverrides(gasLimit: 100.u256.some, chainId: 1.u256.some),
) )
test testNamePrefix & "sending a manual transaction": test prefix & "sending a manual transaction":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
let tx = Transaction.example let tx = Transaction.example
discard await provider.getSigner().sendTransaction(tx) discard await provider.getSigner().sendTransaction(tx)
test testNamePrefix & "sending a raw transaction": test prefix & "sending a raw transaction":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
const pk_with_funds = const pk_with_funds =
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
@ -134,15 +144,17 @@ suite "Network errors - HTTP":
let signedTx = await wallet.signTransaction(tx) let signedTx = await wallet.signTransaction(tx)
discard await provider.sendTransaction(signedTx) discard await provider.sendTransaction(signedTx)
test testNamePrefix & "subscribing to blocks": test prefix & "subscribing to blocks":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
let emptyHandler = proc(blckResult: ?!Block) = let emptyHandler = proc(blckResult: ?!Block) =
discard discard
discard await provider.subscribe(emptyHandler) discard await provider.subscribe(emptyHandler)
test testNamePrefix & "subscribing to logs": test prefix & "subscribing to logs":
registerRpcMethods(response) registerRpcMethods(response)
await runBefore(before)
expect errorType: expect errorType:
let filter = let filter =
EventFilter(address: Address.example, topics: @[array[32, byte].example]) EventFilter(address: Address.example, topics: @[array[32, byte].example])
@ -150,277 +162,250 @@ suite "Network errors - HTTP":
discard discard
discard await provider.subscribe(filter, emptyHandler) discard await provider.subscribe(filter, emptyHandler)
testCustomResponse("429", Http429, "Too many requests", HttpRequestLimitError) proc testCustomHttpResponse(
testCustomResponse("408", Http408, "Request timed out", HttpRequestTimeoutError) errorName: string,
testCustomResponse("non-429", Http500, "Server error", JsonRpcProviderError) responseHttpCode: HttpCode,
responseText: string,
errorType: type CatchableError,
) =
let response = proc(
request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
try:
return await request.respond(responseHttpCode, responseText)
except HttpWriteError as exc:
return defaultResponse(exc)
test "raises RpcNetworkError when reading response headers times out": let prefix = errorName & " error response is converted to " & errorType.name
privateAccess(JsonRpcProvider)
privateAccess(RpcHttpClient)
let responseTimeout = proc( testCustomResponse(prefix, response, errorType)
testCustomHttpResponse("429", Http429, "Too many requests", HttpRequestLimitError)
testCustomHttpResponse("408", Http408, "Request timed out", HttpRequestTimeoutError)
testCustomHttpResponse("non-429", Http500, "Server error", JsonRpcProviderError)
testCustomResponse(
"raises RpcNetworkError after a timeout waiting for reading response headers",
response = proc(
request: HttpRequestRef request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} = ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
try: try:
await sleepAsync(5.minutes) await sleepAsync(5.minutes)
return await request.respond(Http200, "OK") return await request.respond(Http200, "OK")
except HttpWriteError as exc: except HttpWriteError as exc:
return defaultResponse(exc) return defaultResponse(exc),
RpcNetworkError,
before = proc(): Future[void] {.async.} =
privateAccess(JsonRpcProvider)
privateAccess(RpcHttpClient)
let rpcClient = await provider.client
let client: RpcHttpClient = (RpcHttpClient)(rpcClient)
client.httpSession = HttpSessionRef.new(headersTimeout = 1.millis),
)
let rpcClient = await provider.client testCustomResponse(
let client: RpcHttpClient = (RpcHttpClient)(rpcClient) "raises RpcNetworkError for a closed connection",
client.httpSession = HttpSessionRef.new(headersTimeout = 1.millis) response = proc(
mockServer.registerRpcResponse("eth_accounts", responseTimeout) request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
# Simulate a closed connection
return HttpResponseRef.new(),
RpcNetworkError,
before = proc(): Future[void] {.async.} =
await mockServer.stop()
,
)
expect RpcNetworkError: testCustomResponse(
discard await provider.send("eth_accounts") "raises RpcNetworkError for a timed out connection",
response = proc(
test "raises RpcNetworkError when connection is closed": request: HttpRequestRef
await mockServer.stop() ): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
expect RpcNetworkError: # Simulate a closed connection
discard await provider.send("eth_accounts") return HttpResponseRef.new(),
RpcNetworkError,
test "raises RpcNetworkError when connection times out":
privateAccess(JsonRpcProvider)
privateAccess(RpcHttpClient)
let rpcClient = await provider.client
let client: RpcHttpClient = (RpcHttpClient)(rpcClient)
client.httpSession.connectTimeout = 10.millis
let blockingSocket = newSocket()
blockingSocket.setSockOpt(OptReuseAddr, true)
blockingSocket.bindAddr(Port(9999))
await client.connect("http://localhost:9999")
expect RpcNetworkError:
# msg: Failed to send POST Request with JSON-RPC: Connection timed out # msg: Failed to send POST Request with JSON-RPC: Connection timed out
discard await provider.send("eth_accounts") before = proc(): Future[void] {.async.} =
privateAccess(JsonRpcProvider)
privateAccess(RpcHttpClient)
let rpcClient = await provider.client
let client: RpcHttpClient = (RpcHttpClient)(rpcClient)
client.httpSession.connectTimeout = 10.millis
# We don't need to recreate each and every possible exception condition, as blockingSocket = newSocket()
# they are all wrapped up in RpcPostError and converted to RpcNetworkError. blockingSocket.setSockOpt(OptReuseAddr, true)
# The tests above cover this conversion. blockingSocket.bindAddr(Port(9999))
# suite "Network errors - WebSocket": await client.connect("http://localhost:9999")
,
)
# var provider: JsonRpcProvider suite "Network errors - WebSocket":
# var mockWsServer: MockWebSocketServer var provider: JsonRpcProvider
# var token: TestToken var token: TestToken
var mockWsServer: MockWebSocketServer
# setup: setup:
# mockWsServer = MockWebSocketServer.init(initTAddress("127.0.0.1:0")) mockWsServer = MockWebSocketServer.init(initTAddress("127.0.0.1:0"))
# await mockWsServer.start() await mockWsServer.start()
# # Get the actual bound address # Get the actual bound address
# let actualAddress = mockWsServer.localAddress() provider = JsonRpcProvider.new("ws://" & $mockWsServer.localAddress)
# provider = JsonRpcProvider.new("ws://" & $actualAddress & "/ws")
# let deployment = readDeployment() let deployment = readDeployment()
# token = TestToken.new(!deployment.address(TestToken), provider) token = TestToken.new(!deployment.address(TestToken), provider)
# teardown: teardown:
# await provider.close() await mockWsServer.stop()
# await mockWsServer.stop() try:
await provider.close()
except WebsocketConnectionError:
# WebsocketConnectionError is raised when the connection is already closed
discard
provider = nil
# proc registerRpcMethods(behavior: WebSocketBehavior) = proc registerRpcMethods(response: WebSocketResponse) =
# mockWsServer.registerRpcBehavior("eth_accounts", behavior) mockWsServer.registerRpcResponse("eth_accounts", response)
# mockWsServer.registerRpcBehavior("eth_call", behavior) mockWsServer.registerRpcResponse("eth_call", response)
# mockWsServer.registerRpcBehavior("eth_sendTransaction", behavior) mockWsServer.registerRpcResponse("eth_sendTransaction", response)
# mockWsServer.registerRpcBehavior("eth_sendRawTransaction", behavior) mockWsServer.registerRpcResponse("eth_sendRawTransaction", response)
# mockWsServer.registerRpcBehavior("eth_newBlockFilter", behavior) mockWsServer.registerRpcResponse("eth_subscribe", response)
# mockWsServer.registerRpcBehavior("eth_newFilter", behavior)
# mockWsServer.registerRpcBehavior("eth_subscribe", behavior)
# proc testCustomBehavior(errorName: string, behavior: WebSocketBehavior, errorType: type CatchableError) = proc testCustomResponse(
# let testNamePrefix = errorName & " behavior is converted to " & errorType.name & " for " name: string,
errorType: type CatchableError,
response: WebSocketResponse,
before: Before = nil,
) =
test name & " when sending a manual RPC method request":
registerRpcMethods(response)
await runBefore(before)
expect errorType:
discard await provider.send("eth_accounts")
# test testNamePrefix & "sending a manual RPC method request": test name &
# registerRpcMethods(behavior) " when calling a provider method that converts errors when calling a generated RPC request":
# expect errorType: registerRpcMethods(response)
# discard await provider.send("eth_accounts") await runBefore(before)
expect errorType:
discard await provider.listAccounts()
# test testNamePrefix & "calling a provider method that converts errors": test name & " when calling a view method of a contract":
# registerRpcMethods(behavior) registerRpcMethods(response)
# expect errorType: await runBefore(before)
# discard await provider.listAccounts() expect errorType:
token = TestToken.new(token.address, provider.getSigner())
discard await token.balanceOf(Address.example)
# test testNamePrefix & "calling a view method of a contract": test name & " when calling a contract method that executes a transaction":
# registerRpcMethods(behavior) registerRpcMethods(response)
# expect errorType: await runBefore(before)
# discard await token.balanceOf(Address.example) expect errorType:
token = TestToken.new(token.address, provider.getSigner())
discard await token.mint(
Address.example,
100.u256,
TransactionOverrides(gasLimit: 100.u256.some, chainId: 1.u256.some),
)
# test testNamePrefix & "calling a contract method that executes a transaction": test name & " when sending a manual transaction":
# registerRpcMethods(behavior) registerRpcMethods(response)
# expect errorType: await runBefore(before)
# token = TestToken.new(token.address, provider.getSigner()) expect errorType:
# discard await token.mint( let tx = Transaction.example
# Address.example, 100.u256, discard await provider.getSigner().sendTransaction(tx)
# TransactionOverrides(gasLimit: 100.u256.some, chainId: 1.u256.some)
# )
# test testNamePrefix & "sending a manual transaction": test name & " when sending a raw transaction":
# registerRpcMethods(behavior) registerRpcMethods(response)
# expect errorType: await runBefore(before)
# let tx = Transaction.example expect errorType:
# discard await provider.getSigner().sendTransaction(tx) const pk_with_funds =
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"
let wallet = !Wallet.new(pk_with_funds)
let tx = Transaction(
to: wallet.address,
nonce: some 0.u256,
chainId: some 31337.u256,
gasPrice: some 1_000_000_000.u256,
gasLimit: some 21_000.u256,
)
let signedTx = await wallet.signTransaction(tx)
discard await provider.sendTransaction(signedTx)
# test testNamePrefix & "sending a raw transaction": test name & " when subscribing to blocks":
# registerRpcMethods(behavior) privateAccess(JsonRpcProvider)
# expect errorType: registerRpcMethods(response)
# const pk_with_funds = "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" await runBefore(before)
# let wallet = !Wallet.new(pk_with_funds) expect errorType:
# let tx = Transaction( let emptyHandler = proc(blckResult: ?!Block) =
# to: wallet.address, discard
# nonce: some 0.u256, discard await provider.subscribe(emptyHandler)
# chainId: some 31337.u256,
# gasPrice: some 1_000_000_000.u256,
# gasLimit: some 21_000.u256,
# )
# let signedTx = await wallet.signTransaction(tx)
# discard await provider.sendTransaction(signedTx)
# test testNamePrefix & "subscribing to blocks": test name & " when subscribing to logs":
# registerRpcMethods(behavior) registerRpcMethods(response)
# expect errorType: await runBefore(before)
# let emptyHandler = proc(blckResult: ?!Block) = discard expect errorType:
# discard await provider.subscribe(emptyHandler) let filter =
EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) =
discard
discard await provider.subscribe(filter, emptyHandler)
# test testNamePrefix & "subscribing to logs": test "should not raise error on normal connection and request":
# registerRpcMethods(behavior) mockWsServer.registerRpcResponse(
# expect errorType: "eth_accounts",
# let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) proc(ws: WSSession) {.async.} =
# let emptyHandler = proc(log: ?!Log) = discard let response =
# discard await provider.subscribe(filter, emptyHandler) ResponseTx(jsonrpc: "2.0", id: 1, result: % @["123"], kind: rkResult)
await ws.send(response.toJson)
,
)
# # WebSocket close codes equivalent to HTTP status codes let accounts = await provider.send("eth_accounts")
# testCustomBehavior( check @["123"] == !seq[string].fromJson(accounts)
# "Policy violation close (rate limit)",
# createBehavior(CloseWithCode, StatusPolicyError, "Policy violation - rate limited"),
# WebSocketPolicyError
# )
# testCustomBehavior( testCustomResponse(
# "Server error close", "should raise JsonRpcProviderError for a returned error response",
# createBehavior(CloseWithCode, StatusUnexpectedError, "Internal server error"), JsonRpcProviderError,
# JsonRpcProviderError proc(ws: WSSession) {.async.} =
# ) let response = ResponseTx(
jsonrpc: "2.0",
# # testCustomBehavior( id: 1,
# # "Service unavailable close", error: ResponseError(code: 1, message: "some error"),
# # createBehavior(CloseWithCode, StatusCodes.TryAgainLater, "Try again later"), kind: rkError,
# # WebSocketServiceUnavailableError )
# # ) await ws.send(response.toJson)
,
# testCustomBehavior( )
# "Abrupt disconnect", testCustomResponse(
# createBehavior(AbruptClose), "raises WebsocketConnectionError for closed connection",
# RpcNetworkError WebsocketConnectionError,
# ) proc(ws: WSSession) {.async.} =
# Simulate a closed connection
# test "raises RpcNetworkError when WebSocket connection times out": await ws.close(StatusGoingAway, "Going away")
# registerRpcMethods(createBehavior(Timeout, delay = 5.minutes)) ,
)
# # Set a short timeout on the WebSocket client testCustomResponse(
# privateAccess(JsonRpcProvider) "raises WebsocketConnectionError for failed connection",
# privateAccess(RpcWebSocketClient) WebsocketConnectionError,
response = proc(ws: WSSession) {.async.} =
# let rpcClient = await provider.client return ,
# let client = RpcWebSocketClient(rpcClient) before = proc() {.async.} =
# # Note: Actual timeout setting depends on nim-websock implementation # Used to simulate an HttpError, which is also raised for "Timeout expired
# # This may need to be adjusted based on available APIs # while receiving headers", however replicating that exact scenario would
# take 120s as the HttpHeadersTimeout is hardcoded to 120 seconds.
# expect RpcNetworkError: provider = JsonRpcProvider.new("ws://localhost:9999"),
# discard await provider.send("eth_accounts").wait(1.seconds) )
testCustomResponse(
# test "raises RpcNetworkError when WebSocket connection is closed unexpectedly": "raises JsonRpcProviderError for exceptions in onProcessMessage callback",
# # Start a request, then close the server JsonRpcProviderError,
# let sendFuture = provider.send("eth_accounts") response = proc(ws: WSSession) {.async.} =
# await sleepAsync(10.millis) let response = ResponseTx(jsonrpc: "2.0", id: 1, result: %"", kind: rkResult)
# await mockWsServer.stop() await ws.send(response.toJson)
,
# expect RpcNetworkError: before = proc() {.async.} =
# discard await sendFuture privateAccess(JsonRpcProvider)
let rpcClient = await provider.client
# test "raises RpcNetworkError when WebSocket connection fails to establish": rpcClient.onProcessMessage = proc(
# # Stop the server first client: RpcClient, line: string
# await mockWsServer.stop() ): Result[bool, string] {.gcsafe, raises: [].} =
return err "Some error",
# expect RpcNetworkError: )
# let deadProvider = JsonRpcProvider.new("ws://127.0.0.1:9999/ws")
# discard await deadProvider.send("eth_accounts")
# test "handles WebSocket protocol errors gracefully":
# registerRpcMethods(createBehavior(InvalidFrame))
# expect JsonRpcProviderError: # or whatever error nim-json-rpc maps protocol errors to
# discard await provider.send("eth_accounts")
# test "handles oversized WebSocket messages":
# registerRpcMethods(createBehavior(MessageTooBig))
# expect RpcNetworkError: # Large message handling depends on client limits
# discard await provider.send("eth_accounts")
# test "raises timeout error on slow WebSocket handshake":
# # Create a server that delays the WebSocket upgrade
# let slowServer = MockWebSocketServer.init(initTAddress("127.0.0.1:0"))
# # This would need custom implementation to delay handshake
# expect WebSocketTimeoutError:
# let slowProvider = JsonRpcProvider.new("ws://127.0.0.1:9998/ws")
# discard await slowProvider.send("eth_accounts").wait(100.millis)
# test "handles connection drops during message exchange":
# # Register normal behavior initially
# registerRpcMethods(createBehavior(Normal))
# # Start multiple requests
# let futures = @[
# provider.send("eth_accounts"),
# provider.send("eth_call"),
# provider.send("eth_newBlockFilter")
# ]
# # Close connections after a short delay
# await sleepAsync(5.millis)
# for conn in mockWsServer.connections:
# await conn.close(StatusCodes.AbnormalClosure, "Abnormal closure")
# # All should fail with network error
# for future in futures:
# expect RpcNetworkError:
# discard await future
# test "recovers from temporary WebSocket disconnections":
# # This test would verify client reconnection logic if implemented
# # Initial connection works
# registerRpcMethods(createBehavior(Normal))
# let result1 = await provider.send("eth_accounts")
# check result1.isOk
# # Simulate connection drop
# for conn in mockWsServer.connections:
# await conn.close(StatusCodes.GoingAway, "Going away")
# # Depending on provider implementation, this might auto-reconnect
# # or need manual reconnection
# expect RpcNetworkError:
# discard await provider.send("eth_accounts")
# test "handles WebSocket ping/pong timeouts":
# # This would test the ping/pong mechanism if the client supports it
# registerRpcMethods(createBehavior(Normal))
# # Mock a scenario where server doesn't respond to pings
# for conn in mockWsServer.connections:
# # Disable pong responses (if we had access to this)
# conn.onPing = nil
# # This test would need to trigger ping timeout
# # The exact implementation depends on the websocket client capabilities
# test "handles WebSocket close frame with invalid payload":
# # Test handling of malformed close frames
# registerRpcMethods(createBehavior(CloseWithCode, StatusCodes.ProtocolError, ""))
# expect JsonRpcProviderError:
# discard await provider.send("eth_accounts")