diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 3147d59..a5c4774 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -25,9 +25,22 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - uses: iffy/install-nim@v4 with: version: ${{ matrix.nim }} + + - name: Update nimble + run: | + nimble install nimble + nimble --version + + - name: Use updated nimble version on Windows + if: contains(matrix.os, 'windows') + run: | + del $HOME\.nimble\bin\nimble.exe + nimble --version + - name: Build run: nimble build -y - name: Test diff --git a/codexcrawler.nimble b/codexcrawler.nimble index a38d166..3af92dc 100644 --- a/codexcrawler.nimble +++ b/codexcrawler.nimble @@ -25,6 +25,7 @@ requires "questionable >= 0.10.15 & < 0.11.0" requires "https://github.com/codex-storage/nim-codex-dht#f6eef1ac95c70053b2518f1e3909c909ed8701a6" requires "docopt >= 0.7.1 & < 1.0.0" requires "nph >= 0.6.1 & < 1.0.0" +requires "ethers >= 1.0.0 & < 2.0.0" task format, "Formatting...": exec "nph ./" diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index 0f7f73c..e5a0d07 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -36,9 +36,15 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = return failure(err) app.components = components + for c in components: + if err =? (await c.awake()).errorOption: + error "Failed during component awake", err = err.msg + return failure(err) + for c in components: if err =? (await c.start()).errorOption: - error "Failed to start component", err = err.msg + error "Failed during component start", err = err.msg + return failure(err) return success() diff --git a/codexcrawler/component.nim b/codexcrawler/component.nim index f36b0a2..ce1517b 100644 --- a/codexcrawler/component.nim +++ b/codexcrawler/component.nim @@ -1,12 +1,19 @@ import pkg/chronos import pkg/questionable/results -import ./state - type Component* = ref object of RootObj +method awake*(c: Component): Future[?!void] {.async, base.} = + # Awake is called on all components in an unspecified order. + # Use this method to subscribe/connect to other components. + return success() + method start*(c: Component): Future[?!void] {.async, base.} = - raiseAssert("call to abstract method: component.start") + # Start is called on all components in an unspecified order. + # Is is guaranteed that all components have already successfulled handled 'awake'. + # Use this method to begin the work of this component. + return success() method stop*(c: Component): Future[?!void] {.async, base.} = - raiseAssert("call to abstract method: component.stop") + # Use this method to stop, unsubscribe, and clean up any resources. + return success() diff --git a/codexcrawler/components/chaincrawler.nim b/codexcrawler/components/chaincrawler.nim new file mode 100644 index 0000000..f957630 --- /dev/null +++ b/codexcrawler/components/chaincrawler.nim @@ -0,0 +1,41 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable/results + +import ../state +import ../components/requeststore +import ../services/marketplace +import ../component +import ../types + +logScope: + topics = "chaincrawler" + +type ChainCrawler* = ref object of Component + state: State + store: RequestStore + marketplace: MarketplaceService + +proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} = + return await c.store.add(rid) + +method start*(c: ChainCrawler): Future[?!void] {.async.} = + info "starting..." + + proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} = + return await c.onNewRequest(rid) + + # Normally subscriptions must be done in awake. + # Marketplace is a little different: It uses awake to set up its connections. + # And so it can't handle subscribes until we're in 'start'. + ?await c.marketplace.subscribeToNewRequests(onRequest) + ?await c.marketplace.iteratePastNewRequestEvents(onRequest) + return success() + +proc new*( + T: type ChainCrawler, + state: State, + store: RequestStore, + marketplace: MarketplaceService, +): ChainCrawler = + ChainCrawler(state: state, store: store, marketplace: marketplace) diff --git a/codexcrawler/components/chainmetrics.nim b/codexcrawler/components/chainmetrics.nim new file mode 100644 index 0000000..dac810d --- /dev/null +++ b/codexcrawler/components/chainmetrics.nim @@ -0,0 +1,81 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ../state +import ../services/metrics +import ../services/marketplace +import ../components/requeststore +import ../component +import ../types + +logScope: + topics = "chainmetrics" + +type + ChainMetrics* = ref object of Component + state: State + metrics: Metrics + store: RequestStore + marketplace: MarketplaceService + + Update = ref object + numRequests: int + numPending: int + numSlots: int + totalSize: int64 + +proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} = + var update = Update(numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0) + + proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} = + let response = await c.marketplace.getRequestInfo(entry.id) + if info =? response: + if info.pending: + trace "request is pending", id = $entry.id + inc update.numPending + else: + trace "request is running", id = $entry.id + inc update.numRequests + update.numSlots += info.slots.int + update.totalSize += (info.slots * info.slotSize).int64 + else: + ?await c.store.remove(entry.id) + return success() + + ?await c.store.iterateAll(onRequest) + return success(update) + +proc updateMetrics(c: ChainMetrics, update: Update) = + c.metrics.setRequests(update.numRequests) + c.metrics.setPendingRequests(update.numPending) + c.metrics.setRequestSlots(update.numSlots) + c.metrics.setTotalSize(update.totalSize) + +proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} = + without update =? (await c.collectUpdate()), err: + return failure(err) + + c.updateMetrics(update) + return success() + +method start*(c: ChainMetrics): Future[?!void] {.async.} = + info "starting..." + + proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = + return await c.step() + + if c.state.config.marketplaceEnable: + await c.state.whileRunning(onStep, c.state.config.requestCheckDelay.minutes) + + return success() + +proc new*( + T: type ChainMetrics, + state: State, + metrics: Metrics, + store: RequestStore, + marketplace: MarketplaceService, +): ChainMetrics = + ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace) diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/dhtcrawler.nim similarity index 70% rename from codexcrawler/components/crawler.nim rename to codexcrawler/components/dhtcrawler.nim index c32f5ce..0c594b2 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/dhtcrawler.nim @@ -12,15 +12,15 @@ import ../state import ../utils/asyncdataevent logScope: - topics = "crawler" + topics = "dhtcrawler" -type Crawler* = ref object of Component +type DhtCrawler* = ref object of Component state: State dht: Dht todo: TodoList proc raiseCheckEvent( - c: Crawler, nid: Nid, success: bool + c: DhtCrawler, nid: Nid, success: bool ): Future[?!void] {.async: (raises: []).} = let event = DhtNodeCheckEventData(id: nid, isOk: success) if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: @@ -28,7 +28,7 @@ proc raiseCheckEvent( return failure(err) return success() -proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = +proc step(c: DhtCrawler): Future[?!void] {.async: (raises: []).} = without nid =? (await c.todo.pop()), err: error "failed to pop todolist", err = err.msg return failure(err) @@ -46,18 +46,16 @@ proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = return success() -method start*(c: Crawler): Future[?!void] {.async.} = - info "Starting crawler..." +method start*(c: DhtCrawler): Future[?!void] {.async.} = + info "starting..." proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = await c.step() - await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) + if c.state.config.dhtEnable: + await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) return success() -method stop*(c: Crawler): Future[?!void] {.async.} = - return success() - -proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler = - Crawler(state: state, dht: dht, todo: todo) +proc new*(T: type DhtCrawler, state: State, dht: Dht, todo: TodoList): DhtCrawler = + DhtCrawler(state: state, dht: dht, todo: todo) diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim index af9c834..374e169 100644 --- a/codexcrawler/components/dhtmetrics.nim +++ b/codexcrawler/components/dhtmetrics.nim @@ -45,11 +45,7 @@ proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} d.updateMetrics() return success() -method start*(d: DhtMetrics): Future[?!void] {.async.} = - info "Starting..." - ?await d.ok.load() - ?await d.nok.load() - +method awake*(d: DhtMetrics): Future[?!void] {.async.} = proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = await d.handleCheckEvent(event) @@ -58,7 +54,12 @@ method start*(d: DhtMetrics): Future[?!void] {.async.} = d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck) d.subDel = d.state.events.nodesDeleted.subscribe(onDelete) + return success() +method start*(d: DhtMetrics): Future[?!void] {.async.} = + info "starting..." + ?await d.ok.load() + ?await d.nok.load() return success() method stop*(d: DhtMetrics): Future[?!void] {.async.} = diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index 2bd42bc..ef22ae0 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -203,7 +203,7 @@ method deleteEntries*( return success() method start*(s: NodeStore): Future[?!void] {.async.} = - info "Starting..." + info "starting..." proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = return await s.processFoundNodes(nids) diff --git a/codexcrawler/components/requeststore.nim b/codexcrawler/components/requeststore.nim new file mode 100644 index 0000000..836e1fb --- /dev/null +++ b/codexcrawler/components/requeststore.nim @@ -0,0 +1,119 @@ +import std/os +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos +import pkg/libp2p + +import ../types +import ../component +import ../state +import ../utils/datastoreutils + +const requeststoreName = "requeststore" + +logScope: + topics = "requeststore" + +type + RequestEntry* = object + id*: Rid + isValid: bool + + OnRequestEntry* = + proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} + + RequestStore* = ref object of Component + state: State + store: TypedDatastore + +proc `$`*(entry: RequestEntry): string = + $entry.id + +proc toBytes*(entry: RequestEntry): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $entry.id) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry = + var + buffer = initProtoBuffer(data) + idStr: string + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + return success(RequestEntry(id: Rid.fromStr(idStr), isValid: true)) + +proc encode*(e: RequestEntry): seq[byte] = + e.toBytes() + +proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T = + if bytes.len < 1: + return success(RequestEntry(isValid: false)) + return RequestEntry.fromBytes(bytes) + +method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} = + without key =? Key.init(requeststoreName / $rid), err: + error "failed to format key", err = err.msg + return failure(err) + + try: + ?await s.store.put(key, RequestEntry(id: rid)) + except CatchableError as exc: + return failure(exc.msg) + trace "Request entry added", id = $rid + return success() + +method remove*( + s: RequestStore, rid: Rid +): Future[?!void] {.async: (raises: []), base.} = + without key =? Key.init(requeststoreName / $rid), err: + error "failed to format key", err = err.msg + return failure(err) + + try: + ?await s.store.delete(key) + except CatchableError as exc: + return failure(exc.msg) + trace "Request entry removed", id = $rid + return success() + +method iterateAll*( + s: RequestStore, onNode: OnRequestEntry +): Future[?!void] {.async: (raises: []), base.} = + without queryKey =? Key.init(requeststoreName), err: + error "failed to format key", err = err.msg + return failure(err) + try: + without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err: + error "failed to create query", err = err.msg + return failure(err) + + while not iter.finished: + without item =? (await iter.next()), err: + error "failure during query iteration", err = err.msg + return failure(err) + without value =? item.value, err: + error "failed to get value from iterator", err = err.msg + return failure(err) + + if value.isValid: + ?await onNode(value) + + await sleepAsync(1.millis) + except CatchableError as exc: + return failure(exc.msg) + return success() + +proc new*(T: type RequestStore, state: State, store: TypedDatastore): RequestStore = + RequestStore(state: state, store: store) + +proc createRequestStore*(state: State): ?!RequestStore = + without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err: + error "Failed to create typed datastore for request store", err = err.msg + return failure(err) + + return success(RequestStore.new(state, ds)) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index b1af510..2683b0a 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -57,7 +57,7 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: [] return success() method start*(t: TimeTracker): Future[?!void] {.async.} = - info "Starting..." + info "starting..." proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} = await t.checkRevisitsAndExpiry() @@ -71,9 +71,6 @@ method start*(t: TimeTracker): Future[?!void] {.async.} = await t.state.whileRunning(onRoutingTable, 30.minutes) return success() -method stop*(t: TimeTracker): Future[?!void] {.async.} = - return success() - proc new*( T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht, clock: Clock ): TimeTracker = diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim index e271f1e..33144ae 100644 --- a/codexcrawler/components/todolist.nim +++ b/codexcrawler/components/todolist.nim @@ -32,13 +32,13 @@ proc addNodes(t: TodoList, nids: seq[Nid]) = t.metrics.setTodoNodes(t.nids.len) if s =? t.emptySignal: - trace "Nodes added, resuming...", nodes = nids.len + trace "nodes added, resuming...", nodes = nids.len s.complete() t.emptySignal = Future[void].none method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} = if t.nids.len < 1: - trace "List is empty. Waiting for new items..." + trace "list is empty. Waiting for new items..." let signal = newFuture[void]("list.emptySignal") t.emptySignal = some(signal) try: @@ -54,8 +54,8 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} = return success(item) -method start*(t: TodoList): Future[?!void] {.async.} = - info "Starting TodoList..." +method awake*(t: TodoList): Future[?!void] {.async.} = + info "initializing..." proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = t.addNodes(nids) diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 76d853b..ef20ae2 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -10,7 +10,7 @@ let doc = Codex Network Crawler. Generates network metrics. Usage: - codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] [--checkDelay=] [--expiryDelay=] + codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--dhtEnable=] [--stepDelay=] [--revisitDelay=] [--checkDelay=] [--expiryDelay=] [--marketplaceEnable=] [--ethProvider=] [--marketplaceAddress=] [--requestCheckDelay=] Options: --logLevel= Sets log level [default: INFO] @@ -20,10 +20,17 @@ Options: --dataDir=

