From 064a5c895652a7f0d4a326a8bacd8af9a3ebd822 Mon Sep 17 00:00:00 2001 From: aj3423 Date: Thu, 11 Sep 2025 16:32:39 +0800 Subject: [PATCH] support subscribing to mempool for websocket --- ethers/provider.nim | 1 + ethers/providers/jsonrpc/subscriptions.nim | 28 +++++++++++++++++++ .../jsonrpc/testJsonRpcSubscriptions.nim | 19 +++++++++++-- 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/ethers/provider.nim b/ethers/provider.nim index 55cc07c..03f881a 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -58,6 +58,7 @@ type transactionType* {.serialize("type"), deserialize("type").}: TransactionType LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].} BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].} + PendingTransactionHandler* = proc(txHash: ?!TransactionHash) {.gcsafe, raises:[].} Topic* = array[32, byte] Block* {.serialize.} = object number*: ?UInt256 diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 95c094e..0f7f79d 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -72,6 +72,14 @@ method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, {.async: (raises: [SubscriptionError, CancelledError]), base,.} = raiseAssert "not implemented" +method subscribePendingTransactions*( + subscriptions: JsonRpcSubscriptions, + onTx: PendingTransactionHandler +): Future[JsonNode] + {.async: (raises: [SubscriptionError, CancelledError]), base,.} + = + raiseAssert "not implemented" + method subscribeLogs*(subscriptions: JsonRpcSubscriptions, filter: EventFilter, onLog: LogHandler): @@ -183,6 +191,26 @@ method subscribeBlocks(subscriptions: WebSocketSubscriptions, subscriptions.callbacks[id] = callback return id +method subscribePendingTransactions*( + subscriptions: WebSocketSubscriptions, + onTx: PendingTransactionHandler +): Future[JsonNode] + {.async: (raises: [SubscriptionError, CancelledError]).} = + + proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} = + without arguments =? argumentsResult, error: + onTx(failure(TransactionHash, error.toErr(SubscriptionError))) + return + + let res = TransactionHash.fromJson(arguments{"result"}).mapFailure(SubscriptionError) + onTx(res) + + convertErrorsToSubscriptionError: + withLock(subscriptions): + let id = await subscriptions.client.eth_subscribe("newPendingTransactions") + subscriptions.callbacks[id] = callback + return id + method subscribeLogs(subscriptions: WebSocketSubscriptions, filter: EventFilter, onLog: LogHandler): diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 9417cfa..bebaa80 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -22,7 +22,7 @@ suite "JsonRpcSubscriptions": let subscriptions = JsonRpcSubscriptions.new(client) check not isNil subscriptions -template subscriptionTests(subscriptions, client) = +template subscriptionTests(subscriptions, client, tipe) = test "subscribes to new blocks": var latestBlock: Block @@ -72,6 +72,19 @@ template subscriptionTests(subscriptions, client) = await sleepAsync(100.millis) check count == 0 + if tipe == "Websocket": + test "subscribes to pendingTransactions": + var count = 0 + proc callback(tx: ?!TransactionHash) = + count += 1 + + let subscription = await subscriptions.subscribePendingTransactions(callback) + discard await client.call("eth_sendTransaction", + """[{"to": "0x0000000000000000000000000000000000000000", "value": "0x0"}]""".parseJson) + discard await client.call("evm_mine", newJArray()) + check eventually count > 0 + await subscriptions.unsubscribe(subscription) + suite "Web socket subscriptions": var subscriptions: JsonRpcSubscriptions @@ -87,7 +100,7 @@ suite "Web socket subscriptions": await subscriptions.close() await client.close() - subscriptionTests(subscriptions, client) + subscriptionTests(subscriptions, client, "Websocket") suite "HTTP polling subscriptions": @@ -105,7 +118,7 @@ suite "HTTP polling subscriptions": await subscriptions.close() await client.close() - subscriptionTests(subscriptions, client) + subscriptionTests(subscriptions, client, "HttpPolling") suite "HTTP polling subscriptions - mock tests":