support subscribing to mempool for websocket

This commit is contained in:
aj3423 2025-09-11 16:32:39 +08:00
parent 30871c7b1d
commit 064a5c8956
No known key found for this signature in database
GPG Key ID: B2B6E286B6436F52
3 changed files with 45 additions and 3 deletions

View File

@ -58,6 +58,7 @@ type
transactionType* {.serialize("type"), deserialize("type").}: TransactionType
LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].}
PendingTransactionHandler* = proc(txHash: ?!TransactionHash) {.gcsafe, raises:[].}
Topic* = array[32, byte]
Block* {.serialize.} = object
number*: ?UInt256

View File

@ -72,6 +72,14 @@ method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
{.async: (raises: [SubscriptionError, CancelledError]), base,.} =
raiseAssert "not implemented"
method subscribePendingTransactions*(
subscriptions: JsonRpcSubscriptions,
onTx: PendingTransactionHandler
): Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]), base,.}
=
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
filter: EventFilter,
onLog: LogHandler):
@ -183,6 +191,26 @@ method subscribeBlocks(subscriptions: WebSocketSubscriptions,
subscriptions.callbacks[id] = callback
return id
method subscribePendingTransactions*(
subscriptions: WebSocketSubscriptions,
onTx: PendingTransactionHandler
): Future[JsonNode]
{.async: (raises: [SubscriptionError, CancelledError]).} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error:
onTx(failure(TransactionHash, error.toErr(SubscriptionError)))
return
let res = TransactionHash.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onTx(res)
convertErrorsToSubscriptionError:
withLock(subscriptions):
let id = await subscriptions.client.eth_subscribe("newPendingTransactions")
subscriptions.callbacks[id] = callback
return id
method subscribeLogs(subscriptions: WebSocketSubscriptions,
filter: EventFilter,
onLog: LogHandler):

View File

@ -22,7 +22,7 @@ suite "JsonRpcSubscriptions":
let subscriptions = JsonRpcSubscriptions.new(client)
check not isNil subscriptions
template subscriptionTests(subscriptions, client) =
template subscriptionTests(subscriptions, client, tipe) =
test "subscribes to new blocks":
var latestBlock: Block
@ -72,6 +72,19 @@ template subscriptionTests(subscriptions, client) =
await sleepAsync(100.millis)
check count == 0
if tipe == "Websocket":
test "subscribes to pendingTransactions":
var count = 0
proc callback(tx: ?!TransactionHash) =
count += 1
let subscription = await subscriptions.subscribePendingTransactions(callback)
discard await client.call("eth_sendTransaction",
"""[{"to": "0x0000000000000000000000000000000000000000", "value": "0x0"}]""".parseJson)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
await subscriptions.unsubscribe(subscription)
suite "Web socket subscriptions":
var subscriptions: JsonRpcSubscriptions
@ -87,7 +100,7 @@ suite "Web socket subscriptions":
await subscriptions.close()
await client.close()
subscriptionTests(subscriptions, client)
subscriptionTests(subscriptions, client, "Websocket")
suite "HTTP polling subscriptions":
@ -105,7 +118,7 @@ suite "HTTP polling subscriptions":
await subscriptions.close()
await client.close()
subscriptionTests(subscriptions, client)
subscriptionTests(subscriptions, client, "HttpPolling")
suite "HTTP polling subscriptions - mock tests":