Directory for storing data [default: crawler_data] --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] + + --dhtEnable= Set to "1" to enable DHT crawler [default: 1] --stepDelay= Delay in milliseconds per node visit [default: 1000] --revisitDelay= Delay in minutes after which a node can be revisited [default: 60] --checkDelay= Delay with which the 'revisitDelay' is checked for all known nodes [default: 10] --expiryDelay= Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h) + + --marketplaceEnable= Set to "1" to enable marketplace metrics [default: 1] + --ethProvider= Address including http(s) or ws of the eth provider + --marketplaceAddress= Eth address of Codex contracts deployment + --requestCheckDelay= Delay in minutes after which storage contract status is (re)checked [default: 10] """ import strutils @@ -37,18 +44,28 @@ type Config* = ref object dataDir*: string discPort*: Port bootNodes*: seq[SignedPeerRecord] + + dhtEnable*: bool stepDelayMs*: int revisitDelayMins*: int checkDelayMins*: int expiryDelayMins*: int + marketplaceEnable*: bool + ethProvider*: string + marketplaceAddress*: string + requestCheckDelay*: int + proc `$`*(config: Config): string = "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & - " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & - config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & - " revisitDelayMins=" & $config.revisitDelayMins & " expiryDelayMins=" & - $config.expiryDelayMins & " checkDelayMins=" & $config.checkDelayMins + " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " dhtEnable=" & + $config.dhtEnable & " bootNodes=" & config.bootNodes.mapIt($it).join(";") & + " stepDelay=" & $config.stepDelayMs & " revisitDelayMins=" & $config.revisitDelayMins & + " expiryDelayMins=" & $config.expiryDelayMins & " checkDelayMins=" & + $config.checkDelayMins & " marketplaceEnable=" & $config.marketplaceEnable & + " ethProvider=" & config.ethProvider & " marketplaceAddress=" & + config.marketplaceAddress & " requestCheckDelay=" & $config.requestCheckDelay proc getDefaultTestnetBootNodes(): seq[string] = @[ @@ -86,6 +103,9 @@ proc stringToSpr(uri: string): SignedPeerRecord = proc getBootNodes(input: string): seq[SignedPeerRecord] = getBootNodeStrings(input).mapIt(stringToSpr(it)) +proc getEnable(input: string): bool = + input == "1" + proc parseConfig*(): Config = let args = docopt(doc, version = crawlerFullVersion) @@ -100,8 +120,13 @@ proc parseConfig*(): Config = dataDir: get("--dataDir"), discPort: Port(parseInt(get("--discoveryPort"))), bootNodes: getBootNodes(get("--bootNodes")), + dhtEnable: getEnable(get("--dhtEnable")), stepDelayMs: parseInt(get("--stepDelay")), revisitDelayMins: parseInt(get("--revisitDelay")), checkDelayMins: parseInt(get("--checkDelay")), expiryDelayMins: parseInt(get("--expiryDelay")), + marketplaceEnable: getEnable(get("--marketplaceEnable")), + ethProvider: get("--ethProvider"), + marketplaceAddress: get("--marketplaceAddress"), + requestCheckDelay: parseInt(get("--requestCheckDelay")), ) diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index 8b952aa..80fa924 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -5,16 +5,21 @@ import ./state import ./services/clock import ./services/metrics import ./services/dht + +import ./services/marketplace + import ./component -import ./components/crawler +import ./components/dhtcrawler import ./components/timetracker import ./components/nodestore import ./components/dhtmetrics import ./components/todolist +import ./components/chainmetrics +import ./components/chaincrawler +import ./components/requeststore proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = var components: seq[Component] = newSeq[Component]() - let clock = createClock() without dht =? (await createDht(state)), err: @@ -23,9 +28,14 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = without nodeStore =? createNodeStore(state, clock), err: return failure(err) + without requestStore =? createRequestStore(state), err: + return failure(err) + let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) todoList = createTodoList(state, metrics) + marketplace = createMarketplace(state, clock) + chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace) without dhtMetrics =? createDhtMetrics(state, metrics), err: return failure(err) @@ -33,8 +43,11 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = components.add(dht) components.add(todoList) components.add(nodeStore) - components.add(Crawler.new(state, dht, todoList)) + components.add(DhtCrawler.new(state, dht, todoList)) components.add(TimeTracker.new(state, nodeStore, dht, clock)) components.add(dhtMetrics) + components.add(marketplace) + components.add(chainMetrics) + components.add(ChainCrawler.new(state, requestStore, marketplace)) return success(components) diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim new file mode 100644 index 0000000..12ebfa7 --- /dev/null +++ b/codexcrawler/services/marketplace.nim @@ -0,0 +1,116 @@ +import pkg/ethers +import pkg/questionable +import pkg/upraises +import ./marketplace/market +import ./marketplace/marketplace +import ../config +import ../component +import ../state +import ../types +import ./clock + +logScope: + topics = "marketplace" + +type + MarketplaceService* = ref object of Component + state: State + market: ?OnChainMarket + clock: Clock + + OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.} + RequestInfo* = ref object + pending*: bool + slots*: uint64 + slotSize*: uint64 + +proc notStarted() = + raiseAssert("MarketplaceService was called before it was started.") + +proc fetchRequestInfo( + market: OnChainMarket, rid: Rid +): Future[?RequestInfo] {.async: (raises: []).} = + try: + let request = await market.getRequest(rid) + if r =? request: + return + some(RequestInfo(pending: false, slots: r.ask.slots, slotSize: r.ask.slotSize)) + except CatchableError as exc: + trace "Failed to get request info", err = exc.msg + return none(RequestInfo) + +method subscribeToNewRequests*( + m: MarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []), base.} = + proc resultWrapper(rid: Rid): Future[void] {.async.} = + let response = await onNewRequest(rid) + if error =? response.errorOption: + raiseAssert("Error result in handling of onNewRequest callback: " & error.msg) + + proc onRequest( + id: RequestId, ask: StorageAsk, expiry: uint64 + ) {.gcsafe, upraises: [].} = + asyncSpawn resultWrapper(Rid(id)) + + if market =? m.market: + try: + discard await market.subscribeRequests(onRequest) + return success() + except CatchableError as exc: + return failure(exc.msg) + else: + notStarted() + +method iteratePastNewRequestEvents*( + m: MarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []), base.} = + let + oneDay = 60 * 60 * 24 + timespan = oneDay * 30 + startTime = m.clock.now() - timespan.uint64 + + if market =? m.market: + try: + let requests = await market.queryPastStorageRequestedEvents(startTime.int64) + for request in requests: + if error =? (await onNewRequest(Rid(request.requestId))).errorOption: + return failure(error.msg) + return success() + except CatchableError as exc: + return failure(exc.msg) + else: + notStarted() + +method getRequestInfo*( + m: MarketplaceService, rid: Rid +): Future[?RequestInfo] {.async: (raises: []), base.} = + # If the request id exists and is running, fetch the request object and return the info object. + # otherwise, return none. + if market =? m.market: + try: + let state = await market.requestState(rid) + if s =? state: + if s == RequestState.New: + return some(RequestInfo(pending: true)) + if s == RequestState.Started: + return await market.fetchRequestInfo(rid) + except CatchableError as exc: + trace "Failed to get request state", err = exc.msg + return none(RequestInfo) + else: + notStarted() + +method awake*(m: MarketplaceService): Future[?!void] {.async.} = + let provider = JsonRpcProvider.new(m.state.config.ethProvider) + without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): + return failure("Invalid MarketplaceAddress provided") + + let marketplace = Marketplace.new(marketplaceAddress, provider) + m.market = some(OnChainMarket.new(marketplace)) + return success() + +proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService = + return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock) + +proc createMarketplace*(state: State, clock: Clock): MarketplaceService = + return MarketplaceService.new(state, clock) diff --git a/codexcrawler/services/marketplace/config.nim b/codexcrawler/services/marketplace/config.nim new file mode 100644 index 0000000..3c31c8b --- /dev/null +++ b/codexcrawler/services/marketplace/config.nim @@ -0,0 +1,103 @@ +import pkg/contractabi +import pkg/ethers/fields +import pkg/questionable/results + +export contractabi + +const DefaultRequestCacheSize* = 128.uint16 + +type + MarketplaceConfig* = object + collateral*: CollateralConfig + proofs*: ProofConfig + reservations*: SlotReservationsConfig + requestDurationLimit*: uint64 + + CollateralConfig* = object + repairRewardPercentage*: uint8 + # percentage of remaining collateral slot has after it has been freed + maxNumberOfSlashes*: uint8 # frees slot when the number of slashes reaches this value + slashPercentage*: uint8 # percentage of the collateral that is slashed + validatorRewardPercentage*: uint8 + # percentage of the slashed amount going to the validators + + ProofConfig* = object + period*: uint64 # proofs requirements are calculated per period (in seconds) + timeout*: uint64 # mark proofs as missing before the timeout (in seconds) + downtime*: uint8 # ignore this much recent blocks for proof requirements + downtimeProduct*: uint8 + zkeyHash*: string # hash of the zkey file which is linked to the verifier + # Ensures the pointer does not remain in downtime for many consecutive + # periods. For each period increase, move the pointer `pointerProduct` + # blocks. Should be a prime number to ensure there are no cycles. + + SlotReservationsConfig* = object + maxReservations*: uint8 + +func fromTuple(_: type ProofConfig, tupl: tuple): ProofConfig = + ProofConfig( + period: tupl[0], + timeout: tupl[1], + downtime: tupl[2], + downtimeProduct: tupl[3], + zkeyHash: tupl[4], + ) + +func fromTuple(_: type SlotReservationsConfig, tupl: tuple): SlotReservationsConfig = + SlotReservationsConfig(maxReservations: tupl[0]) + +func fromTuple(_: type CollateralConfig, tupl: tuple): CollateralConfig = + CollateralConfig( + repairRewardPercentage: tupl[0], + maxNumberOfSlashes: tupl[1], + slashPercentage: tupl[2], + validatorRewardPercentage: tupl[3], + ) + +func fromTuple(_: type MarketplaceConfig, tupl: tuple): MarketplaceConfig = + MarketplaceConfig( + collateral: tupl[0], + proofs: tupl[1], + reservations: tupl[2], + requestDurationLimit: tupl[3], + ) + +func solidityType*(_: type SlotReservationsConfig): string = + solidityType(SlotReservationsConfig.fieldTypes) + +func solidityType*(_: type ProofConfig): string = + solidityType(ProofConfig.fieldTypes) + +func solidityType*(_: type CollateralConfig): string = + solidityType(CollateralConfig.fieldTypes) + +func solidityType*(_: type MarketplaceConfig): string = + solidityType(MarketplaceConfig.fieldTypes) + +func encode*(encoder: var AbiEncoder, slot: SlotReservationsConfig) = + encoder.write(slot.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: ProofConfig) = + encoder.write(slot.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: CollateralConfig) = + encoder.write(slot.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: MarketplaceConfig) = + encoder.write(slot.fieldValues) + +func decode*(decoder: var AbiDecoder, T: type ProofConfig): ?!T = + let tupl = ?decoder.read(ProofConfig.fieldTypes) + success ProofConfig.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type SlotReservationsConfig): ?!T = + let tupl = ?decoder.read(SlotReservationsConfig.fieldTypes) + success SlotReservationsConfig.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type CollateralConfig): ?!T = + let tupl = ?decoder.read(CollateralConfig.fieldTypes) + success CollateralConfig.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type MarketplaceConfig): ?!T = + let tupl = ?decoder.read(MarketplaceConfig.fieldTypes) + success MarketplaceConfig.fromTuple(tupl) diff --git a/codexcrawler/services/marketplace/logutils.nim b/codexcrawler/services/marketplace/logutils.nim new file mode 100644 index 0000000..90415ea --- /dev/null +++ b/codexcrawler/services/marketplace/logutils.nim @@ -0,0 +1,286 @@ +## logutils is a module that has several goals: +## 1. Fix json logging output (run with `--log-format=json`) which was +## effectively broken for many types using default Chronicles json +## serialization. +## 2. Ability to specify log output for textlines and json sinks together or +## separately +## - This is useful if consuming json in some kind of log parser and need +## valid json with real values +## - eg a shortened Cid is nice to see in a text log in stdout, but won't +## provide a real Cid when parsed in json +## 4. Remove usages of `nim-json-serialization` from the codebase +## 5. Remove need to declare `writeValue` for new types +## 6. Remove need to [avoid importing or exporting `toJson`, `%`, `%*` to prevent +## conflicts](https://github.com/codex-storage/nim-codex/pull/645#issuecomment-1838834467) +## +## When declaring a new type, one should consider importing the `codex/logutils` +## module, and specifying `formatIt`. If textlines log output and json log output +## need to be different, overload `formatIt` and specify a `LogFormat`. If json +## serialization is needed, it can be declared with a `%` proc. `logutils` +## imports and exports `nim-serde` which handles the de/serialization, examples +## below. **Only `codex/logutils` needs to be imported.** +## +## Using `logutils` in the Codex codebase: +## - Instead of importing `pkg/chronicles`, import `pkg/codex/logutils` +## - most of `chronicles` is exported by `logutils` +## - Instead of importing `std/json`, import `pkg/serde/json` +## - `std/json` is exported by `serde` which is exported by `logutils` +## - Instead of importing `pkg/nim-json-serialization`, import +## `pkg/serde/json` or use codex-specific overloads by importing `utils/json` +## - one of the goals is to remove the use of `nim-json-serialization` +## +## ```nim +## import pkg/codex/logutils +## +## type +## BlockAddress* = object +## case leaf*: bool +## of true: +## treeCid* {.serialize.}: Cid +## index* {.serialize.}: Natural +## else: +## cid* {.serialize.}: Cid +## +## logutils.formatIt(LogFormat.textLines, BlockAddress): +## if it.leaf: +## "treeCid: " & shortLog($it.treeCid) & ", index: " & $it.index +## else: +## "cid: " & shortLog($it.cid) +## +## logutils.formatIt(LogFormat.json, BlockAddress): %it +## +## # chronicles textlines output +## TRC test tid=14397405 ba="treeCid: zb2*fndjU1, index: 0" +## # chronicles json output +## {"lvl":"TRC","msg":"test","tid":14397405,"ba":{"treeCid":"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1","index":0}} +## ``` +## In this case, `BlockAddress` is just an object, so `nim-serde` can handle +## serializing it without issue (only fields annotated with `{.serialize.}` will +## serialize (aka opt-in serialization)). +## +## If one so wished, another option for the textlines log output, would be to +## simply `toString` the serialised json: +## ```nim +## logutils.formatIt(LogFormat.textLines, BlockAddress): $ %it +## # or, more succinctly: +## logutils.formatIt(LogFormat.textLines, BlockAddress): it.toJson +## ``` +## In that case, both the textlines and json sinks would have the same output, +## so we could reduce this even further by not specifying a `LogFormat`: +## ```nim +## type +## BlockAddress* = object +## case leaf*: bool +## of true: +## treeCid* {.serialize.}: Cid +## index* {.serialize.}: Natural +## else: +## cid* {.serialize.}: Cid +## +## logutils.formatIt(BlockAddress): %it +## +## # chronicles textlines output +## TRC test tid=14400673 ba="{\"treeCid\":\"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1\",\"index\":0}" +## # chronicles json output +## {"lvl":"TRC","msg":"test","tid":14400673,"ba":{"treeCid":"zb2rhgsDE16rLtbwTFeNKbdSobtKiWdjJPvKEuPgrQAfndjU1","index":0}} +## ``` + +import std/options +import std/sequtils +import std/strutils +import std/sugar +import std/typetraits + +import pkg/chronicles except toJson, `%` +from pkg/libp2p import + Cid, PeerId, SignedPeerRecord, MultiAddress, AddressInfo, init, `$` +from pkg/ethers import Address +import pkg/questionable +import pkg/questionable/results +import pkg/stew/byteutils +import pkg/stint +import pkg/serde/json +import pkg/codexdht/discv5/node as dn +import pkg/contractabi + +export byteutils +export chronicles except toJson, formatIt, `%` +export questionable +export sequtils +export json except formatIt +export strutils +export sugar +export results + +proc fromJson*(_: type Cid, json: JsonNode): ?!Cid = + expectJsonKind(Cid, JString, json) + Cid.init(json.str).mapFailure + +func `%`*(cid: Cid): JsonNode = + % $cid + +func `%`*(obj: PeerId): JsonNode = + % $obj + +func `%`*(obj: SignedPeerRecord): JsonNode = + % $obj + +func `%`*(obj: dn.Address): JsonNode = + % $obj + +func `%`*(obj: AddressInfo): JsonNode = + % $obj.address + +func `%`*(obj: MultiAddress): JsonNode = + % $obj + +func `%`*(address: ethers.Address): JsonNode = + % $address + +func shortLog*(long: string, ellipses = "*", start = 3, stop = 6): string = + ## Returns compact string representation of ``long``. + var short = long + let minLen = start + ellipses.len + stop + if len(short) > minLen: + short.insert(ellipses, start) + + when (NimMajor, NimMinor) > (1, 4): + short.delete(start + ellipses.len .. short.high - stop) + else: + short.delete(start + ellipses.len, short.high - stop) + + short + +func shortHexLog*(long: string): string = + if long[0 .. 1] == "0x": + result &= "0x" + result &= long[2 .. long.high].shortLog("..", 4, 4) + +func short0xHexLog*[N: static[int], T: array[N, byte]](v: T): string = + v.to0xHex.shortHexLog + +func short0xHexLog*[T: distinct](v: T): string = + type BaseType = T.distinctBase + BaseType(v).short0xHexLog + +func short0xHexLog*[U: distinct, T: seq[U]](v: T): string = + type BaseType = U.distinctBase + "@[" & v.map(x => BaseType(x).short0xHexLog).join(",") & "]" + +func to0xHexLog*[T: distinct](v: T): string = + type BaseType = T.distinctBase + BaseType(v).to0xHex + +func to0xHexLog*[U: distinct, T: seq[U]](v: T): string = + type BaseType = U.distinctBase + "@[" & v.map(x => BaseType(x).to0xHex).join(",") & "]" + +proc formatTextLineSeq*(val: seq[string]): string = + "@[" & val.join(", ") & "]" + +template formatIt*(format: LogFormat, T: typedesc, body: untyped) = + # Provides formatters for logging with Chronicles for the given type and + # `LogFormat`. + # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overridden + # since the base `setProperty` is generic using `auto` and conflicts with + # providing a generic `seq` and `Option` override. + when format == LogFormat.json: + proc formatJsonOption(val: ?T): JsonNode = + if it =? val: + json.`%`(body) + else: + newJNull() + + proc formatJsonResult*(val: ?!T): JsonNode = + without it =? val, error: + let jObj = newJObject() + jObj["error"] = newJString(error.msg) + return jObj + json.`%`(body) + + proc setProperty*(r: var JsonRecord, key: string, res: ?!T) = + var it {.inject, used.}: T + setProperty(r, key, res.formatJsonResult) + + proc setProperty*(r: var JsonRecord, key: string, opt: ?T) = + var it {.inject, used.}: T + let v = opt.formatJsonOption + setProperty(r, key, v) + + proc setProperty*(r: var JsonRecord, key: string, opts: seq[?T]) = + var it {.inject, used.}: T + let v = opts.map(opt => opt.formatJsonOption) + setProperty(r, key, json.`%`(v)) + + proc setProperty*( + r: var JsonRecord, key: string, val: seq[T] + ) {.raises: [ValueError, IOError].} = + var it {.inject, used.}: T + let v = val.map(it => body) + setProperty(r, key, json.`%`(v)) + + proc setProperty*( + r: var JsonRecord, key: string, val: T + ) {.raises: [ValueError, IOError].} = + var it {.inject, used.}: T = val + let v = body + setProperty(r, key, json.`%`(v)) + + elif format == LogFormat.textLines: + proc formatTextLineOption*(val: ?T): string = + var v = "none(" & $T & ")" + if it =? val: + v = "some(" & $(body) & ")" # that I used to know :) + v + + proc formatTextLineResult*(val: ?!T): string = + without it =? val, error: + return "Error: " & error.msg + $(body) + + proc setProperty*(r: var TextLineRecord, key: string, res: ?!T) = + var it {.inject, used.}: T + setProperty(r, key, res.formatTextLineResult) + + proc setProperty*(r: var TextLineRecord, key: string, opt: ?T) = + var it {.inject, used.}: T + let v = opt.formatTextLineOption + setProperty(r, key, v) + + proc setProperty*(r: var TextLineRecord, key: string, opts: seq[?T]) = + var it {.inject, used.}: T + let v = opts.map(opt => opt.formatTextLineOption) + setProperty(r, key, v.formatTextLineSeq) + + proc setProperty*( + r: var TextLineRecord, key: string, val: seq[T] + ) {.raises: [ValueError, IOError].} = + var it {.inject, used.}: T + let v = val.map(it => body) + setProperty(r, key, v.formatTextLineSeq) + + proc setProperty*( + r: var TextLineRecord, key: string, val: T + ) {.raises: [ValueError, IOError].} = + var it {.inject, used.}: T = val + let v = body + setProperty(r, key, v) + +template formatIt*(T: type, body: untyped) {.dirty.} = + formatIt(LogFormat.textLines, T): + body + formatIt(LogFormat.json, T): + body + +formatIt(LogFormat.textLines, Cid): + shortLog($it) +formatIt(LogFormat.json, Cid): + $it +formatIt(UInt256): + $it +formatIt(MultiAddress): + $it +formatIt(LogFormat.textLines, array[32, byte]): + it.short0xHexLog +formatIt(LogFormat.json, array[32, byte]): + it.to0xHex diff --git a/codexcrawler/services/marketplace/market.nim b/codexcrawler/services/marketplace/market.nim new file mode 100644 index 0000000..c0b88fa --- /dev/null +++ b/codexcrawler/services/marketplace/market.nim @@ -0,0 +1,605 @@ +import std/strutils +import std/strformat +import pkg/ethers +import pkg/upraises +import pkg/questionable +import ./logutils +import ./marketplace +import ./proofs +import ./provider +import ./config +import ./periods + +# Copy of nim-codex market.nim +# Edited to remove signing, reward address, etc + +logScope: + topics = "marketplace onchain market" + +type + OnChainMarket* = ref object of RootObj + contract: Marketplace + configuration: ?MarketplaceConfig + + Subscription = ref object of RootObj + MarketError* = object of CatchableError + MarketSubscription = market.Subscription + EventSubscription = ethers.Subscription + OnChainMarketSubscription = ref object of MarketSubscription + eventSubscription: EventSubscription + + ProofChallenge* = array[32, byte] + # Event callback signatures: + OnRequest* = + proc(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} + OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} + OnSlotFilled* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].} + OnSlotFreed* = proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].} + OnSlotReservationsFull* = + proc(requestId: RequestId, slotIndex: uint64) {.gcsafe, upraises: [].} + OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises: [].} + OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises: [].} + OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises: [].} + + # Marketplace events + MarketplaceEvent* = Event + StorageRequested* = object of MarketplaceEvent + requestId*: RequestId + ask*: StorageAsk + expiry*: uint64 + + SlotFilled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + slotIndex*: uint64 + + SlotFreed* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + slotIndex*: uint64 + + SlotReservationsFull* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + slotIndex*: uint64 + + RequestFulfilled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + + RequestCancelled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + + RequestFailed* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + + ProofSubmitted* = object of MarketplaceEvent + id*: SlotId + +func new*(_: type OnChainMarket, contract: Marketplace): OnChainMarket = + OnChainMarket(contract: contract) + +proc raiseMarketError(message: string) {.raises: [MarketError].} = + raise newException(MarketError, message) + +proc msgDetail*(e: ref CatchableError): string = + var msg = e.msg + if e.parent != nil: + msg = fmt"{msg} Inner exception: {e.parent.msg}" + return msg + +template convertEthersError(body) = + try: + body + except EthersError as error: + raiseMarketError(error.msgDetail) + +proc loadConfig( + market: OnChainMarket +): Future[?!void] {.async: (raises: [CancelledError, MarketError]).} = + try: + without config =? market.configuration: + let fetchedConfig = await market.contract.configuration() + + market.configuration = some fetchedConfig + + return success() + except AsyncLockError, EthersError, CatchableError: + let err = getCurrentException() + return failure newException( + MarketError, + "Failed to fetch the config from the Marketplace contract: " & err.msg, + ) + +proc config( + market: OnChainMarket +): Future[MarketplaceConfig] {.async: (raises: [CancelledError, MarketError]).} = + without resolvedConfig =? market.configuration: + if err =? (await market.loadConfig()).errorOption: + raiseMarketError(err.msg) + + without config =? market.configuration: + raiseMarketError("Failed to access to config from the Marketplace contract") + + return config + + return resolvedConfig + +proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = + raiseAssert("Not available: approveFunds") + +proc getZkeyHash*( + market: OnChainMarket +): Future[?string] {.async: (raises: [CancelledError, MarketError]).} = + let config = await market.config() + return some config.proofs.zkeyHash + +proc periodicity*( + market: OnChainMarket +): Future[Periodicity] {.async: (raises: [CancelledError, MarketError]).} = + convertEthersError: + let config = await market.config() + let period = config.proofs.period + return Periodicity(seconds: period) + +proc proofTimeout*( + market: OnChainMarket +): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = + convertEthersError: + let config = await market.config() + return config.proofs.timeout + +proc repairRewardPercentage*( + market: OnChainMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = + convertEthersError: + let config = await market.config() + return config.collateral.repairRewardPercentage + +proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} = + convertEthersError: + let config = await market.config() + return config.requestDurationLimit + +proc proofDowntime*( + market: OnChainMarket +): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = + convertEthersError: + let config = await market.config() + return config.proofs.downtime + +proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} = + convertEthersError: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.getPointer(slotId, overrides) + +proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = + convertEthersError: + return await market.contract.myRequests + +proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} = + convertEthersError: + let slots = await market.contract.mySlots() + debug "Fetched my slots", numSlots = len(slots) + + return slots + +proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} = + convertEthersError: + debug "Requesting storage" + await market.approveFunds(request.totalPrice()) + discard await market.contract.requestStorage(request).confirm(1) + +proc getRequest*( + market: OnChainMarket, id: RequestId +): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = + try: + let key = $id + + # if key in market.requestCache: + # return some market.requestCache[key] + + let request = await market.contract.getRequest(id) + # market.requestCache[key] = request + return some request + except Marketplace_UnknownRequest, KeyError: + warn "Cannot retrieve the request", error = getCurrentExceptionMsg() + return none StorageRequest + except EthersError, AsyncLockError: + error "Cannot retrieve the request", error = getCurrentExceptionMsg() + return none StorageRequest + except CatchableError as err: + error "Unknown error", error = err.msg + return none StorageRequest + +proc requestState*( + market: OnChainMarket, requestId: RequestId +): Future[?RequestState] {.async.} = + convertEthersError: + try: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return some await market.contract.requestState(requestId, overrides) + except Marketplace_UnknownRequest: + return none RequestState + +proc slotState*( + market: OnChainMarket, slotId: SlotId +): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} = + convertEthersError: + try: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.slotState(slotId, overrides) + except AsyncLockError as err: + raiseMarketError( + "Failed to fetch the slot state from the Marketplace contract: " & err.msg + ) + except CatchableError as err: + raiseMarketError("Unknown error: " & err.msg) + +proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} = + convertEthersError: + return await market.contract.requestEnd(id) + +proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[int64] {.async.} = + convertEthersError: + return await market.contract.requestExpiry(id) + +proc getHost( + market: OnChainMarket, requestId: RequestId, slotIndex: uint64 +): Future[?Address] {.async.} = + convertEthersError: + let slotId = slotId(requestId, slotIndex) + let address = await market.contract.getHost(slotId) + if address != Address.default: + return some address + else: + return none Address + +proc currentCollateral*( + market: OnChainMarket, slotId: SlotId +): Future[UInt256] {.async.} = + convertEthersError: + return await market.contract.currentCollateral(slotId) + +proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} = + convertEthersError: + try: + return some await market.contract.getActiveSlot(slotId) + except Marketplace_SlotIsFree: + return none Slot + +proc fillSlot( + market: OnChainMarket, + requestId: RequestId, + slotIndex: uint64, + proof: Groth16Proof, + collateral: UInt256, +) {.async.} = + convertEthersError: + logScope: + requestId + slotIndex + + await market.approveFunds(collateral) + trace "calling fillSlot on contract" + discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1) + trace "fillSlot transaction completed" + +proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = + raiseAssert("Not available: freeSlot") + +proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} = + convertEthersError: + discard await market.contract.withdrawFunds(requestId).confirm(1) + +proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = + convertEthersError: + try: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.isProofRequired(id, overrides) + except Marketplace_SlotIsFree: + return false + +proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = + convertEthersError: + try: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.willProofBeRequired(id, overrides) + except Marketplace_SlotIsFree: + return false + +proc getChallenge*( + market: OnChainMarket, id: SlotId +): Future[ProofChallenge] {.async.} = + convertEthersError: + let overrides = CallOverrides(blockTag: some BlockTag.pending) + return await market.contract.getChallenge(id, overrides) + +proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} = + convertEthersError: + discard await market.contract.submitProof(id, proof).confirm(1) + +proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} = + convertEthersError: + discard await market.contract.markProofAsMissing(id, period).confirm(1) + +proc canProofBeMarkedAsMissing*( + market: OnChainMarket, id: SlotId, period: Period +): Future[bool] {.async.} = + let provider = market.contract.provider + let contractWithoutSigner = market.contract.connect(provider) + let overrides = CallOverrides(blockTag: some BlockTag.pending) + try: + discard await contractWithoutSigner.markProofAsMissing(id, period, overrides) + return true + except EthersError as e: + trace "Proof cannot be marked as missing", msg = e.msg + return false + +proc reserveSlot*( + market: OnChainMarket, requestId: RequestId, slotIndex: uint64 +) {.async.} = + convertEthersError: + discard await market.contract + .reserveSlot( + requestId, + slotIndex, + # reserveSlot runs out of gas for unknown reason, but 100k gas covers it + TransactionOverrides(gasLimit: some 100000.u256), + ) + .confirm(1) + +proc canReserveSlot*( + market: OnChainMarket, requestId: RequestId, slotIndex: uint64 +): Future[bool] {.async.} = + convertEthersError: + return await market.contract.canReserveSlot(requestId, slotIndex) + +proc subscribeRequests*( + market: OnChainMarket, callback: OnRequest +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in Request subscription", msg = eventErr.msg + return + + callback(event.requestId, event.ask, event.expiry) + + convertEthersError: + let subscription = await market.contract.subscribe(StorageRequested, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeSlotFilled*( + market: OnChainMarket, callback: OnSlotFilled +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in SlotFilled subscription", msg = eventErr.msg + return + + callback(event.requestId, event.slotIndex) + + convertEthersError: + let subscription = await market.contract.subscribe(SlotFilled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeSlotFilled*( + market: OnChainMarket, + requestId: RequestId, + slotIndex: uint64, + callback: OnSlotFilled, +): Future[MarketSubscription] {.async.} = + proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) = + if eventRequestId == requestId and eventSlotIndex == slotIndex: + callback(requestId, slotIndex) + + convertEthersError: + return await market.subscribeSlotFilled(onSlotFilled) + +proc subscribeSlotFreed*( + market: OnChainMarket, callback: OnSlotFreed +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in SlotFreed subscription", msg = eventErr.msg + return + + callback(event.requestId, event.slotIndex) + + convertEthersError: + let subscription = await market.contract.subscribe(SlotFreed, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeSlotReservationsFull*( + market: OnChainMarket, callback: OnSlotReservationsFull +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in SlotReservationsFull subscription", + msg = eventErr.msg + return + + callback(event.requestId, event.slotIndex) + + convertEthersError: + let subscription = await market.contract.subscribe(SlotReservationsFull, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeFulfillment( + market: OnChainMarket, callback: OnFulfillment +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestFulfillment subscription", msg = eventErr.msg + return + + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeFulfillment( + market: OnChainMarket, requestId: RequestId, callback: OnFulfillment +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestFulfillment subscription", msg = eventErr.msg + return + + if event.requestId == requestId: + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeRequestCancelled*( + market: OnChainMarket, callback: OnRequestCancelled +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestCancelled subscription", msg = eventErr.msg + return + + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestCancelled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeRequestCancelled*( + market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestCancelled subscription", msg = eventErr.msg + return + + if event.requestId == requestId: + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestCancelled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeRequestFailed*( + market: OnChainMarket, callback: OnRequestFailed +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestFailed subscription", msg = eventErr.msg + return + + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestFailed, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeRequestFailed*( + market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in RequestFailed subscription", msg = eventErr.msg + return + + if event.requestId == requestId: + callback(event.requestId) + + convertEthersError: + let subscription = await market.contract.subscribe(RequestFailed, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc subscribeProofSubmission*( + market: OnChainMarket, callback: OnProofSubmitted +): Future[MarketSubscription] {.async.} = + proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} = + without event =? eventResult, eventErr: + error "There was an error in ProofSubmitted subscription", msg = eventErr.msg + return + + callback(event.id) + + convertEthersError: + let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + +proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = + await subscription.eventSubscription.unsubscribe() + +proc queryPastSlotFilledEvents*( + market: OnChainMarket, fromBlock: BlockTag +): Future[seq[SlotFilled]] {.async.} = + convertEthersError: + return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest) + +proc queryPastSlotFilledEvents*( + market: OnChainMarket, blocksAgo: int +): Future[seq[SlotFilled]] {.async.} = + convertEthersError: + let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) + + return await market.queryPastSlotFilledEvents(fromBlock) + +proc queryPastSlotFilledEvents*( + market: OnChainMarket, fromTime: int64 +): Future[seq[SlotFilled]] {.async.} = + convertEthersError: + let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) + return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock)) + +proc queryPastStorageRequestedEvents*( + market: OnChainMarket, fromBlock: BlockTag +): Future[seq[StorageRequested]] {.async.} = + convertEthersError: + return + await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest) + +proc queryPastStorageRequestedEvents*( + market: OnChainMarket, blocksAgo: int +): Future[seq[StorageRequested]] {.async.} = + convertEthersError: + let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) + return await market.queryPastStorageRequestedEvents(fromBlock) + +proc queryPastStorageRequestedEvents*( + market: OnChainMarket, fromTime: int64 +): Future[seq[StorageRequested]] {.async.} = + convertEthersError: + let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) + return await market.queryPastStorageRequestedEvents(BlockTag.init(fromBlock)) + +proc slotCollateral*( + market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState +): ?!UInt256 {.raises: [].} = + if slotState == SlotState.Repair: + without repairRewardPercentage =? + market.configuration .? collateral .? repairRewardPercentage: + return failure newException( + MarketError, + "Failure calculating the slotCollateral, cannot get the reward percentage", + ) + + return success ( + collateralPerSlot - (collateralPerSlot * repairRewardPercentage.u256).div( + 100.u256 + ) + ) + + return success(collateralPerSlot) + +proc slotCollateral*( + market: OnChainMarket, requestId: RequestId, slotIndex: uint64 +): Future[?!UInt256] {.async: (raises: [CancelledError]).} = + let slotid = slotId(requestId, slotIndex) + + try: + let slotState = await market.slotState(slotid) + + without request =? await market.getRequest(requestId): + return failure newException( + MarketError, "Failure calculating the slotCollateral, cannot get the request" + ) + + return market.slotCollateral(request.ask.collateralPerSlot, slotState) + except MarketError as error: + error "Error when trying to calculate the slotCollateral", error = error.msg + return failure error diff --git a/codexcrawler/services/marketplace/marketplace.nim b/codexcrawler/services/marketplace/marketplace.nim new file mode 100644 index 0000000..1f0f90d --- /dev/null +++ b/codexcrawler/services/marketplace/marketplace.nim @@ -0,0 +1,184 @@ +import pkg/ethers +import pkg/ethers/erc20 +import pkg/json_rpc/rpcclient +import pkg/stint +import pkg/chronos +import ./requests +import ./proofs +import ./config + +export stint +export ethers except `%`, `%*`, toJson +export erc20 except `%`, `%*`, toJson +export config +export requests + +type + Marketplace* = ref object of Contract + + Marketplace_RepairRewardPercentageTooHigh* = object of SolidityError + Marketplace_SlashPercentageTooHigh* = object of SolidityError + Marketplace_MaximumSlashingTooHigh* = object of SolidityError + Marketplace_InvalidExpiry* = object of SolidityError + Marketplace_InvalidMaxSlotLoss* = object of SolidityError + Marketplace_InsufficientSlots* = object of SolidityError + Marketplace_InvalidClientAddress* = object of SolidityError + Marketplace_RequestAlreadyExists* = object of SolidityError + Marketplace_InvalidSlot* = object of SolidityError + Marketplace_SlotNotFree* = object of SolidityError + Marketplace_InvalidSlotHost* = object of SolidityError + Marketplace_AlreadyPaid* = object of SolidityError + Marketplace_TransferFailed* = object of SolidityError + Marketplace_UnknownRequest* = object of SolidityError + Marketplace_InvalidState* = object of SolidityError + Marketplace_StartNotBeforeExpiry* = object of SolidityError + Marketplace_SlotNotAcceptingProofs* = object of SolidityError + Marketplace_SlotIsFree* = object of SolidityError + Marketplace_ReservationRequired* = object of SolidityError + Marketplace_NothingToWithdraw* = object of SolidityError + Marketplace_InsufficientDuration* = object of SolidityError + Marketplace_InsufficientProofProbability* = object of SolidityError + Marketplace_InsufficientCollateral* = object of SolidityError + Marketplace_InsufficientReward* = object of SolidityError + Marketplace_InvalidCid* = object of SolidityError + Marketplace_DurationExceedsLimit* = object of SolidityError + Proofs_InsufficientBlockHeight* = object of SolidityError + Proofs_InvalidProof* = object of SolidityError + Proofs_ProofAlreadySubmitted* = object of SolidityError + Proofs_PeriodNotEnded* = object of SolidityError + Proofs_ValidationTimedOut* = object of SolidityError + Proofs_ProofNotMissing* = object of SolidityError + Proofs_ProofNotRequired* = object of SolidityError + Proofs_ProofAlreadyMarkedMissing* = object of SolidityError + Proofs_InvalidProbability* = object of SolidityError + Periods_InvalidSecondsPerPeriod* = object of SolidityError + +proc configuration*(marketplace: Marketplace): MarketplaceConfig {.contract, view.} +proc token*(marketplace: Marketplace): Address {.contract, view.} +proc currentCollateral*( + marketplace: Marketplace, id: SlotId +): UInt256 {.contract, view.} + +proc requestStorage*( + marketplace: Marketplace, request: StorageRequest +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidClientAddress, Marketplace_RequestAlreadyExists, + Marketplace_InvalidExpiry, Marketplace_InsufficientSlots, + Marketplace_InvalidMaxSlotLoss, + ] +.} + +proc fillSlot*( + marketplace: Marketplace, requestId: RequestId, slotIndex: uint64, proof: Groth16Proof +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidSlot, Marketplace_ReservationRequired, Marketplace_SlotNotFree, + Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest, + ] +.} + +proc withdrawFunds*( + marketplace: Marketplace, requestId: RequestId +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidClientAddress, Marketplace_InvalidState, + Marketplace_NothingToWithdraw, Marketplace_UnknownRequest, + ] +.} + +proc withdrawFunds*( + marketplace: Marketplace, requestId: RequestId, withdrawAddress: Address +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidClientAddress, Marketplace_InvalidState, + Marketplace_NothingToWithdraw, Marketplace_UnknownRequest, + ] +.} + +proc freeSlot*( + marketplace: Marketplace, id: SlotId +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidSlotHost, Marketplace_AlreadyPaid, + Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest, Marketplace_SlotIsFree, + ] +.} + +proc freeSlot*( + marketplace: Marketplace, + id: SlotId, + rewardRecipient: Address, + collateralRecipient: Address, +): Confirmable {. + contract, + errors: [ + Marketplace_InvalidSlotHost, Marketplace_AlreadyPaid, + Marketplace_StartNotBeforeExpiry, Marketplace_UnknownRequest, Marketplace_SlotIsFree, + ] +.} + +proc getRequest*( + marketplace: Marketplace, id: RequestId +): StorageRequest {.contract, view, errors: [Marketplace_UnknownRequest].} + +proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} +proc getActiveSlot*( + marketplace: Marketplace, id: SlotId +): Slot {.contract, view, errors: [Marketplace_SlotIsFree].} + +proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.} +proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.} +proc requestState*( + marketplace: Marketplace, requestId: RequestId +): RequestState {.contract, view, errors: [Marketplace_UnknownRequest].} + +proc slotState*(marketplace: Marketplace, slotId: SlotId): SlotState {.contract, view.} +proc requestEnd*( + marketplace: Marketplace, requestId: RequestId +): int64 {.contract, view.} + +proc requestExpiry*( + marketplace: Marketplace, requestId: RequestId +): int64 {.contract, view.} + +proc missingProofs*(marketplace: Marketplace, id: SlotId): UInt256 {.contract, view.} +proc isProofRequired*(marketplace: Marketplace, id: SlotId): bool {.contract, view.} +proc willProofBeRequired*(marketplace: Marketplace, id: SlotId): bool {.contract, view.} +proc getChallenge*( + marketplace: Marketplace, id: SlotId +): array[32, byte] {.contract, view.} + +proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.} + +proc submitProof*( + marketplace: Marketplace, id: SlotId, proof: Groth16Proof +): Confirmable {. + contract, + errors: + [Proofs_ProofAlreadySubmitted, Proofs_InvalidProof, Marketplace_UnknownRequest] +.} + +proc markProofAsMissing*( + marketplace: Marketplace, id: SlotId, period: uint64 +): Confirmable {. + contract, + errors: [ + Marketplace_SlotNotAcceptingProofs, Marketplace_StartNotBeforeExpiry, + Proofs_PeriodNotEnded, Proofs_ValidationTimedOut, Proofs_ProofNotMissing, + Proofs_ProofNotRequired, Proofs_ProofAlreadyMarkedMissing, + ] +.} + +proc reserveSlot*( + marketplace: Marketplace, requestId: RequestId, slotIndex: uint64 +): Confirmable {.contract.} + +proc canReserveSlot*( + marketplace: Marketplace, requestId: RequestId, slotIndex: uint64 +): bool {.contract, view.} diff --git a/codexcrawler/services/marketplace/periods.nim b/codexcrawler/services/marketplace/periods.nim new file mode 100644 index 0000000..cbb860e --- /dev/null +++ b/codexcrawler/services/marketplace/periods.nim @@ -0,0 +1,17 @@ +import pkg/stint + +type + Periodicity* = object + seconds*: uint64 + + Period* = uint64 + Timestamp* = uint64 + +func periodOf*(periodicity: Periodicity, timestamp: Timestamp): Period = + timestamp div periodicity.seconds + +func periodStart*(periodicity: Periodicity, period: Period): Timestamp = + period * periodicity.seconds + +func periodEnd*(periodicity: Periodicity, period: Period): Timestamp = + periodicity.periodStart(period + 1) diff --git a/codexcrawler/services/marketplace/proofs.nim b/codexcrawler/services/marketplace/proofs.nim new file mode 100644 index 0000000..771d685 --- /dev/null +++ b/codexcrawler/services/marketplace/proofs.nim @@ -0,0 +1,46 @@ +import pkg/stint +import pkg/contractabi +import pkg/ethers/fields + +type + Groth16Proof* = object + a*: G1Point + b*: G2Point + c*: G1Point + + G1Point* = object + x*: UInt256 + y*: UInt256 + + # A field element F_{p^2} encoded as `real + i * imag` + Fp2Element* = object + real*: UInt256 + imag*: UInt256 + + G2Point* = object + x*: Fp2Element + y*: Fp2Element + +func solidityType*(_: type G1Point): string = + solidityType(G1Point.fieldTypes) + +func solidityType*(_: type Fp2Element): string = + solidityType(Fp2Element.fieldTypes) + +func solidityType*(_: type G2Point): string = + solidityType(G2Point.fieldTypes) + +func solidityType*(_: type Groth16Proof): string = + solidityType(Groth16Proof.fieldTypes) + +func encode*(encoder: var AbiEncoder, point: G1Point) = + encoder.write(point.fieldValues) + +func encode*(encoder: var AbiEncoder, element: Fp2Element) = + encoder.write(element.fieldValues) + +func encode*(encoder: var AbiEncoder, point: G2Point) = + encoder.write(point.fieldValues) + +func encode*(encoder: var AbiEncoder, proof: Groth16Proof) = + encoder.write(proof.fieldValues) diff --git a/codexcrawler/services/marketplace/provider.nim b/codexcrawler/services/marketplace/provider.nim new file mode 100644 index 0000000..ac5a6c1 --- /dev/null +++ b/codexcrawler/services/marketplace/provider.nim @@ -0,0 +1,120 @@ +import pkg/ethers/provider +import pkg/chronos +import pkg/questionable +import ./logutils + +logScope: + topics = "marketplace onchain provider" + +proc raiseProviderError(message: string) {.raises: [ProviderError].} = + raise newException(ProviderError, message) + +proc blockNumberAndTimestamp*( + provider: Provider, blockTag: BlockTag +): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} = + without latestBlock =? await provider.getBlock(blockTag): + raiseProviderError("Could not get latest block") + + without latestBlockNumber =? latestBlock.number: + raiseProviderError("Could not get latest block number") + + return (latestBlockNumber, latestBlock.timestamp) + +proc binarySearchFindClosestBlock( + provider: Provider, epochTime: int, low: UInt256, high: UInt256 +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = + let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low)) + let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high)) + if abs(lowTimestamp.truncate(int) - epochTime) < + abs(highTimestamp.truncate(int) - epochTime): + return low + else: + return high + +proc binarySearchBlockNumberForEpoch( + provider: Provider, + epochTime: UInt256, + latestBlockNumber: UInt256, + earliestBlockNumber: UInt256, +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = + var low = earliestBlockNumber + var high = latestBlockNumber + + while low <= high: + if low == 0 and high == 0: + return low + let mid = (low + high) div 2 + let (midBlockNumber, midBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.init(mid)) + + if midBlockTimestamp < epochTime: + low = mid + 1 + elif midBlockTimestamp > epochTime: + high = mid - 1 + else: + return midBlockNumber + # NOTICE that by how the binary search is implemented, when it finishes + # low is always greater than high - this is why we use high, where + # intuitively we would use low: + await provider.binarySearchFindClosestBlock( + epochTime.truncate(int), low = high, high = low + ) + +proc blockNumberForEpoch*( + provider: Provider, epochTime: int64 +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = + let epochTimeUInt256 = epochTime.u256 + let (latestBlockNumber, latestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.latest) + let (earliestBlockNumber, earliestBlockTimestamp) = + await provider.blockNumberAndTimestamp(BlockTag.earliest) + + # Initially we used the average block time to predict + # the number of blocks we need to look back in order to find + # the block number corresponding to the given epoch time. + # This estimation can be highly inaccurate if block time + # was changing in the past or is fluctuating and therefore + # we used that information initially only to find out + # if the available history is long enough to perform effective search. + # It turns out we do not have to do that. There is an easier way. + # + # First we check if the given epoch time equals the timestamp of either + # the earliest or the latest block. If it does, we just return the + # block number of that block. + # + # Otherwise, if the earliest available block is not the genesis block, + # we should check the timestamp of that earliest block and if it is greater + # than the epoch time, we should issue a warning and return + # that earliest block number. + # In all other cases, thus when the earliest block is not the genesis + # block but its timestamp is not greater than the requested epoch time, or + # if the earliest available block is the genesis block, + # (which means we have the whole history available), we should proceed with + # the binary search. + # + # Additional benefit of this method is that we do not have to rely + # on the average block time, which not only makes the whole thing + # more reliable, but also easier to test. + + # Are lucky today? + if earliestBlockTimestamp == epochTimeUInt256: + return earliestBlockNumber + if latestBlockTimestamp == epochTimeUInt256: + return latestBlockNumber + + if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256: + let availableHistoryInDays = + (latestBlockTimestamp - earliestBlockTimestamp) div 1.days.secs.u256 + warn "Short block history detected.", + earliestBlockTimestamp = earliestBlockTimestamp, days = availableHistoryInDays + return earliestBlockNumber + + return await provider.binarySearchBlockNumberForEpoch( + epochTimeUInt256, latestBlockNumber, earliestBlockNumber + ) + +proc pastBlockTag*( + provider: Provider, blocksAgo: int +): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} = + let head = await provider.getBlockNumber() + return BlockTag.init(head - blocksAgo.abs.u256) diff --git a/codexcrawler/services/marketplace/readme.md b/codexcrawler/services/marketplace/readme.md new file mode 100644 index 0000000..fd2b28a --- /dev/null +++ b/codexcrawler/services/marketplace/readme.md @@ -0,0 +1,2 @@ +These are copied from nim-codex v0.2.0. +There are plans to extract/refactor the contract interoperability code from nim-codex into its own submodule. But this isn't prioritized atm. So we're copying it here until that's been handled. diff --git a/codexcrawler/services/marketplace/requests.nim b/codexcrawler/services/marketplace/requests.nim new file mode 100644 index 0000000..a6e4281 --- /dev/null +++ b/codexcrawler/services/marketplace/requests.nim @@ -0,0 +1,220 @@ +import std/hashes +import std/sequtils +import std/typetraits +import pkg/contractabi +import pkg/nimcrypto +import pkg/ethers/fields +import pkg/results +import pkg/questionable/results +import pkg/stew/byteutils +import pkg/libp2p/[cid, multicodec] +import pkg/serde/json +import ./logutils + +export contractabi + +type + StorageRequest* = object + client* {.serialize.}: Address + ask* {.serialize.}: StorageAsk + content* {.serialize.}: StorageContent + expiry* {.serialize.}: uint64 + nonce*: Nonce + + StorageAsk* = object + proofProbability* {.serialize.}: UInt256 + pricePerBytePerSecond* {.serialize.}: UInt256 + collateralPerByte* {.serialize.}: UInt256 + slots* {.serialize.}: uint64 + slotSize* {.serialize.}: uint64 + duration* {.serialize.}: uint64 + maxSlotLoss* {.serialize.}: uint64 + + StorageContent* = object + cid* {.serialize.}: Cid + merkleRoot*: array[32, byte] + + Slot* = object + request* {.serialize.}: StorageRequest + slotIndex* {.serialize.}: uint64 + + SlotId* = distinct array[32, byte] + RequestId* = distinct array[32, byte] + Nonce* = distinct array[32, byte] + RequestState* {.pure.} = enum + New + Started + Cancelled + Finished + Failed + + SlotState* {.pure.} = enum + Free + Filled + Finished + Failed + Paid + Cancelled + Repair + +template mapFailure*[T, V, E]( + exp: Result[T, V], exc: typedesc[E] +): Result[T, ref CatchableError] = + ## Convert `Result[T, E]` to `Result[E, ref CatchableError]` + ## + + exp.mapErr( + proc(e: V): ref CatchableError = + (ref exc)(msg: $e) + ) + +template mapFailure*[T, V](exp: Result[T, V]): Result[T, ref CatchableError] = + mapFailure(exp, CatchableError) + +proc `==`*(x, y: Nonce): bool {.borrow.} +proc `==`*(x, y: RequestId): bool {.borrow.} +proc `==`*(x, y: SlotId): bool {.borrow.} +proc hash*(x: SlotId): Hash {.borrow.} +proc hash*(x: Nonce): Hash {.borrow.} +proc hash*(x: Address): Hash {.borrow.} + +func toArray*(id: RequestId | SlotId | Nonce): array[32, byte] = + array[32, byte](id) + +proc `$`*(id: RequestId | SlotId | Nonce): string = + id.toArray.toHex + +proc fromHex*(T: type RequestId, hex: string): T = + T array[32, byte].fromHex(hex) + +proc fromHex*(T: type SlotId, hex: string): T = + T array[32, byte].fromHex(hex) + +proc fromHex*(T: type Nonce, hex: string): T = + T array[32, byte].fromHex(hex) + +proc fromHex*[T: distinct](_: type T, hex: string): T = + type baseType = T.distinctBase + T baseType.fromHex(hex) + +proc toHex*[T: distinct](id: T): string = + type baseType = T.distinctBase + baseType(id).toHex + +logutils.formatIt(LogFormat.textLines, Nonce): + it.short0xHexLog +logutils.formatIt(LogFormat.textLines, RequestId): + it.short0xHexLog +logutils.formatIt(LogFormat.textLines, SlotId): + it.short0xHexLog +logutils.formatIt(LogFormat.json, Nonce): + it.to0xHexLog +logutils.formatIt(LogFormat.json, RequestId): + it.to0xHexLog +logutils.formatIt(LogFormat.json, SlotId): + it.to0xHexLog + +func fromTuple(_: type StorageRequest, tupl: tuple): StorageRequest = + StorageRequest( + client: tupl[0], ask: tupl[1], content: tupl[2], expiry: tupl[3], nonce: tupl[4] + ) + +func fromTuple(_: type Slot, tupl: tuple): Slot = + Slot(request: tupl[0], slotIndex: tupl[1]) + +func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk = + StorageAsk( + proofProbability: tupl[0], + pricePerBytePerSecond: tupl[1], + collateralPerByte: tupl[2], + slots: tupl[3], + slotSize: tupl[4], + duration: tupl[5], + maxSlotLoss: tupl[6], + ) + +func fromTuple(_: type StorageContent, tupl: tuple): StorageContent = + StorageContent(cid: tupl[0], merkleRoot: tupl[1]) + +func solidityType*(_: type Cid): string = + solidityType(seq[byte]) + +func solidityType*(_: type StorageContent): string = + solidityType(StorageContent.fieldTypes) + +func solidityType*(_: type StorageAsk): string = + solidityType(StorageAsk.fieldTypes) + +func solidityType*(_: type StorageRequest): string = + solidityType(StorageRequest.fieldTypes) + +# Note: it seems to be ok to ignore the vbuffer offset for now +func encode*(encoder: var AbiEncoder, cid: Cid) = + encoder.write(cid.data.buffer) + +func encode*(encoder: var AbiEncoder, content: StorageContent) = + encoder.write(content.fieldValues) + +func encode*(encoder: var AbiEncoder, ask: StorageAsk) = + encoder.write(ask.fieldValues) + +func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) = + encoder.write(id.toArray) + +func encode*(encoder: var AbiEncoder, request: StorageRequest) = + encoder.write(request.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: Slot) = + encoder.write(slot.fieldValues) + +func decode*(decoder: var AbiDecoder, T: type Cid): ?!T = + let data = ?decoder.read(seq[byte]) + Cid.init(data).mapFailure + +func decode*(decoder: var AbiDecoder, T: type StorageContent): ?!T = + let tupl = ?decoder.read(StorageContent.fieldTypes) + success StorageContent.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type StorageAsk): ?!T = + let tupl = ?decoder.read(StorageAsk.fieldTypes) + success StorageAsk.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type StorageRequest): ?!T = + let tupl = ?decoder.read(StorageRequest.fieldTypes) + success StorageRequest.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type Slot): ?!T = + let tupl = ?decoder.read(Slot.fieldTypes) + success Slot.fromTuple(tupl) + +func id*(request: StorageRequest): RequestId = + let encoding = AbiEncoder.encode((request,)) + RequestId(keccak256.digest(encoding).data) + +func slotId*(requestId: RequestId, slotIndex: uint64): SlotId = + let encoding = AbiEncoder.encode((requestId, slotIndex)) + SlotId(keccak256.digest(encoding).data) + +func slotId*(request: StorageRequest, slotIndex: uint64): SlotId = + slotId(request.id, slotIndex) + +func id*(slot: Slot): SlotId = + slotId(slot.request, slot.slotIndex) + +func pricePerSlotPerSecond*(ask: StorageAsk): UInt256 = + ask.pricePerBytePerSecond * ask.slotSize.u256 + +func pricePerSlot*(ask: StorageAsk): UInt256 = + ask.duration.u256 * ask.pricePerSlotPerSecond + +func totalPrice*(ask: StorageAsk): UInt256 = + ask.slots.u256 * ask.pricePerSlot + +func totalPrice*(request: StorageRequest): UInt256 = + request.ask.totalPrice + +func collateralPerSlot*(ask: StorageAsk): UInt256 = + ask.collateralPerByte * ask.slotSize.u256 + +func size*(ask: StorageAsk): uint64 = + ask.slots * ask.slotSize diff --git a/codexcrawler/services/metrics.nim b/codexcrawler/services/metrics.nim index 39d0b63..020c5aa 100644 --- a/codexcrawler/services/metrics.nim +++ b/codexcrawler/services/metrics.nim @@ -6,6 +6,13 @@ declareGauge(todoNodesGauge, "DHT nodes to be visited") declareGauge(okNodesGauge, "DHT nodes successfully contacted") declareGauge(nokNodesGauge, "DHT nodes failed to contact") +declareGauge(requestsGauge, "Marketplace active storage requests") +declareGauge(pendingGauge, "Marketplace pending storage requests") +declareGauge(requestSlotsGauge, "Marketplace active storage request slots") +declareGauge( + totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests" +) + type OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} @@ -14,6 +21,11 @@ type okNodes: OnUpdateMetric nokNodes: OnUpdateMetric + onRequests: OnUpdateMetric + onPending: OnUpdateMetric + onRequestSlots: OnUpdateMetric + onTotalSize: OnUpdateMetric + proc startServer(metricsAddress: IpAddress, metricsPort: Port) = let metricsAddress = metricsAddress notice "Starting metrics HTTP server", @@ -34,6 +46,18 @@ method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = m.nokNodes(value.int64) +method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.onRequests(value.int64) + +method setPendingRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.onPending(value.int64) + +method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.onRequestSlots(value.int64) + +method setTotalSize*(m: Metrics, value: int64) {.base, gcsafe, raises: [].} = + m.onTotalSize(value.int64) + proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = startServer(metricsAddress, metricsPort) @@ -48,4 +72,24 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = proc onNok(value: int64) = nokNodesGauge.set(value) - return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok) + proc onRequests(value: int64) = + requestsGauge.set(value) + + proc onPending(value: int64) = + pendingGauge.set(value) + + proc onRequestSlots(value: int64) = + requestSlotsGauge.set(value) + + proc onTotalSize(value: int64) = + totalStorageSizeGauge.set(value) + + return Metrics( + todoNodes: onTodo, + okNodes: onOk, + nokNodes: onNok, + onRequests: onRequests, + onPending: onPending, + onRequestSlots: onRequestSlots, + onTotalSize: onTotalSize, + ) diff --git a/codexcrawler/types.nim b/codexcrawler/types.nim index 9895ed1..5948eb2 100644 --- a/codexcrawler/types.nim +++ b/codexcrawler/types.nim @@ -4,7 +4,13 @@ import pkg/questionable/results import pkg/codexdht import pkg/libp2p -type Nid* = NodeId +import ./services/marketplace/requests + +export requests + +type + Nid* = NodeId + Rid* = requests.RequestId proc `$`*(nid: Nid): string = nid.toHex() @@ -12,6 +18,9 @@ proc `$`*(nid: Nid): string = proc fromStr*(T: type Nid, s: string): Nid = Nid(UInt256.fromHex(s)) +proc fromStr*(T: type Rid, s: string): Rid = + Rid(requests.RequestId.fromHex(s)) + proc toBytes*(nid: Nid): seq[byte] = var buffer = initProtoBuffer() buffer.write(1, $nid) diff --git a/docker/crawler.Dockerfile b/docker/crawler.Dockerfile index 3811fcf..455dfbc 100644 --- a/docker/crawler.Dockerfile +++ b/docker/crawler.Dockerfile @@ -19,6 +19,7 @@ RUN apt-get update && apt-get install -y cmake build-essential WORKDIR ${BUILD_HOME} COPY . . +RUN nimble install nimble RUN nimble build # Create diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh index 9cad0bd..d7b1abc 100644 --- a/docker/docker-entrypoint.sh +++ b/docker/docker-entrypoint.sh @@ -9,13 +9,20 @@ METRICSPORT=${CRAWLER_METRICSPORT:-8008} DATADIR=${CRAWLER_DATADIR:-crawler_data} DISCPORT=${CRAWLER_DISCPORT:-8090} BOOTNODES=${CRAWLER_BOOTNODES:-testnet_sprs} + +DHTENABLE=${CRAWLER_DHTENABLE:-1} STEPDELAY=${CRAWLER_STEPDELAY:-1000} REVISITDELAY=${CRAWLER_REVISITDELAY:-60} CHECKDELAY=${CRAWLER_CHECKDELAY:-10} EXPIRYDELAY=${CRAWLER_EXPIRYDELAY:-1440} +MARKETPLACEENABLE=${CRAWLER_MARKETPLACEENABLE:-1} +ETHPROVIDER=${CRAWLER_ETHPROVIDER:-NULL} +MARKETPLACEADDRESS=${CRAWLER_MARKETPLACEADDRESS:-NULL} +REQUESTCHECKDELAY=${CRAWLER_REQUESTCHECKDELAY:-10} + # Update CLI arguments -set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" +set -- "$@" --logLevel="${LOGLEVEL}" --publicIp="${PUBLICIP}" --metricsAddress="${METRICSADDRESS}" --metricsPort="${METRICSPORT}" --dataDir="${DATADIR}" --discoveryPort="${DISCPORT}" --bootNodes="${BOOTNODES}" --dhtEnable="${DHTENABLE}" --stepDelay="${STEPDELAY}" --revisitDelay="${REVISITDELAY}" --expiryDelay="${EXPIRYDELAY}" --checkDelay="${CHECKDELAY}" --marketplaceEnable="${MARKETPLACEENABLE}" --ethProvider="${ETHPROVIDER}" --marketplaceAddress="${MARKETPLACEADDRESS}" --requestCheckDelay="${REQUESTCHECKDELAY}" # Run echo "Run Codex Crawler" diff --git a/tests/codexcrawler/components/testchaincrawler.nim b/tests/codexcrawler/components/testchaincrawler.nim new file mode 100644 index 0000000..04f98b6 --- /dev/null +++ b/tests/codexcrawler/components/testchaincrawler.nim @@ -0,0 +1,54 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import std/sequtils +import std/options + +import ../../../codexcrawler/components/chaincrawler +import ../../../codexcrawler/services/marketplace/market +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mockrequeststore +import ../mocks/mockmarketplace +import ../helpers + +suite "ChainCrawler": + var + state: MockState + store: MockRequestStore + marketplace: MockMarketplaceService + crawler: ChainCrawler + + setup: + state = createMockState() + store = createMockRequestStore() + marketplace = createMockMarketplaceService() + + crawler = ChainCrawler.new(state, store, marketplace) + (await crawler.start()).tryGet() + + teardown: + state.checkAllUnsubscribed() + + test "start should subscribe to new requests": + check: + marketplace.subNewRequestsCallback.isSome() + + test "new-request subscription should add requestId to store": + let rid = genRid() + (await (marketplace.subNewRequestsCallback.get())(rid)).tryGet() + + check: + store.addRid == rid + + test "start should iterate past requests and add then to store": + check: + marketplace.iterRequestsCallback.isSome() + + let rid = genRid() + (await marketplace.iterRequestsCallback.get()(rid)).tryGet() + + check: + store.addRid == rid diff --git a/tests/codexcrawler/components/testchainmetrics.nim b/tests/codexcrawler/components/testchainmetrics.nim new file mode 100644 index 0000000..be222d2 --- /dev/null +++ b/tests/codexcrawler/components/testchainmetrics.nim @@ -0,0 +1,105 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/chainmetrics +import ../../../codexcrawler/components/requeststore +import ../../../codexcrawler/services/marketplace +import ../../../codexcrawler/types +import ../mocks/mockstate +import ../mocks/mockmetrics +import ../mocks/mockrequeststore +import ../mocks/mockmarketplace +import ../mocks/mockclock +import ../helpers + +suite "ChainMetrics": + var + state: MockState + metrics: MockMetrics + store: MockRequestStore + marketplace: MockMarketplaceService + chain: ChainMetrics + + setup: + state = createMockState() + metrics = createMockMetrics() + store = createMockRequestStore() + marketplace = createMockMarketplaceService() + + chain = ChainMetrics.new(state, metrics, store, marketplace) + (await chain.start()).tryGet() + + teardown: + state.checkAllUnsubscribed() + + proc onStep() {.async.} = + (await state.steppers[0]()).tryGet() + + test "start should start stepper for config.requestCheckDelay minutes": + check: + state.delays.len == 1 + state.delays[0] == state.config.requestCheckDelay.minutes + + test "onStep removes requests from request store when info can't be fetched": + let rid = genRid() + store.iterateEntries.add(RequestEntry(id: rid)) + + marketplace.requestInfoReturns = none(RequestInfo) + + await onStep() + + check: + marketplace.requestInfoRid == rid + store.removeRid == rid + + test "onStep should count the number of active requests": + let rid1 = genRid() + let rid2 = genRid() + store.iterateEntries.add(RequestEntry(id: rid1)) + store.iterateEntries.add(RequestEntry(id: rid2)) + + marketplace.requestInfoReturns = some(RequestInfo()) + + await onStep() + + check: + metrics.requests == 2 + + test "onStep should count the number of pending requests": + let rid1 = genRid() + let rid2 = genRid() + store.iterateEntries.add(RequestEntry(id: rid1)) + store.iterateEntries.add(RequestEntry(id: rid2)) + + marketplace.requestInfoReturns = some(RequestInfo(pending: true)) + + await onStep() + + check: + metrics.pending == 2 + + test "onStep should count the number of active slots": + let rid = genRid() + store.iterateEntries.add(RequestEntry(id: rid)) + + let info = RequestInfo(slots: 123) + marketplace.requestInfoReturns = some(info) + + await onStep() + + check: + metrics.slots == info.slots.int + + test "onStep should count the total size of active slots": + let rid = genRid() + store.iterateEntries.add(RequestEntry(id: rid)) + + let info = RequestInfo(slots: 12, slotSize: 23) + marketplace.requestInfoReturns = some(info) + + await onStep() + + check: + metrics.totalSize == (info.slots * info.slotSize).int diff --git a/tests/codexcrawler/components/testcrawler.nim b/tests/codexcrawler/components/testdhtcrawler.nim similarity index 85% rename from tests/codexcrawler/components/testcrawler.nim rename to tests/codexcrawler/components/testdhtcrawler.nim index cbe9d75..4ae4d21 100644 --- a/tests/codexcrawler/components/testcrawler.nim +++ b/tests/codexcrawler/components/testdhtcrawler.nim @@ -3,7 +3,7 @@ import pkg/questionable import pkg/questionable/results import pkg/asynctest/chronos/unittest -import ../../../codexcrawler/components/crawler +import ../../../codexcrawler/components/dhtcrawler import ../../../codexcrawler/services/dht import ../../../codexcrawler/utils/asyncdataevent import ../../../codexcrawler/types @@ -13,14 +13,14 @@ import ../mocks/mockdht import ../mocks/mocktodolist import ../helpers -suite "Crawler": +suite "DhtCrawler": var nid1: Nid nid2: Nid state: MockState todo: MockTodoList dht: MockDht - crawler: Crawler + crawler: DhtCrawler setup: nid1 = genNid() @@ -29,12 +29,10 @@ suite "Crawler": todo = createMockTodoList() dht = createMockDht() - crawler = Crawler.new(state, dht, todo) - + crawler = DhtCrawler.new(state, dht, todo) (await crawler.start()).tryGet() teardown: - (await crawler.stop()).tryGet() state.checkAllUnsubscribed() proc onStep() {.async.} = @@ -55,6 +53,19 @@ suite "Crawler": check: !(dht.getNeighborsArg) == nid1 + test "onStep is not activated when config.dhtEnable is false": + # Recreate crawler, reset mockstate: + state.steppers = @[] + # disable DHT: + state.config.dhtEnable = false + (await crawler.start()).tryGet() + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid1)) + + check: + state.steppers.len == 0 + test "nodes returned by getNeighbors are raised as nodesFound": var nodesFound = newSeq[Nid]() proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = diff --git a/tests/codexcrawler/components/testdhtmetrics.nim b/tests/codexcrawler/components/testdhtmetrics.nim index 6475d05..98c48ab 100644 --- a/tests/codexcrawler/components/testdhtmetrics.nim +++ b/tests/codexcrawler/components/testdhtmetrics.nim @@ -28,7 +28,7 @@ suite "DhtMetrics": metrics = createMockMetrics() dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics) - + (await dhtmetrics.awake()).tryGet() (await dhtmetrics.start()).tryGet() teardown: diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim index f2a3a17..9ae8698 100644 --- a/tests/codexcrawler/components/testnodestore.nim +++ b/tests/codexcrawler/components/testnodestore.nim @@ -30,7 +30,6 @@ suite "Nodestore": clock = createMockClock() store = NodeStore.new(state, ds, clock) - (await store.start()).tryGet() teardown: diff --git a/tests/codexcrawler/components/testrequeststore.nim b/tests/codexcrawler/components/testrequeststore.nim new file mode 100644 index 0000000..340826f --- /dev/null +++ b/tests/codexcrawler/components/testrequeststore.nim @@ -0,0 +1,98 @@ +import std/os +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import pkg/datastore/typedds + +import ../../../codexcrawler/components/requeststore +import ../../../codexcrawler/utils/datastoreutils +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../helpers + +suite "Requeststore": + let + dsPath = getTempDir() / "testds" + requeststoreName = "requeststore" + + var + ds: TypedDatastore + state: MockState + store: RequestStore + + setup: + ds = createTypedDatastore(dsPath).tryGet() + state = createMockState() + + store = RequestStore.new(state, ds) + + teardown: + (await ds.close()).tryGet() + state.checkAllUnsubscribed() + removeDir(dsPath) + + test "requestEntry encoding": + let entry = RequestEntry(id: genRid()) + + let + bytes = entry.encode() + decoded = RequestEntry.decode(bytes).tryGet() + + check: + entry.id == decoded.id + + test "add stores a new requestId": + let rid = genRid() + (await store.add(rid)).tryGet() + + let + key = Key.init(requeststoreName / $rid).tryGet() + stored = (await get[RequestEntry](ds, key)).tryGet() + + check: + stored.id == rid + + test "remove will remove an entry": + let rid = genRid() + (await store.add(rid)).tryGet() + (await store.remove(rid)).tryGet() + + let + key = Key.init(requeststoreName / $rid).tryGet() + isStored = (await ds.has(key)).tryGet() + + check: + isStored == false + + test "iterateAll yields all entries": + let + rid1 = genRid() + rid2 = genRid() + rid3 = genRid() + + (await store.add(rid1)).tryGet() + (await store.add(rid2)).tryGet() + (await store.add(rid3)).tryGet() + + var entries = newSeq[RequestEntry]() + proc onEntry(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + entries.add(entry) + return success() + + (await store.iterateAll(onEntry)).tryGet() + + check: + entries.len == 3 + + let + ids = @[entries[0].id, entries[1].id, entries[2].id] + all = @[rid1, rid2, rid3] + + for id in ids: + check: + id in all + + for id in all: + check: + id in ids diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim index 0c9b4cd..1e405ad 100644 --- a/tests/codexcrawler/components/testtimetracker.nim +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -47,11 +47,9 @@ suite "TimeTracker": state.config.expiryDelayMins = 22 time = TimeTracker.new(state, store, dht, clock) - (await time.start()).tryGet() teardown: - (await time.stop()).tryGet() await state.events.nodesToRevisit.unsubscribe(sub) state.checkAllUnsubscribed() diff --git a/tests/codexcrawler/components/testtodolist.nim b/tests/codexcrawler/components/testtodolist.nim index cca8512..b861f7b 100644 --- a/tests/codexcrawler/components/testtodolist.nim +++ b/tests/codexcrawler/components/testtodolist.nim @@ -23,8 +23,7 @@ suite "TodoList": metrics = createMockMetrics() todo = TodoList.new(state, metrics) - - (await todo.start()).tryGet() + (await todo.awake()).tryGet() teardown: (await todo.stop()).tryGet() diff --git a/tests/codexcrawler/helpers.nim b/tests/codexcrawler/helpers.nim index 5feb345..1afb6a8 100644 --- a/tests/codexcrawler/helpers.nim +++ b/tests/codexcrawler/helpers.nim @@ -1,6 +1,25 @@ import std/random +import std/typetraits import pkg/stint +import pkg/stew/byteutils import ../../codexcrawler/types +proc example*[T: SomeInteger](_: type T): T = + rand(T) + +proc example*[T, N](_: type array[N, T]): array[N, T] = + for item in result.mitems: + item = T.example + +proc example*(_: type UInt256): UInt256 = + UInt256.fromBytes(array[32, byte].example) + +proc example*[T: distinct](_: type T): T = + type baseType = T.distinctBase + T(baseType.example) + proc genNid*(): Nid = Nid(rand(uint64).u256) + +proc genRid*(): Rid = + Rid(array[32, byte].example) diff --git a/tests/codexcrawler/mocks/mockclock.nim b/tests/codexcrawler/mocks/mockclock.nim index 5c5cf44..d4621bc 100644 --- a/tests/codexcrawler/mocks/mockclock.nim +++ b/tests/codexcrawler/mocks/mockclock.nim @@ -7,4 +7,4 @@ method now*(clock: MockClock): uint64 {.raises: [].} = clock.setNow proc createMockClock*(): MockClock = - MockClock() + MockClock(setNow: 12) diff --git a/tests/codexcrawler/mocks/mockdht.nim b/tests/codexcrawler/mocks/mockdht.nim index 4c12f33..59c6d0b 100644 --- a/tests/codexcrawler/mocks/mockdht.nim +++ b/tests/codexcrawler/mocks/mockdht.nim @@ -18,11 +18,5 @@ method getNeighbors*( d.getNeighborsArg = some(target) return d.getNeighborsReturn -method start*(d: MockDht): Future[?!void] {.async.} = - return success() - -method stop*(d: MockDht): Future[?!void] {.async.} = - return success() - proc createMockDht*(): MockDht = MockDht() diff --git a/tests/codexcrawler/mocks/mockmarketplace.nim b/tests/codexcrawler/mocks/mockmarketplace.nim new file mode 100644 index 0000000..97d0ef1 --- /dev/null +++ b/tests/codexcrawler/mocks/mockmarketplace.nim @@ -0,0 +1,37 @@ +import pkg/ethers +import pkg/questionable + +import ../../../codexcrawler/services/marketplace +import ../../../codexcrawler/types + +logScope: + topics = "marketplace" + +type MockMarketplaceService* = ref object of MarketplaceService + subNewRequestsCallback*: ?OnNewRequest + iterRequestsCallback*: ?OnNewRequest + requestInfoReturns*: ?RequestInfo + requestInfoRid*: Rid + +method subscribeToNewRequests*( + m: MockMarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []).} = + m.subNewRequestsCallback = some(onNewRequest) + return success() + +method iteratePastNewRequestEvents*( + m: MockMarketplaceService, onNewRequest: OnNewRequest +): Future[?!void] {.async: (raises: []).} = + m.iterRequestsCallback = some(onNewRequest) + return success() + +method getRequestInfo*( + m: MockMarketplaceService, rid: Rid +): Future[?RequestInfo] {.async: (raises: []).} = + m.requestInfoRid = rid + return m.requestInfoReturns + +proc createMockMarketplaceService*(): MockMarketplaceService = + MockMarketplaceService( + subNewRequestsCallback: none(OnNewRequest), iterRequestsCallback: none(OnNewRequest) + ) diff --git a/tests/codexcrawler/mocks/mockmetrics.nim b/tests/codexcrawler/mocks/mockmetrics.nim index 8152e76..29c61e9 100644 --- a/tests/codexcrawler/mocks/mockmetrics.nim +++ b/tests/codexcrawler/mocks/mockmetrics.nim @@ -4,6 +4,10 @@ type MockMetrics* = ref object of Metrics todo*: int ok*: int nok*: int + requests*: int + pending*: int + slots*: int + totalSize*: int64 method setTodoNodes*(m: MockMetrics, value: int) = m.todo = value @@ -14,5 +18,17 @@ method setOkNodes*(m: MockMetrics, value: int) = method setNokNodes*(m: MockMetrics, value: int) = m.nok = value +method setRequests*(m: MockMetrics, value: int) = + m.requests = value + +method setPendingRequests*(m: MockMetrics, value: int) = + m.pending = value + +method setRequestSlots*(m: MockMetrics, value: int) = + m.slots = value + +method setTotalSize*(m: MockMetrics, value: int64) = + m.totalSize = value + proc createMockMetrics*(): MockMetrics = MockMetrics() diff --git a/tests/codexcrawler/mocks/mocknodestore.nim b/tests/codexcrawler/mocks/mocknodestore.nim index ed78f31..d596565 100644 --- a/tests/codexcrawler/mocks/mocknodestore.nim +++ b/tests/codexcrawler/mocks/mocknodestore.nim @@ -22,11 +22,5 @@ method deleteEntries*( s.nodesToDelete = nids return success() -method start*(s: MockNodeStore): Future[?!void] {.async.} = - return success() - -method stop*(s: MockNodeStore): Future[?!void] {.async.} = - return success() - proc createMockNodeStore*(): MockNodeStore = MockNodeStore(nodesToIterate: newSeq[NodeEntry](), nodesToDelete: newSeq[Nid]()) diff --git a/tests/codexcrawler/mocks/mockrequeststore.nim b/tests/codexcrawler/mocks/mockrequeststore.nim new file mode 100644 index 0000000..e17606e --- /dev/null +++ b/tests/codexcrawler/mocks/mockrequeststore.nim @@ -0,0 +1,28 @@ +import pkg/questionable/results +import pkg/chronos + +import ../../../codexcrawler/components/requeststore +import ../../../codexcrawler/types + +type MockRequestStore* = ref object of RequestStore + addRid*: Rid + removeRid*: Rid + iterateEntries*: seq[RequestEntry] + +method add*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = + s.addRid = rid + return success() + +method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = + s.removeRid = rid + return success() + +method iterateAll*( + s: MockRequestStore, onNode: OnRequestEntry +): Future[?!void] {.async: (raises: []).} = + for entry in s.iterateEntries: + ?await onNode(entry) + return success() + +proc createMockRequestStore*(): MockRequestStore = + MockRequestStore(iterateEntries: newSeq[RequestEntry]()) diff --git a/tests/codexcrawler/mocks/mockstate.nim b/tests/codexcrawler/mocks/mockstate.nim index 00e1198..d805f0e 100644 --- a/tests/codexcrawler/mocks/mockstate.nim +++ b/tests/codexcrawler/mocks/mockstate.nim @@ -22,7 +22,7 @@ method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = proc createMockState*(): MockState = MockState( status: ApplicationStatus.Running, - config: Config(), + config: Config(dhtEnable: true, marketplaceEnable: true, requestCheckDelay: 4), events: Events( nodesFound: newAsyncDataEvent[seq[Nid]](), newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), diff --git a/tests/codexcrawler/mocks/mocktodolist.nim b/tests/codexcrawler/mocks/mocktodolist.nim index 1417ae4..e7b11cd 100644 --- a/tests/codexcrawler/mocks/mocktodolist.nim +++ b/tests/codexcrawler/mocks/mocktodolist.nim @@ -10,11 +10,5 @@ type MockTodoList* = ref object of TodoList method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} = return t.popReturn -method start*(t: MockTodoList): Future[?!void] {.async.} = - return success() - -method stop*(t: MockTodoList): Future[?!void] {.async.} = - return success() - proc createMockTodoList*(): MockTodoList = MockTodoList() diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index 0773ec0..bf3b66b 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -1,7 +1,10 @@ -import ./components/testnodestore +import ./components/testchaincrawler +import ./components/testchainmetrics +import ./components/testdhtcrawler import ./components/testdhtmetrics -import ./components/testtodolist +import ./components/testnodestore +import ./components/testrequeststore import ./components/testtimetracker -import ./components/testcrawler +import ./components/testtodolist {.warning[UnusedImport]: off.}