From 15303125f6cc3411283509064cca91203dc9886f Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 16 Aug 2024 01:57:50 +0200 Subject: [PATCH 1/4] Support CORS preflight requests when the storage request api returns an error (#878) * Add CORS headers when the REST API is returning an error * Use the allowedOrigin instead of the wilcard when setting the origin Signed-off-by: Arnaud --------- Signed-off-by: Arnaud --- codex/rest/api.nim | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index b209e7c9..8197d8d4 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -418,6 +418,8 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = return RestApiResponse.error(Http500) proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = + let allowedOrigin = router.allowedOrigin + router.rawApi( MethodPost, "/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse: @@ -432,37 +434,44 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = ## tolerance - allowed number of nodes that can be lost before content is lost ## colateral - requested collateral from hosts when they fill slot + var headers = newSeq[(string,string)]() + + if corsOrigin =? allowedOrigin: + headers.add(("Access-Control-Allow-Origin", corsOrigin)) + headers.add(("Access-Control-Allow-Methods", "POST, OPTIONS")) + headers.add(("Access-Control-Max-Age", "86400")) + try: without contracts =? node.contracts.client: - return RestApiResponse.error(Http503, "Purchasing unavailable") + return RestApiResponse.error(Http503, "Purchasing unavailable", headers = headers) without cid =? cid.tryGet.catch, error: - return RestApiResponse.error(Http400, error.msg) + return RestApiResponse.error(Http400, error.msg, headers = headers) let body = await request.getBody() without params =? StorageRequestParams.fromJson(body), error: - return RestApiResponse.error(Http400, error.msg) + return RestApiResponse.error(Http400, error.msg, headers = headers) let nodes = params.nodes |? 1 let tolerance = params.tolerance |? 0 # prevent underflow if tolerance > nodes: - return RestApiResponse.error(Http400, "Invalid parameters: `tolerance` cannot be greater than `nodes`") + return RestApiResponse.error(Http400, "Invalid parameters: `tolerance` cannot be greater than `nodes`", headers = headers) let ecK = nodes - tolerance let ecM = tolerance # for readability # ensure leopard constrainst of 1 < K ≥ M if ecK <= 1 or ecK < ecM: - return RestApiResponse.error(Http400, "Invalid parameters: parameters must satify `1 < (nodes - tolerance) ≥ tolerance`") + return RestApiResponse.error(Http400, "Invalid parameters: parameters must satify `1 < (nodes - tolerance) ≥ tolerance`", headers = headers) without expiry =? params.expiry: - return RestApiResponse.error(Http400, "Expiry required") + return RestApiResponse.error(Http400, "Expiry required", headers = headers) if expiry <= 0 or expiry >= params.duration: - return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration") + return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration", headers = headers) without purchaseId =? await node.requestStorage( cid, @@ -477,14 +486,14 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = if error of InsufficientBlocksError: return RestApiResponse.error(Http400, "Dataset too small for erasure parameters, need at least " & - $(ref InsufficientBlocksError)(error).minSize.int & " bytes") + $(ref InsufficientBlocksError)(error).minSize.int & " bytes", headers = headers) - return RestApiResponse.error(Http500, error.msg) + return RestApiResponse.error(Http500, error.msg, headers = headers) return RestApiResponse.response(purchaseId.toHex) except CatchableError as exc: trace "Excepting processing request", exc = exc.msg - return RestApiResponse.error(Http500) + return RestApiResponse.error(Http500, headers = headers) router.api( MethodGet, From 63ee4ca0f9e6a3043dbdafc2647c92cfa41e508a Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 16 Aug 2024 04:55:25 +0200 Subject: [PATCH 2/4] refactor(marketplace): generic querying of historical marketplace events (#872) * refactor(marketplace): move marketplace events to the Market abstraction Move marketplace contract events to the Market abstraction so the types can be shared across all modules that call the Market abstraction. * Remove unneeded conversion * Switch to generic implementation of event querying * change parent type to MarketplaceEvent --- codex/contracts/market.nim | 21 ++++++++----------- codex/contracts/marketplace.nim | 19 ----------------- codex/market.nim | 28 ++++++++++++++++++++----- tests/codex/helpers/mockmarket.nim | 21 +++++++++++-------- tests/contracts/testMarket.nim | 33 ++++++++++++++++++++++-------- 5 files changed, 69 insertions(+), 53 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index c874d5db..3e9cfa0b 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -347,9 +347,11 @@ method subscribeProofSubmission*(market: OnChainMarket, method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() -method queryPastStorageRequests*(market: OnChainMarket, - blocksAgo: int): - Future[seq[PastStorageRequest]] {.async.} = +method queryPastEvents*[T: MarketplaceEvent]( + market: OnChainMarket, + _: type T, + blocksAgo: int): Future[seq[T]] {.async.} = + convertEthersError: let contract = market.contract let provider = contract.provider @@ -357,13 +359,6 @@ method queryPastStorageRequests*(market: OnChainMarket, let head = await provider.getBlockNumber() let fromBlock = BlockTag.init(head - blocksAgo.abs.u256) - let events = await contract.queryFilter(StorageRequested, - fromBlock, - BlockTag.latest) - return events.map(event => - PastStorageRequest( - requestId: event.requestId, - ask: event.ask, - expiry: event.expiry - ) - ) + return await contract.queryFilter(T, + fromBlock, + BlockTag.latest) diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 301f8c25..03001a79 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -16,25 +16,6 @@ export requests type Marketplace* = ref object of Contract - StorageRequested* = object of Event - requestId*: RequestId - ask*: StorageAsk - expiry*: UInt256 - SlotFilled* = object of Event - requestId* {.indexed.}: RequestId - slotIndex*: UInt256 - SlotFreed* = object of Event - requestId* {.indexed.}: RequestId - slotIndex*: UInt256 - RequestFulfilled* = object of Event - requestId* {.indexed.}: RequestId - RequestCancelled* = object of Event - requestId* {.indexed.}: RequestId - RequestFailed* = object of Event - requestId* {.indexed.}: RequestId - ProofSubmitted* = object of Event - id*: SlotId - proc config*(marketplace: Marketplace): MarketplaceConfig {.contract, view.} proc token*(marketplace: Marketplace): Address {.contract, view.} diff --git a/codex/market.nim b/codex/market.nim index b521c395..245c28d5 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -28,11 +28,28 @@ type OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].} - PastStorageRequest* = object + ProofChallenge* = array[32, byte] + + # Marketplace events -- located here due to the Market abstraction + MarketplaceEvent* = Event + StorageRequested* = object of MarketplaceEvent requestId*: RequestId ask*: StorageAsk expiry*: UInt256 - ProofChallenge* = array[32, byte] + SlotFilled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + slotIndex*: UInt256 + SlotFreed* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + slotIndex*: UInt256 + RequestFulfilled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + RequestCancelled* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + RequestFailed* = object of MarketplaceEvent + requestId* {.indexed.}: RequestId + ProofSubmitted* = object of MarketplaceEvent + id*: SlotId method getZkeyHash*(market: Market): Future[?string] {.base, async.} = raiseAssert("not implemented") @@ -202,7 +219,8 @@ method subscribeProofSubmission*(market: Market, method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") -method queryPastStorageRequests*(market: Market, - blocksAgo: int): - Future[seq[PastStorageRequest]] {.base, async.} = +method queryPastEvents*[T: MarketplaceEvent]( + market: Market, + _: type T, + blocksAgo: int): Future[seq[T]] {.base, async.} = raiseAssert("not implemented") diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 30697f51..ed460adb 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -420,16 +420,21 @@ method subscribeProofSubmission*(mock: MockMarket, mock.subscriptions.onProofSubmitted.add(subscription) return subscription -method queryPastStorageRequests*(market: MockMarket, - blocksAgo: int): - Future[seq[PastStorageRequest]] {.async.} = - # MockMarket does not have the concept of blocks, so simply return all - # previous events - return market.requested.map(request => - PastStorageRequest(requestId: request.id, +method queryPastEvents*[T: MarketplaceEvent]( + market: MockMarket, + _: type T, + blocksAgo: int): Future[seq[T]] {.async.} = + + if T of StorageRequested: + return market.requested.map(request => + StorageRequested(requestId: request.id, ask: request.ask, expiry: request.expiry) - ) + ) + elif T of SlotFilled: + return market.filled.map(slot => + SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex) + ) method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index 35b46279..bb01aa77 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -324,7 +324,7 @@ ethersuite "On-Chain Market": let slotId = request.slotId(slotIndex) check (await market.slotState(slotId)) == SlotState.Filled - test "can query past events": + test "can query past StorageRequested events": var request1 = StorageRequest.example var request2 = StorageRequest.example request1.client = accounts[0] @@ -335,21 +335,38 @@ ethersuite "On-Chain Market": # `market.requestStorage` executes an `approve` tx before the # `requestStorage` tx, so that's two PoA blocks per `requestStorage` call (6 - # blocks for 3 calls). `fromBlock` and `toBlock` are inclusive, so to check - # 6 blocks, we only need to check 5 "blocks ago". We don't need to check the - # `approve` for the first `requestStorage` call, so that's 1 less again = 4 - # "blocks ago". + # blocks for 3 calls). We don't need to check the `approve` for the first + # `requestStorage` call, so we only need to check 5 "blocks ago". "blocks + # ago". proc getsPastRequest(): Future[bool] {.async.} = - let reqs = await market.queryPastStorageRequests(5) + let reqs = await market.queryPastEvents(StorageRequested, 5) reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id] check eventually await getsPastRequest() + test "can query past SlotFilled events": + await market.requestStorage(request) + await market.fillSlot(request.id, 0.u256, proof, request.ask.collateral) + await market.fillSlot(request.id, 1.u256, proof, request.ask.collateral) + await market.fillSlot(request.id, 2.u256, proof, request.ask.collateral) + let slotId = request.slotId(slotIndex) + + # `market.fill` executes an `approve` tx before the `fillSlot` tx, so that's + # two PoA blocks per `fillSlot` call (6 blocks for 3 calls). We don't need + # to check the `approve` for the first `fillSlot` call, so we only need to + # check 5 "blocks ago". + let events = await market.queryPastEvents(SlotFilled, 5) + check events == @[ + SlotFilled(requestId: request.id, slotIndex: 0.u256), + SlotFilled(requestId: request.id, slotIndex: 1.u256), + SlotFilled(requestId: request.id, slotIndex: 2.u256), + ] + test "past event query can specify negative `blocksAgo` parameter": await market.requestStorage(request) check eventually ( - (await market.queryPastStorageRequests(blocksAgo = -2)) == - (await market.queryPastStorageRequests(blocksAgo = 2)) + (await market.queryPastEvents(StorageRequested, blocksAgo = -2)) == + (await market.queryPastEvents(StorageRequested, blocksAgo = 2)) ) From e017b05cf1b4d0bc025f5931c951aa0a5ba08dd1 Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Mon, 19 Aug 2024 05:48:03 -0400 Subject: [PATCH 3/4] Remove extra license file (#876) * remove extra license * center "apache license" --- LICENSE-APACHE | 201 ----------------------------------------------- LICENSE-APACHEv2 | 2 +- 2 files changed, 1 insertion(+), 202 deletions(-) delete mode 100644 LICENSE-APACHE diff --git a/LICENSE-APACHE b/LICENSE-APACHE deleted file mode 100644 index 1f70e331..00000000 --- a/LICENSE-APACHE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2024 Codex Storage - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/LICENSE-APACHEv2 b/LICENSE-APACHEv2 index fdfb0bd3..cbd5c758 100644 --- a/LICENSE-APACHEv2 +++ b/LICENSE-APACHEv2 @@ -1,4 +1,4 @@ -Apache License + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ From 1e2ad956591d41ec93ede1dec7da228a4558e9d3 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:18:59 +0200 Subject: [PATCH 4/4] Update advertising (#862) * Setting up advertiser * Wires up advertiser * cleanup * test compiles * tests pass * setting up test for advertiser * Finishes advertiser tests * fixes commonstore tests * Review comments by Giuliano * Race condition found by Giuliano * Review comment by Dmitriy Co-authored-by: Dmitriy Ryajov Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> * fixes tests --------- Signed-off-by: Ben Bierens <39762930+benbierens@users.noreply.github.com> Co-authored-by: Dmitriy Ryajov --- codex/blockexchange/engine.nim | 3 +- codex/blockexchange/engine/advertiser.nim | 177 ++++++++++++++++++ codex/blockexchange/engine/discovery.nim | 110 +---------- codex/blockexchange/engine/engine.nim | 22 +-- codex/codex.nim | 3 +- codex/node.nim | 3 - codex/stores/blockstore.nim | 2 + codex/stores/cachestore.nim | 6 +- codex/stores/repostore/store.nim | 3 + codex/stores/repostore/types.nim | 4 +- .../blockexchange/discovery/testdiscovery.nim | 13 ++ .../discovery/testdiscoveryengine.nim | 57 ------ .../blockexchange/engine/testadvertiser.nim | 106 +++++++++++ .../codex/blockexchange/engine/testengine.nim | 71 +++---- tests/codex/blockexchange/testengine.nim | 1 + tests/codex/helpers/nodeutils.nim | 3 +- tests/codex/node/helpers.nim | 6 +- tests/codex/node/testnode.nim | 3 + tests/codex/stores/commonstoretests.nim | 11 ++ 19 files changed, 369 insertions(+), 235 deletions(-) create mode 100644 codex/blockexchange/engine/advertiser.nim create mode 100644 tests/codex/blockexchange/engine/testadvertiser.nim diff --git a/codex/blockexchange/engine.nim b/codex/blockexchange/engine.nim index a9cd9160..5aeb96ba 100644 --- a/codex/blockexchange/engine.nim +++ b/codex/blockexchange/engine.nim @@ -1,5 +1,6 @@ import ./engine/discovery +import ./engine/advertiser import ./engine/engine import ./engine/payments -export discovery, engine, payments +export discovery, advertiser, engine, payments diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim new file mode 100644 index 00000000..0b59d150 --- /dev/null +++ b/codex/blockexchange/engine/advertiser.nim @@ -0,0 +1,177 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/libp2p/cid +import pkg/libp2p/multicodec +import pkg/metrics +import pkg/questionable +import pkg/questionable/results + +import ../protobuf/presence +import ../peers + +import ../../utils +import ../../discovery +import ../../stores/blockstore +import ../../logutils +import ../../manifest + +logScope: + topics = "codex discoveryengine advertiser" + +declareGauge(codexInflightAdvertise, "inflight advertise requests") + +const + DefaultConcurrentAdvertRequests = 10 + DefaultAdvertiseLoopSleep = 30.minutes + +type + Advertiser* = ref object of RootObj + localStore*: BlockStore # Local block store for this instance + discovery*: Discovery # Discovery interface + + advertiserRunning*: bool # Indicates if discovery is running + concurrentAdvReqs: int # Concurrent advertise requests + + advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle + advertiseQueue*: AsyncQueue[Cid] # Advertise queue + advertiseTasks*: seq[Future[void]] # Advertise tasks + + advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep + inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests + +proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} = + if cid notin b.advertiseQueue: + await b.advertiseQueue.put(cid) + trace "Advertising", cid + +proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is manifest" + return + + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + + # announce manifest cid and tree cid + await b.addCidToQueue(cid) + await b.addCidToQueue(manifest.treeCid) + +proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = + while b.advertiserRunning: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." + + await sleepAsync(b.advertiseLocalStoreLoopSleep) + + info "Exiting advertise task loop" + +proc processQueueLoop(b: Advertiser) {.async.} = + while b.advertiserRunning: + try: + let + cid = await b.advertiseQueue.get() + + if cid in b.inFlightAdvReqs: + continue + + try: + let + request = b.discovery.provide(cid) + + b.inFlightAdvReqs[cid] = request + codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + await request + + finally: + b.inFlightAdvReqs.del(cid) + codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + except CancelledError: + trace "Advertise task cancelled" + return + except CatchableError as exc: + warn "Exception in advertise task runner", exc = exc.msg + + info "Exiting advertise task runner" + +proc start*(b: Advertiser) {.async.} = + ## Start the advertiser + ## + + trace "Advertiser start" + + proc onBlock(cid: Cid) {.async.} = + await b.advertiseBlock(cid) + + doAssert(b.localStore.onBlockStored.isNone()) + b.localStore.onBlockStored = onBlock.some + + if b.advertiserRunning: + warn "Starting advertiser twice" + return + + b.advertiserRunning = true + for i in 0.. 0: warn "Failed to send block request cancellations to peers", peers = failed.len -proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = - var cids = initHashSet[Cid]() - for bd in blocksDelivery: - if bd.address.leaf: - cids.incl(bd.address.treeCid) - else: - without isM =? bd.address.cid.isManifest, err: - warn "Unable to determine if cid is manifest" - continue - if isM: - cids.incl(bd.address.cid) - return cids.toSeq - proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) - let announceCids = getAnnouceCids(blocksDelivery) await b.cancelBlocks(blocksDelivery.mapIt(it.address)) - b.discovery.queueProvideBlocksReq(announceCids) - proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = await b.resolveBlocks( blocks.mapIt( @@ -596,6 +584,7 @@ proc new*( wallet: WalletRef, network: BlockExcNetwork, discovery: DiscoveryEngine, + advertiser: Advertiser, peerStore: PeerCtxStore, pendingBlocks: PendingBlocksManager, concurrentTasks = DefaultConcurrentTasks, @@ -616,6 +605,7 @@ proc new*( concurrentTasks: concurrentTasks, taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery, + advertiser: advertiser, blockFetchTimeout: blockFetchTimeout) proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = diff --git a/codex/codex.nim b/codex/codex.nim index 0b9182fb..6d98a562 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -268,8 +268,9 @@ proc new*( peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() + advertiser = Advertiser.new(repoStore, discovery) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks) store = NetworkStore.new(engine, repoStore) prover = if config.prover: if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and diff --git a/codex/node.nim b/codex/node.nim index 19065c99..88305a08 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -366,9 +366,6 @@ proc store*( blocks = manifest.blocksCount, datasetSize = manifest.datasetSize - await self.discovery.provide(manifestBlk.cid) - await self.discovery.provide(treeCid) - return manifestBlk.cid.success proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 4921bebb..52f37517 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -29,7 +29,9 @@ type BlockType* {.pure.} = enum Manifest, Block, Both + CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises:[].} BlockStore* = ref object of RootObj + onBlockStored*: ?CidCallback method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = ## Get a block from the blockstore diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index d6623373..130d2ade 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -197,6 +197,9 @@ method putBlock*( return success() discard self.putBlockSync(blk) + if onBlock =? self.onBlockStored: + await onBlock(blk.cid) + return success() method putCidAndProof*( @@ -282,7 +285,8 @@ proc new*( cache: cache, cidAndProofCache: cidAndProofCache, currentSize: currentSize, - size: cacheSize) + size: cacheSize, + onBlockStored: CidCallback.none) for blk in blocks: discard store.putBlockSync(blk) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 7d629131..5ff99e64 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -189,6 +189,9 @@ method putBlock*( if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: return failure(err) + + if onBlock =? self.onBlockStored: + await onBlock(blk.cid) else: trace "Block already exists" diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 4338e63a..2f88183d 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -11,6 +11,7 @@ import pkg/chronos import pkg/datastore import pkg/datastore/typedds import pkg/libp2p/cid +import pkg/questionable import ../blockstore import ../../clock @@ -103,5 +104,6 @@ func new*( clock: clock, postFixLen: postFixLen, quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl + blockTtl: blockTtl, + onBlockStored: CidCallback.none ) diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 9ba29e5d..a136f89e 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -32,6 +32,7 @@ asyncchecksuite "Block Advertising and Discovery": peerStore: PeerCtxStore blockDiscovery: MockDiscovery discovery: DiscoveryEngine + advertiser: Advertiser wallet: WalletRef network: BlockExcNetwork localStore: CacheStore @@ -68,11 +69,17 @@ asyncchecksuite "Block Advertising and Discovery": pendingBlocks, minPeersPerBlock = 1) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -200,11 +207,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": pendingBlocks, minPeersPerBlock = 1) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) networkStore = NetworkStore.new(engine, localStore) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index ff6b60d2..61a6f09f 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -74,30 +74,6 @@ asyncchecksuite "Test Discovery Engine": await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await discoveryEngine.stop() - test "Should Advertise Haves": - var - localStore = CacheStore.new(blocks.mapIt(it)) - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis) - haves = collect(initTable): - for cid in @[manifestBlock.cid, manifest.treeCid]: - { cid: newFuture[void]() } - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - if not haves[cid].finished: - haves[cid].complete - - await discoveryEngine.start() - await allFuturesThrowing( - allFinished(toSeq(haves.values))).wait(5.seconds) - await discoveryEngine.stop() - test "Should queue discovery request": var localStore = CacheStore.new() @@ -191,36 +167,3 @@ asyncchecksuite "Test Discovery Engine": reqs.complete() await discoveryEngine.stop() - - test "Should not request if there is already an inflight advertise request": - var - localStore = CacheStore.new() - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis, - concurrentAdvReqs = 2) - reqs = newFuture[void]() - count = 0 - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - check cid == blocks[0].cid - if count > 0: - check false - count.inc - - await reqs # queue the request - - await discoveryEngine.start() - discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) - await sleepAsync(200.millis) - - discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid]) - await sleepAsync(200.millis) - - reqs.complete() - await discoveryEngine.stop() diff --git a/tests/codex/blockexchange/engine/testadvertiser.nim b/tests/codex/blockexchange/engine/testadvertiser.nim new file mode 100644 index 00000000..4cbd2117 --- /dev/null +++ b/tests/codex/blockexchange/engine/testadvertiser.nim @@ -0,0 +1,106 @@ +import std/sequtils +import std/random + +import pkg/chronos +import pkg/libp2p/routing_record +import pkg/codexdht/discv5/protocol as discv5 + +import pkg/codex/blockexchange +import pkg/codex/stores +import pkg/codex/chunker +import pkg/codex/discovery +import pkg/codex/blocktype as bt +import pkg/codex/manifest + +import ../../../asynctest +import ../../helpers +import ../../helpers/mockdiscovery +import ../../examples + +asyncchecksuite "Advertiser": + var + blockDiscovery: MockDiscovery + localStore: BlockStore + advertiser: Advertiser + let + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 123.NBytes, + datasetSize = 234.NBytes) + manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + + setup: + blockDiscovery = MockDiscovery.new() + localStore = CacheStore.new() + + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + + await advertiser.start() + + teardown: + await advertiser.stop() + + test "blockStored should queue manifest Cid for advertising": + (await localStore.putBlock(manifestBlk)).tryGet() + + check: + manifestBlk.cid in advertiser.advertiseQueue + + test "blockStored should queue tree Cid for advertising": + (await localStore.putBlock(manifestBlk)).tryGet() + + check: + manifest.treeCid in advertiser.advertiseQueue + + test "blockStored should not queue non-manifest non-tree CIDs for discovery": + let blk = bt.Block.example + + (await localStore.putBlock(blk)).tryGet() + + check: + blk.cid notin advertiser.advertiseQueue + + test "Should not queue if there is already an inflight advertise request": + var + reqs = newFuture[void]() + manifestCount = 0 + treeCount = 0 + + blockDiscovery.publishBlockProvideHandler = + proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = + if cid == manifestBlk.cid: + inc manifestCount + if cid == manifest.treeCid: + inc treeCount + + await reqs # queue the request + + (await localStore.putBlock(manifestBlk)).tryGet() + (await localStore.putBlock(manifestBlk)).tryGet() + + reqs.complete() + check eventually manifestCount == 1 + check eventually treeCount == 1 + + test "Should advertise existing manifests and their trees": + let + newStore = CacheStore.new([manifestBlk]) + + await advertiser.stop() + advertiser = Advertiser.new( + newStore, + blockDiscovery + ) + await advertiser.start() + + check eventually manifestBlk.cid in advertiser.advertiseQueue + check eventually manifest.treeCid in advertiser.advertiseQueue + + test "Stop should clear onBlockStored callback": + await advertiser.stop() + + check: + localStore.onBlockStored.isNone() diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 5bf02b1b..c22a1a6a 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -78,11 +78,17 @@ asyncchecksuite "NetworkStore engine basic": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -113,11 +119,17 @@ asyncchecksuite "NetworkStore engine basic": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -139,6 +151,7 @@ asyncchecksuite "NetworkStore engine handlers": network: BlockExcNetwork engine: BlockExcEngine discovery: DiscoveryEngine + advertiser: Advertiser peerCtx: BlockExcPeerCtx localStore: BlockStore blocks: seq[Block] @@ -176,11 +189,17 @@ asyncchecksuite "NetworkStore engine handlers": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) @@ -390,51 +409,6 @@ asyncchecksuite "NetworkStore engine handlers": discard await allFinished(pending) await allFuturesThrowing(cancellations.values().toSeq) - test "resolveBlocks should queue manifest CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - manifest = Manifest.new( - treeCid = Cid.example, - blockSize = 123.NBytes, - datasetSize = 234.NBytes - ) - - let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - let blks = @[manifestBlk] - - await engine.resolveBlocks(blks) - - check: - manifestBlk.cid in engine.discovery.advertiseQueue - - test "resolveBlocks should queue tree CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - tCid = Cid.example - delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid)) - - await engine.resolveBlocks(@[delivery]) - - check: - tCid in engine.discovery.advertiseQueue - - test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery": - engine.network = BlockExcNetwork( - request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) - - let - blkCid = Cid.example - delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid)) - - await engine.resolveBlocks(@[delivery]) - - check: - blkCid notin engine.discovery.advertiseQueue - asyncchecksuite "Task Handler": var rng: Rng @@ -448,6 +422,7 @@ asyncchecksuite "Task Handler": network: BlockExcNetwork engine: BlockExcEngine discovery: DiscoveryEngine + advertiser: Advertiser localStore: BlockStore peersCtx: seq[BlockExcPeerCtx] @@ -481,11 +456,17 @@ asyncchecksuite "Task Handler": blockDiscovery, pendingBlocks) + advertiser = Advertiser.new( + localStore, + blockDiscovery + ) + engine = BlockExcEngine.new( localStore, wallet, network, discovery, + advertiser, peerStore, pendingBlocks) peersCtx = @[] diff --git a/tests/codex/blockexchange/testengine.nim b/tests/codex/blockexchange/testengine.nim index 5277e027..9cd968ee 100644 --- a/tests/codex/blockexchange/testengine.nim +++ b/tests/codex/blockexchange/testengine.nim @@ -1,5 +1,6 @@ import ./engine/testengine import ./engine/testblockexc import ./engine/testpayments +import ./engine/testadvertiser {.warning[UnusedImport]: off.} diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim index d8798b3d..8c624bb2 100644 --- a/tests/codex/helpers/nodeutils.nim +++ b/tests/codex/helpers/nodeutils.nim @@ -40,8 +40,9 @@ proc generateNodes*( localStore = CacheStore.new(blocks.mapIt( it )) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() + advertiser = Advertiser.new(localStore, discovery) blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks) networkStore = NetworkStore.new(engine, localStore) switch.mount(network) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 498ea45b..b3f89e5c 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -82,6 +82,7 @@ template setupAndTearDown*() {.dirty.} = peerStore: PeerCtxStore pendingBlocks: PendingBlocksManager discovery: DiscoveryEngine + advertiser: Advertiser taskpool: Taskpool let @@ -109,7 +110,8 @@ template setupAndTearDown*() {.dirty.} = peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) + advertiser = Advertiser.new(localStore, blockDiscovery) + engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks) store = NetworkStore.new(engine, localStore) taskpool = Taskpool.new(num_threads = countProcessors()) node = CodexNodeRef.new( @@ -120,8 +122,6 @@ template setupAndTearDown*() {.dirty.} = discovery = blockDiscovery, taskpool = taskpool) - await node.start() - teardown: close(file) await node.stop() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 3aa8a424..ab8317ec 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -49,6 +49,9 @@ privateAccess(CodexNodeRef) # enable access to private fields asyncchecksuite "Test Node - Basic": setupAndTearDown() + setup: + await node.start() + test "Fetch Manifest": let manifest = await storeDataGetManifest(localStore, chunker) diff --git a/tests/codex/stores/commonstoretests.nim b/tests/codex/stores/commonstoretests.nim index 863b59d4..7d6cc89a 100644 --- a/tests/codex/stores/commonstoretests.nim +++ b/tests/codex/stores/commonstoretests.nim @@ -15,6 +15,7 @@ import pkg/codex/utils import ../../asynctest import ../helpers +import ../examples type StoreProvider* = proc(): BlockStore {.gcsafe.} @@ -56,6 +57,16 @@ proc commonBlockStoreTests*(name: string, (await store.putBlock(newBlock1)).tryGet() check (await store.hasBlock(newBlock1.cid)).tryGet() + test "putBlock raises onBlockStored": + var storedCid = Cid.example + proc onStored(cid: Cid) {.async.} = + storedCid = cid + store.onBlockStored = onStored.some() + + (await store.putBlock(newBlock1)).tryGet() + + check storedCid == newBlock1.cid + test "getBlock": (await store.putBlock(newBlock)).tryGet() let blk = await store.getBlock(newBlock.cid)