feat: subscriptions handlers get results

This commit is contained in:
Adam Uhlíř 2024-10-25 12:28:40 +02:00
parent 1c2610a583
commit 51f1f805ea
No known key found for this signature in database
GPG Key ID: 1D17A9E81F76155B
5 changed files with 96 additions and 42 deletions

View File

@ -3,6 +3,7 @@ import std/macros
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/contractabi
import ./basics
import ./provider
@ -35,11 +36,10 @@ type
gasLimit*: ?UInt256
CallOverrides* = ref object of TransactionOverrides
blockTag*: ?BlockTag
ContractError* = object of EthersError
Confirmable* = object
response*: ?TransactionResponse
convert*: ConvertCustomErrors
EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].}
EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].}
func new*(ContractType: type Contract,
address: Address,
@ -292,9 +292,13 @@ proc subscribe*[E: Event](contract: Contract,
let topic = topic($E, E.fieldTypes).toArray
let filter = EventFilter(address: contract.address, topics: @[topic])
proc logHandler(log: Log) {.raises: [].} =
proc logHandler(logResult: ?!Log) {.raises: [].} =
without log ?= logResult, err:
handler(failure(e))
return
if event =? E.decode(log.data, log.topics):
handler(event)
handler(success(event))
contract.provider.subscribe(filter, logHandler)

View File

@ -1,6 +1,16 @@
import ./basics
type SolidityError* = object of EthersError
type
SolidityError* = object of EthersError
ContractError* = object of EthersError
SignerError* = object of EthersError
SubscriptionError* = object of EthersError
SubscriptionResult*[E] = Result[E, ref SubscriptionError]
ProviderError* = object of EthersError
data*: ?seq[byte]
template raiseSignerError*(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
{.push raises:[].}

View File

@ -1,5 +1,6 @@
import pkg/chronicles
import pkg/serde
import pkg/questionable
import ./basics
import ./transaction
import ./blocktag
@ -13,8 +14,6 @@ export blocktag
type
Provider* = ref object of RootObj
ProviderError* = object of EthersError
data*: ?seq[byte]
EstimateGasError* = object of ProviderError
transaction*: Transaction
Subscription* = ref object of RootObj
@ -56,8 +55,8 @@ type
effectiveGasPrice*: ?UInt256
status*: TransactionStatus
transactionType* {.serialize("type"), deserialize("type").}: TransactionType
LogHandler* = proc(log: Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].}
LogHandler* = proc(log: SubscriptionResult[Log]) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: SubscriptionResult[Block]) {.gcsafe, raises:[].}
Topic* = array[32, byte]
Block* {.serialize.} = object
number*: ?UInt256
@ -227,7 +226,7 @@ proc confirm*(
tx: TransactionResponse,
confirmations = EthersDefaultConfirmations,
timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt]
{.async: (raises: [CancelledError, ProviderError, EthersError]).} =
{.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} =
## Waits for a transaction to be mined and for the specified number of blocks
## to pass since it was mined (confirmations).
@ -235,13 +234,20 @@ proc confirm*(
## blocks have passed without the tx having been mined.
var blockNumber: UInt256
var blockSubscriptionError: ref SubscriptionError
let blockEvent = newAsyncEvent()
proc onBlockNumber(number: UInt256) =
blockNumber = number
blockEvent.fire()
proc onBlock(blck: Block) =
proc onBlock(blckResult: SubscriptionResult[Block]) =
if blckResult.isErr:
blockSubscriptionError = blckResult.error()
blockEvent.fire()
let blck = blckResult.value
if number =? blck.number:
onBlockNumber(number)
@ -255,6 +261,9 @@ proc confirm*(
await blockEvent.wait()
blockEvent.clear()
if not isNil(blockSubscriptionError):
raise blockSubscriptionError
if blockNumber >= finish:
await subscription.unsubscribe()
raise newException(EthersError, "tx not mined before timeout")

View File

@ -2,8 +2,10 @@ import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/questionable
import pkg/json_rpc/rpcclient
import ../../basics
import ../../errors
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
@ -16,8 +18,7 @@ type
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].}
SubscriptionError* = object of EthersError
SubscriptionCallback = proc(id, arguments: SubscriptionResult[JsonNode]) {.gcsafe, raises:[CancelledError].}
{.push raises:[].}
@ -53,7 +54,7 @@ proc setMethodHandler(
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async, base.} =
{.async, base, raises: [CancelledError].} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
@ -74,14 +75,16 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
if not id.isNil and id in subscriptions.callbacks:
subscriptions.callbacks[id].some
try:
return subscriptions.callbacks[id].some
except: discard
else:
SubscriptionCallback.none
return SubscriptionCallback.none
except KeyError:
SubscriptionCallback.none
return SubscriptionCallback.none
# Web sockets
@ -95,17 +98,22 @@ proc new*(_: type JsonRpcSubscriptions,
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, arguments)
callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) {.raises: [].} =
{.async, raises: [].} =
proc callback(id, argumentsResult: SubscriptionResult[JsonNode]) {.raises: [].} =
if argumentsResult.isErr:
onBlock(SubscriptionResult[Block].err(argumentsResult.error))
return
let arguments = argumentsResult.value
if blck =? Block.fromJson(arguments{"result"}):
onBlock(blck)
onBlock(SubscriptionResult[Block].ok(blck))
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
@ -115,9 +123,15 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
proc callback(id, argumentsResult: SubscriptionResult[JsonNode]) =
if argumentsResult.isErr:
onLog(SubscriptionResult[Log].err(argumentsResult.error))
return
let arguments = argumentsResult.value
if log =? Log.fromJson(arguments{"result"}):
onLog(log)
onLog(SubscriptionResult[Log].ok(log))
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return id
@ -149,7 +163,7 @@ proc new*(_: type JsonRpcSubscriptions,
let subscriptions = PollingSubscriptions(client: client)
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} =
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async, raises:[CancelledError, SubscriptionError].} =
try:
let mappedId = subscriptions.subscriptionMapping[originalId]
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
@ -175,12 +189,19 @@ proc new*(_: type JsonRpcSubscriptions,
subscriptions.subscriptionMapping[originalId] = newId
return await getChanges(originalId)
else:
raise e
raise newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
proc poll(id: JsonNode) {.async.} =
for change in await getChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
proc poll(id: JsonNode) {.async, raises: [CancelledError, SubscriptionError].} =
without callback =? subscriptions.getCallback(id):
return
try:
for change in await getChanges(id):
callback(id, success(change))
except CancelledError as e:
raise e
except CatchableError as e:
callback(id, failure(e))
proc poll {.async.} =
untilCancelled:
@ -198,16 +219,24 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
{.async, raises:[CancelledError].} =
proc getBlock(hash: BlockHash) {.async.} =
proc getBlock(hash: BlockHash) {.async, raises:[CancelledError].} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(blck)
except CatchableError:
discard
onBlock(SubscriptionResult[Block].ok(blck))
except CancelledError as e:
raise e
except CatchableError as e:
let wrappedErr = newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg, e)
onBlock(SubscriptionResult[Block].err(wrappedErr))
proc callback(id, change: JsonNode) =
proc callback(id, changeResult: SubscriptionResult[JsonNode]) {.raises:[CancelledError].} =
if changeResult.isErr:
onBlock(SubscriptionResult[Block].err(changeResult.error))
return
let change = changeResult.value
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
@ -222,9 +251,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
proc callback(id, changeResult: SubscriptionResult[JsonNode]) =
if changeResult.isErr:
onLog(SubscriptionResult[Log].err(changeResult.error))
return
let change = changeResult.value
if log =? Log.fromJson(change):
onLog(log)
onLog(SubscriptionResult[Log].ok(log))
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback

View File

@ -1,5 +1,6 @@
import pkg/questionable
import ./basics
import ./errors
import ./provider
export basics
@ -9,10 +10,6 @@ export basics
type
Signer* = ref object of RootObj
populateLock: AsyncLock
SignerError* = object of EthersError
template raiseSignerError(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
template convertError(body) =
try: