refactor(subscriptions): new implementation of subscriptions

This commit is contained in:
Mark Spanbroek 2025-09-09 16:10:26 +02:00
parent c2d2909936
commit 017245826f
3 changed files with 204 additions and 0 deletions

127
ethers/subscriptions.nim Normal file
View File

@ -0,0 +1,127 @@
import std/tables
import std/sequtils
import ./basics
import ./provider
import ./subscriptions/blocksubscriber
import ./subscriptions/logsbloom
type
Subscriptions* = ref object
provider: Provider
blockSubscriber: BlockSubscriber
blockSubscriptions: Table[SubscriptionId, BlockHandler]
logSubscriptions: Table[SubscriptionId, (EventFilter, LogHandler)]
nextSubscriptionId: int
LocalSubscription* = ref object of Subscription
subscriptions: Subscriptions
id: SubscriptionId
SubscriptionId = int
func len*(subscriptions: Subscriptions): int =
subscriptions.blockSubscriptions.len + subscriptions.logSubscriptions.len
proc subscribe*(
subscriptions: Subscriptions,
onBlock: BlockHandler
): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} =
let id = subscriptions.nextSubscriptionId
inc subscriptions.nextSubscriptionId
subscriptions.blockSubscriptions[id] = onBlock
await subscriptions.blockSubscriber.start()
LocalSubscription(subscriptions: subscriptions, id: id)
proc subscribe*(
subscriptions: Subscriptions,
filter: EventFilter,
onLog: LogHandler
): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} =
let id = subscriptions.nextSubscriptionId
inc subscriptions.nextSubscriptionId
subscriptions.logSubscriptions[id] = (filter, onLog)
await subscriptions.blockSubscriber.start()
LocalSubscription(subscriptions: subscriptions, id: id)
method unsubscribe*(
subscription: LocalSubscription
) {.async:(raises:[ProviderError, CancelledError]).} =
let subscriptions = subscription.subscriptions
let id = subscription.id
subscriptions.logSubscriptions.del(id)
subscriptions.blockSubscriptions.del(id)
if subscriptions.len == 0:
await subscriptions.blockSubscriber.stop()
proc getLogs(
subscriptions: Subscriptions,
filter: EventFilter,
blockTag: BlockTag
): Future[seq[Log]] {.async:(raises:[ProviderError, CancelledError]).} =
let logFilter = Filter()
logFilter.address = filter.address
logFilter.topics = filter.topics
logFilter.fromBlock = blockTag
logFilter.toBlock = blockTag
await subscriptions.provider.getLogs(logFilter)
proc getLogs(
subscriptions: Subscriptions,
blck: Block
): Future[Table[SubscriptionId, seq[Log]]] {.
async:(raises:[ProviderError, CancelledError])
.} =
without blockNumber =? blck.number:
return
let blockTag = BlockTag.init(blockNumber)
let ids = toSeq(subscriptions.logSubscriptions.keys)
for id in ids:
without (filter, _) =? subscriptions.logSubscriptions.?[id]:
continue
if filter notin blck:
continue
result[id] = await subscriptions.getLogs(filter, blockTag)
proc processBlock(
subscriptions: Subscriptions,
blockNumber: BlockNumber
): Future[bool] {.async:(raises:[CancelledError]).} =
try:
let blockTag = BlockTag.init(blockNumber)
without blck =? await subscriptions.provider.getBlock(blockTag):
return false
if blck.logsBloom.isNone:
return false
let logs = await subscriptions.getLogs(blck)
for handler in subscriptions.blockSubscriptions.values:
handler(success blck)
for (id, logs) in logs.pairs:
if (_, handler) =? subscriptions.logSubscriptions.?[id]:
for log in logs:
handler(success log)
return true
except ProviderError:
return false
func new*(
_: type Subscriptions,
provider: Provider,
pollingInterval: Duration
): Subscriptions =
let subscriptions = Subscriptions()
proc processBlock(
blockNumber: BlockNumber
): Future[bool] {.async:(raises:[CancelledError]).} =
await subscriptions.processBlock(blockNumber)
let blockSubscriber = BlockSubscriber.new(
provider,
processBlock,
pollingInterval
)
subscriptions.provider = provider
subscriptions.blockSubscriber = blockSubscriber
subscriptions
proc close*(subscriptions: Subscriptions) {.async:(raises:[]).} =
await subscriptions.blockSubscriber.stop()
proc update*(subscriptions: Subscriptions) =
subscriptions.blockSubscriber.update()

View File

@ -0,0 +1,63 @@
import ../basics
import ../provider
type
BlockSubscriber* = ref object
provider: Provider
processor: ProcessBlock
pollingInterval: Duration
lastSeen: BlockNumber
lastProcessed: BlockNumber
wake: AsyncEvent
looping: Future[void].Raising([])
ProcessBlock* =
proc(number: BlockNumber): Future[bool] {.async:(raises:[CancelledError]).}
func new*(
_: type BlockSubscriber,
provider: Provider,
processor: ProcessBlock,
pollingInterval: Duration
): BlockSubscriber =
BlockSubscriber(
provider: provider,
processor: processor,
pollingInterval: pollingInterval
)
proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} =
discard await subscriber.wake.wait().withTimeout(subscriber.pollingInterval)
subscriber.wake.clear()
proc loop(subscriber: BlockSubscriber) {.async:(raises:[]).} =
try:
while true:
try:
await subscriber.sleep()
subscriber.lastSeen = await subscriber.provider.getBlockNumber()
for number in (subscriber.lastProcessed + 1)..subscriber.lastSeen:
if await subscriber.processor(number):
subscriber.lastProcessed = number
else:
break
except ProviderError:
discard
except CancelledError:
discard
proc start*(
subscriber: BlockSubscriber
) {.async:(raises:[ProviderError, CancelledError]).} =
if subscriber.looping.isNil:
subscriber.lastSeen = await subscriber.provider.getBlockNumber()
subscriber.lastProcessed = subscriber.lastSeen
subscriber.wake = newAsyncEvent()
subscriber.looping = subscriber.loop()
proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} =
if looping =? subscriber.looping:
subscriber.looping = nil
await looping.cancelAndWait()
proc update*(subscriber: BlockSubscriber) =
subscriber.wake.fire()

View File

@ -0,0 +1,14 @@
import pkg/eth/bloom
import ../basics
import ../provider
func contains*(blck: Block, filter: EventFilter): bool =
without logsBloom =? blck.logsBloom:
return false
let bloomFilter = BloomFilter(value: logsBloom)
if filter.address.toArray notin bloomFilter:
return false
for topic in filter.topics:
if topic notin bloomFilter:
return false
return true