diff --git a/ethers/subscriptions.nim b/ethers/subscriptions.nim new file mode 100644 index 0000000..b71001e --- /dev/null +++ b/ethers/subscriptions.nim @@ -0,0 +1,127 @@ +import std/tables +import std/sequtils +import ./basics +import ./provider +import ./subscriptions/blocksubscriber +import ./subscriptions/logsbloom + +type + Subscriptions* = ref object + provider: Provider + blockSubscriber: BlockSubscriber + blockSubscriptions: Table[SubscriptionId, BlockHandler] + logSubscriptions: Table[SubscriptionId, (EventFilter, LogHandler)] + nextSubscriptionId: int + LocalSubscription* = ref object of Subscription + subscriptions: Subscriptions + id: SubscriptionId + SubscriptionId = int + +func len*(subscriptions: Subscriptions): int = + subscriptions.blockSubscriptions.len + subscriptions.logSubscriptions.len + +proc subscribe*( + subscriptions: Subscriptions, + onBlock: BlockHandler +): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} = + let id = subscriptions.nextSubscriptionId + inc subscriptions.nextSubscriptionId + subscriptions.blockSubscriptions[id] = onBlock + await subscriptions.blockSubscriber.start() + LocalSubscription(subscriptions: subscriptions, id: id) + +proc subscribe*( + subscriptions: Subscriptions, + filter: EventFilter, + onLog: LogHandler +): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} = + let id = subscriptions.nextSubscriptionId + inc subscriptions.nextSubscriptionId + subscriptions.logSubscriptions[id] = (filter, onLog) + await subscriptions.blockSubscriber.start() + LocalSubscription(subscriptions: subscriptions, id: id) + +method unsubscribe*( + subscription: LocalSubscription +) {.async:(raises:[ProviderError, CancelledError]).} = + let subscriptions = subscription.subscriptions + let id = subscription.id + subscriptions.logSubscriptions.del(id) + subscriptions.blockSubscriptions.del(id) + if subscriptions.len == 0: + await subscriptions.blockSubscriber.stop() + +proc getLogs( + subscriptions: Subscriptions, + filter: EventFilter, + blockTag: BlockTag +): Future[seq[Log]] {.async:(raises:[ProviderError, CancelledError]).} = + let logFilter = Filter() + logFilter.address = filter.address + logFilter.topics = filter.topics + logFilter.fromBlock = blockTag + logFilter.toBlock = blockTag + await subscriptions.provider.getLogs(logFilter) + +proc getLogs( + subscriptions: Subscriptions, + blck: Block +): Future[Table[SubscriptionId, seq[Log]]] {. + async:(raises:[ProviderError, CancelledError]) +.} = + without blockNumber =? blck.number: + return + let blockTag = BlockTag.init(blockNumber) + let ids = toSeq(subscriptions.logSubscriptions.keys) + for id in ids: + without (filter, _) =? subscriptions.logSubscriptions.?[id]: + continue + if filter notin blck: + continue + result[id] = await subscriptions.getLogs(filter, blockTag) + +proc processBlock( + subscriptions: Subscriptions, + blockNumber: BlockNumber +): Future[bool] {.async:(raises:[CancelledError]).} = + try: + let blockTag = BlockTag.init(blockNumber) + without blck =? await subscriptions.provider.getBlock(blockTag): + return false + if blck.logsBloom.isNone: + return false + let logs = await subscriptions.getLogs(blck) + for handler in subscriptions.blockSubscriptions.values: + handler(success blck) + for (id, logs) in logs.pairs: + if (_, handler) =? subscriptions.logSubscriptions.?[id]: + for log in logs: + handler(success log) + return true + except ProviderError: + return false + +func new*( + _: type Subscriptions, + provider: Provider, + pollingInterval: Duration +): Subscriptions = + let subscriptions = Subscriptions() + proc processBlock( + blockNumber: BlockNumber + ): Future[bool] {.async:(raises:[CancelledError]).} = + await subscriptions.processBlock(blockNumber) + let blockSubscriber = BlockSubscriber.new( + provider, + processBlock, + pollingInterval + ) + subscriptions.provider = provider + subscriptions.blockSubscriber = blockSubscriber + subscriptions + +proc close*(subscriptions: Subscriptions) {.async:(raises:[]).} = + await subscriptions.blockSubscriber.stop() + +proc update*(subscriptions: Subscriptions) = + subscriptions.blockSubscriber.update() diff --git a/ethers/subscriptions/blocksubscriber.nim b/ethers/subscriptions/blocksubscriber.nim new file mode 100644 index 0000000..305cd93 --- /dev/null +++ b/ethers/subscriptions/blocksubscriber.nim @@ -0,0 +1,63 @@ +import ../basics +import ../provider + +type + BlockSubscriber* = ref object + provider: Provider + processor: ProcessBlock + pollingInterval: Duration + lastSeen: BlockNumber + lastProcessed: BlockNumber + wake: AsyncEvent + looping: Future[void].Raising([]) + ProcessBlock* = + proc(number: BlockNumber): Future[bool] {.async:(raises:[CancelledError]).} + +func new*( + _: type BlockSubscriber, + provider: Provider, + processor: ProcessBlock, + pollingInterval: Duration +): BlockSubscriber = + BlockSubscriber( + provider: provider, + processor: processor, + pollingInterval: pollingInterval + ) + +proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} = + discard await subscriber.wake.wait().withTimeout(subscriber.pollingInterval) + subscriber.wake.clear() + +proc loop(subscriber: BlockSubscriber) {.async:(raises:[]).} = + try: + while true: + try: + await subscriber.sleep() + subscriber.lastSeen = await subscriber.provider.getBlockNumber() + for number in (subscriber.lastProcessed + 1)..subscriber.lastSeen: + if await subscriber.processor(number): + subscriber.lastProcessed = number + else: + break + except ProviderError: + discard + except CancelledError: + discard + +proc start*( + subscriber: BlockSubscriber +) {.async:(raises:[ProviderError, CancelledError]).} = + if subscriber.looping.isNil: + subscriber.lastSeen = await subscriber.provider.getBlockNumber() + subscriber.lastProcessed = subscriber.lastSeen + subscriber.wake = newAsyncEvent() + subscriber.looping = subscriber.loop() + +proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} = + if looping =? subscriber.looping: + subscriber.looping = nil + await looping.cancelAndWait() + +proc update*(subscriber: BlockSubscriber) = + subscriber.wake.fire() diff --git a/ethers/subscriptions/logsbloom.nim b/ethers/subscriptions/logsbloom.nim new file mode 100644 index 0000000..7ecb32b --- /dev/null +++ b/ethers/subscriptions/logsbloom.nim @@ -0,0 +1,14 @@ +import pkg/eth/bloom +import ../basics +import ../provider + +func contains*(blck: Block, filter: EventFilter): bool = + without logsBloom =? blck.logsBloom: + return false + let bloomFilter = BloomFilter(value: logsBloom) + if filter.address.toArray notin bloomFilter: + return false + for topic in filter.topics: + if topic notin bloomFilter: + return false + return true