Merge branch 'master' into gcc-14-update

This commit is contained in:
benbierens 2024-08-28 09:28:55 +02:00
commit 2cf56bf13b
No known key found for this signature in database
GPG Key ID: 877D2C2E09A22F3A
27 changed files with 458 additions and 500 deletions

View File

@ -1,201 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2024 Codex Storage
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,4 +1,4 @@
Apache License Apache License
Version 2.0, January 2004 Version 2.0, January 2004
http://www.apache.org/licenses/ http://www.apache.org/licenses/

View File

@ -1,5 +1,6 @@
import ./engine/discovery import ./engine/discovery
import ./engine/advertiser
import ./engine/engine import ./engine/engine
import ./engine/payments import ./engine/payments
export discovery, engine, payments export discovery, advertiser, engine, payments

View File

@ -0,0 +1,177 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import ../protobuf/presence
import ../peers
import ../../utils
import ../../discovery
import ../../stores/blockstore
import ../../logutils
import ../../manifest
logScope:
topics = "codex discoveryengine advertiser"
declareGauge(codexInflightAdvertise, "inflight advertise requests")
const
DefaultConcurrentAdvertRequests = 10
DefaultAdvertiseLoopSleep = 30.minutes
type
Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface
advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
if cid notin b.advertiseQueue:
await b.advertiseQueue.put(cid)
trace "Advertising", cid
proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)
proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
await sleepAsync(b.advertiseLocalStoreLoopSleep)
info "Exiting advertise task loop"
proc processQueueLoop(b: Advertiser) {.async.} =
while b.advertiserRunning:
try:
let
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
continue
try:
let
request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
await request
finally:
b.inFlightAdvReqs.del(cid)
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
info "Exiting advertise task runner"
proc start*(b: Advertiser) {.async.} =
## Start the advertiser
##
trace "Advertiser start"
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
if b.advertiserRunning:
warn "Starting advertiser twice"
return
b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
##
trace "Advertiser stop"
if not b.advertiserRunning:
warn "Stopping advertiser without starting it"
return
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"
# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
trace "Advertiser stopped"
proc new*(
T: type Advertiser,
localStore: BlockStore,
discovery: Discovery,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep
): Advertiser =
## Create a advertiser instance
##
Advertiser(
localStore: localStore,
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)

View File

@ -35,11 +35,9 @@ declareGauge(codexInflightDiscovery, "inflight discovery requests")
const const
DefaultConcurrentDiscRequests = 10 DefaultConcurrentDiscRequests = 10
DefaultConcurrentAdvertRequests = 10
DefaultDiscoveryTimeout = 1.minutes DefaultDiscoveryTimeout = 1.minutes
DefaultMinPeersPerBlock = 3 DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds DefaultDiscoveryLoopSleep = 3.seconds
DefaultAdvertiseLoopSleep = 30.minutes
type type
DiscoveryEngine* = ref object of RootObj DiscoveryEngine* = ref object of RootObj
@ -49,20 +47,13 @@ type
discovery*: Discovery # Discovery interface discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running discEngineRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
concurrentDiscReqs: int # Concurrent discovery requests concurrentDiscReqs: int # Concurrent discovery requests
advertiseLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
discoveryLoop*: Future[void] # Discovery loop task handle discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks discoveryTasks*: seq[Future[void]] # Discovery tasks
minPeersPerBlock*: int # Max number of peers with block minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep discoveryLoopSleep: Duration # Discovery loop sleep
advertiseLoopSleep: Duration # Advertise loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
advertiseType*: BlockType # Advertice blocks, manifests or both
proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning: while b.discEngineRunning:
@ -81,68 +72,6 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
await sleepAsync(b.discoveryLoopSleep) await sleepAsync(b.discoveryLoopSleep)
proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} =
without isM =? cid.isManifest, err:
warn "Unable to determine if cid is manifest"
return
if isM:
without blk =? await b.localStore.getBlock(cid), err:
error "Error retrieving manifest block", cid, err = err.msg
return
without manifest =? Manifest.decode(blk), err:
error "Unable to decode as manifest", err = err.msg
return
# announce manifest cid and tree cid
await b.advertiseQueue.put(cid)
await b.advertiseQueue.put(manifest.treeCid)
proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} =
while b.discEngineRunning:
if cids =? await b.localStore.listBlocks(blockType = b.advertiseType):
trace "Begin iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Iterating blocks finished."
await sleepAsync(b.advertiseLoopSleep)
info "Exiting advertise task loop"
proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
## Run advertise tasks
##
while b.discEngineRunning:
try:
let
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
continue
try:
let
request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
await request
finally:
b.inFlightAdvReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64)
except CancelledError:
trace "Advertise task cancelled"
return
except CatchableError as exc:
warn "Exception in advertise task runner", exc = exc.msg
info "Exiting advertise task runner"
proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
## Run discovery tasks ## Run discovery tasks
## ##
@ -167,7 +96,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
.wait(DefaultDiscoveryTimeout) .wait(DefaultDiscoveryTimeout)
b.inFlightDiscReqs[cid] = request b.inFlightDiscReqs[cid] = request
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
let let
peers = await request peers = await request
@ -181,7 +110,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
finally: finally:
b.inFlightDiscReqs.del(cid) b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64)
except CancelledError: except CancelledError:
trace "Discovery task cancelled" trace "Discovery task cancelled"
return return
@ -198,14 +127,6 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
except CatchableError as exc: except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg warn "Exception queueing discovery request", exc = exc.msg
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.advertiseQueue:
try:
b.advertiseQueue.putNoWait(cid)
except CatchableError as exc:
warn "Exception queueing discovery request", exc = exc.msg
proc start*(b: DiscoveryEngine) {.async.} = proc start*(b: DiscoveryEngine) {.async.} =
## Start the discengine task ## Start the discengine task
## ##
@ -217,13 +138,9 @@ proc start*(b: DiscoveryEngine) {.async.} =
return return
b.discEngineRunning = true b.discEngineRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(advertiseTaskLoop(b))
for i in 0..<b.concurrentDiscReqs: for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b)) b.discoveryTasks.add(discoveryTaskLoop(b))
b.advertiseLoop = advertiseQueueLoop(b)
b.discoveryLoop = discoveryQueueLoop(b) b.discoveryLoop = discoveryQueueLoop(b)
proc stop*(b: DiscoveryEngine) {.async.} = proc stop*(b: DiscoveryEngine) {.async.} =
@ -236,23 +153,12 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return return
b.discEngineRunning = false b.discEngineRunning = false
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"
for task in b.discoveryTasks: for task in b.discoveryTasks:
if not task.finished: if not task.finished:
trace "Awaiting discovery task to stop" trace "Awaiting discovery task to stop"
await task.cancelAndWait() await task.cancelAndWait()
trace "Discovery task stopped" trace "Discovery task stopped"
if not b.advertiseLoop.isNil and not b.advertiseLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLoop.cancelAndWait()
trace "Advertise loop stopped"
if not b.discoveryLoop.isNil and not b.discoveryLoop.finished: if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop" trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait() await b.discoveryLoop.cancelAndWait()
@ -267,12 +173,9 @@ proc new*(
network: BlockExcNetwork, network: BlockExcNetwork,
discovery: Discovery, discovery: Discovery,
pendingBlocks: PendingBlocksManager, pendingBlocks: PendingBlocksManager,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
concurrentDiscReqs = DefaultConcurrentDiscRequests, concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep, discoveryLoopSleep = DefaultDiscoveryLoopSleep,
advertiseLoopSleep = DefaultAdvertiseLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock
minPeersPerBlock = DefaultMinPeersPerBlock,
advertiseType = BlockType.Manifest
): DiscoveryEngine = ): DiscoveryEngine =
## Create a discovery engine instance for advertising services ## Create a discovery engine instance for advertising services
## ##
@ -282,13 +185,8 @@ proc new*(
network: network, network: network,
discovery: discovery, discovery: discovery,
pendingBlocks: pendingBlocks, pendingBlocks: pendingBlocks,
concurrentAdvReqs: concurrentAdvReqs,
concurrentDiscReqs: concurrentDiscReqs, concurrentDiscReqs: concurrentDiscReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs), discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](), inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
inFlightAdvReqs: initTable[Cid, Future[void]](),
discoveryLoopSleep: discoveryLoopSleep, discoveryLoopSleep: discoveryLoopSleep,
advertiseLoopSleep: advertiseLoopSleep, minPeersPerBlock: minPeersPerBlock)
minPeersPerBlock: minPeersPerBlock,
advertiseType: advertiseType)

View File

@ -34,6 +34,7 @@ import ../peers
import ./payments import ./payments
import ./discovery import ./discovery
import ./advertiser
import ./pendingblocks import ./pendingblocks
export peers, pendingblocks, payments, discovery export peers, pendingblocks, payments, discovery
@ -77,6 +78,7 @@ type
pricing*: ?Pricing # Optional bandwidth pricing pricing*: ?Pricing # Optional bandwidth pricing
blockFetchTimeout*: Duration # Timeout for fetching blocks over the network blockFetchTimeout*: Duration # Timeout for fetching blocks over the network
discovery*: DiscoveryEngine discovery*: DiscoveryEngine
advertiser*: Advertiser
Pricing* = object Pricing* = object
address*: EthAddress address*: EthAddress
@ -93,6 +95,7 @@ proc start*(b: BlockExcEngine) {.async.} =
## ##
await b.discovery.start() await b.discovery.start()
await b.advertiser.start()
trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks
if b.blockexcRunning: if b.blockexcRunning:
@ -108,6 +111,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
## ##
await b.discovery.stop() await b.discovery.stop()
await b.advertiser.stop()
trace "NetworkStore stop" trace "NetworkStore stop"
if not b.blockexcRunning: if not b.blockexcRunning:
@ -284,27 +288,11 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
if failed.len > 0: if failed.len > 0:
warn "Failed to send block request cancellations to peers", peers = failed.len warn "Failed to send block request cancellations to peers", peers = failed.len
proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] =
var cids = initHashSet[Cid]()
for bd in blocksDelivery:
if bd.address.leaf:
cids.incl(bd.address.treeCid)
else:
without isM =? bd.address.cid.isManifest, err:
warn "Unable to determine if cid is manifest"
continue
if isM:
cids.incl(bd.address.cid)
return cids.toSeq
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
b.pendingBlocks.resolve(blocksDelivery) b.pendingBlocks.resolve(blocksDelivery)
await b.scheduleTasks(blocksDelivery) await b.scheduleTasks(blocksDelivery)
let announceCids = getAnnouceCids(blocksDelivery)
await b.cancelBlocks(blocksDelivery.mapIt(it.address)) await b.cancelBlocks(blocksDelivery.mapIt(it.address))
b.discovery.queueProvideBlocksReq(announceCids)
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
await b.resolveBlocks( await b.resolveBlocks(
blocks.mapIt( blocks.mapIt(
@ -596,6 +584,7 @@ proc new*(
wallet: WalletRef, wallet: WalletRef,
network: BlockExcNetwork, network: BlockExcNetwork,
discovery: DiscoveryEngine, discovery: DiscoveryEngine,
advertiser: Advertiser,
peerStore: PeerCtxStore, peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager, pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks, concurrentTasks = DefaultConcurrentTasks,
@ -616,6 +605,7 @@ proc new*(
concurrentTasks: concurrentTasks, concurrentTasks: concurrentTasks,
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery, discovery: discovery,
advertiser: advertiser,
blockFetchTimeout: blockFetchTimeout) blockFetchTimeout: blockFetchTimeout)
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =

View File

@ -269,8 +269,9 @@ proc new*(
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
advertiser = Advertiser.new(repoStore, discovery)
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore) store = NetworkStore.new(engine, repoStore)
prover = if config.prover: prover = if config.prover:
if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and

View File

@ -347,9 +347,11 @@ method subscribeProofSubmission*(market: OnChainMarket,
method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
await subscription.eventSubscription.unsubscribe() await subscription.eventSubscription.unsubscribe()
method queryPastStorageRequests*(market: OnChainMarket, method queryPastEvents*[T: MarketplaceEvent](
blocksAgo: int): market: OnChainMarket,
Future[seq[PastStorageRequest]] {.async.} = _: type T,
blocksAgo: int): Future[seq[T]] {.async.} =
convertEthersError: convertEthersError:
let contract = market.contract let contract = market.contract
let provider = contract.provider let provider = contract.provider
@ -357,13 +359,6 @@ method queryPastStorageRequests*(market: OnChainMarket,
let head = await provider.getBlockNumber() let head = await provider.getBlockNumber()
let fromBlock = BlockTag.init(head - blocksAgo.abs.u256) let fromBlock = BlockTag.init(head - blocksAgo.abs.u256)
let events = await contract.queryFilter(StorageRequested, return await contract.queryFilter(T,
fromBlock, fromBlock,
BlockTag.latest) BlockTag.latest)
return events.map(event =>
PastStorageRequest(
requestId: event.requestId,
ask: event.ask,
expiry: event.expiry
)
)

View File

@ -16,25 +16,6 @@ export requests
type type
Marketplace* = ref object of Contract Marketplace* = ref object of Contract
StorageRequested* = object of Event
requestId*: RequestId
ask*: StorageAsk
expiry*: UInt256
SlotFilled* = object of Event
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
SlotFreed* = object of Event
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
RequestFulfilled* = object of Event
requestId* {.indexed.}: RequestId
RequestCancelled* = object of Event
requestId* {.indexed.}: RequestId
RequestFailed* = object of Event
requestId* {.indexed.}: RequestId
ProofSubmitted* = object of Event
id*: SlotId
proc config*(marketplace: Marketplace): MarketplaceConfig {.contract, view.} proc config*(marketplace: Marketplace): MarketplaceConfig {.contract, view.}
proc token*(marketplace: Marketplace): Address {.contract, view.} proc token*(marketplace: Marketplace): Address {.contract, view.}

View File

@ -28,11 +28,28 @@ type
OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].}
OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].} OnProofSubmitted* = proc(id: SlotId) {.gcsafe, upraises:[].}
PastStorageRequest* = object ProofChallenge* = array[32, byte]
# Marketplace events -- located here due to the Market abstraction
MarketplaceEvent* = Event
StorageRequested* = object of MarketplaceEvent
requestId*: RequestId requestId*: RequestId
ask*: StorageAsk ask*: StorageAsk
expiry*: UInt256 expiry*: UInt256
ProofChallenge* = array[32, byte] SlotFilled* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
SlotFreed* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
slotIndex*: UInt256
RequestFulfilled* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
RequestCancelled* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
RequestFailed* = object of MarketplaceEvent
requestId* {.indexed.}: RequestId
ProofSubmitted* = object of MarketplaceEvent
id*: SlotId
method getZkeyHash*(market: Market): Future[?string] {.base, async.} = method getZkeyHash*(market: Market): Future[?string] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")
@ -202,7 +219,8 @@ method subscribeProofSubmission*(market: Market,
method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} =
raiseAssert("not implemented") raiseAssert("not implemented")
method queryPastStorageRequests*(market: Market, method queryPastEvents*[T: MarketplaceEvent](
blocksAgo: int): market: Market,
Future[seq[PastStorageRequest]] {.base, async.} = _: type T,
blocksAgo: int): Future[seq[T]] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")

View File

@ -366,9 +366,6 @@ proc store*(
blocks = manifest.blocksCount, blocks = manifest.blocksCount,
datasetSize = manifest.datasetSize datasetSize = manifest.datasetSize
await self.discovery.provide(manifestBlk.cid)
await self.discovery.provide(treeCid)
return manifestBlk.cid.success return manifestBlk.cid.success
proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =

View File

@ -418,6 +418,8 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) = proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin
router.rawApi( router.rawApi(
MethodPost, MethodPost,
"/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse: "/api/codex/v1/storage/request/{cid}") do (cid: Cid) -> RestApiResponse:
@ -432,37 +434,44 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
## tolerance - allowed number of nodes that can be lost before content is lost ## tolerance - allowed number of nodes that can be lost before content is lost
## colateral - requested collateral from hosts when they fill slot ## colateral - requested collateral from hosts when they fill slot
var headers = newSeq[(string,string)]()
if corsOrigin =? allowedOrigin:
headers.add(("Access-Control-Allow-Origin", corsOrigin))
headers.add(("Access-Control-Allow-Methods", "POST, OPTIONS"))
headers.add(("Access-Control-Max-Age", "86400"))
try: try:
without contracts =? node.contracts.client: without contracts =? node.contracts.client:
return RestApiResponse.error(Http503, "Purchasing unavailable") return RestApiResponse.error(Http503, "Purchasing unavailable", headers = headers)
without cid =? cid.tryGet.catch, error: without cid =? cid.tryGet.catch, error:
return RestApiResponse.error(Http400, error.msg) return RestApiResponse.error(Http400, error.msg, headers = headers)
let body = await request.getBody() let body = await request.getBody()
without params =? StorageRequestParams.fromJson(body), error: without params =? StorageRequestParams.fromJson(body), error:
return RestApiResponse.error(Http400, error.msg) return RestApiResponse.error(Http400, error.msg, headers = headers)
let nodes = params.nodes |? 1 let nodes = params.nodes |? 1
let tolerance = params.tolerance |? 0 let tolerance = params.tolerance |? 0
# prevent underflow # prevent underflow
if tolerance > nodes: if tolerance > nodes:
return RestApiResponse.error(Http400, "Invalid parameters: `tolerance` cannot be greater than `nodes`") return RestApiResponse.error(Http400, "Invalid parameters: `tolerance` cannot be greater than `nodes`", headers = headers)
let ecK = nodes - tolerance let ecK = nodes - tolerance
let ecM = tolerance # for readability let ecM = tolerance # for readability
# ensure leopard constrainst of 1 < K ≥ M # ensure leopard constrainst of 1 < K ≥ M
if ecK <= 1 or ecK < ecM: if ecK <= 1 or ecK < ecM:
return RestApiResponse.error(Http400, "Invalid parameters: parameters must satify `1 < (nodes - tolerance) ≥ tolerance`") return RestApiResponse.error(Http400, "Invalid parameters: parameters must satify `1 < (nodes - tolerance) ≥ tolerance`", headers = headers)
without expiry =? params.expiry: without expiry =? params.expiry:
return RestApiResponse.error(Http400, "Expiry required") return RestApiResponse.error(Http400, "Expiry required", headers = headers)
if expiry <= 0 or expiry >= params.duration: if expiry <= 0 or expiry >= params.duration:
return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration") return RestApiResponse.error(Http400, "Expiry needs value bigger then zero and smaller then the request's duration", headers = headers)
without purchaseId =? await node.requestStorage( without purchaseId =? await node.requestStorage(
cid, cid,
@ -477,14 +486,14 @@ proc initPurchasingApi(node: CodexNodeRef, router: var RestRouter) =
if error of InsufficientBlocksError: if error of InsufficientBlocksError:
return RestApiResponse.error(Http400, return RestApiResponse.error(Http400,
"Dataset too small for erasure parameters, need at least " & "Dataset too small for erasure parameters, need at least " &
$(ref InsufficientBlocksError)(error).minSize.int & " bytes") $(ref InsufficientBlocksError)(error).minSize.int & " bytes", headers = headers)
return RestApiResponse.error(Http500, error.msg) return RestApiResponse.error(Http500, error.msg, headers = headers)
return RestApiResponse.response(purchaseId.toHex) return RestApiResponse.response(purchaseId.toHex)
except CatchableError as exc: except CatchableError as exc:
trace "Excepting processing request", exc = exc.msg trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500, headers = headers)
router.api( router.api(
MethodGet, MethodGet,

View File

@ -29,7 +29,9 @@ type
BlockType* {.pure.} = enum BlockType* {.pure.} = enum
Manifest, Block, Both Manifest, Block, Both
CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises:[].}
BlockStore* = ref object of RootObj BlockStore* = ref object of RootObj
onBlockStored*: ?CidCallback
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
## Get a block from the blockstore ## Get a block from the blockstore

View File

@ -197,6 +197,9 @@ method putBlock*(
return success() return success()
discard self.putBlockSync(blk) discard self.putBlockSync(blk)
if onBlock =? self.onBlockStored:
await onBlock(blk.cid)
return success() return success()
method putCidAndProof*( method putCidAndProof*(
@ -282,7 +285,8 @@ proc new*(
cache: cache, cache: cache,
cidAndProofCache: cidAndProofCache, cidAndProofCache: cidAndProofCache,
currentSize: currentSize, currentSize: currentSize,
size: cacheSize) size: cacheSize,
onBlockStored: CidCallback.none)
for blk in blocks: for blk in blocks:
discard store.putBlockSync(blk) discard store.putBlockSync(blk)

View File

@ -189,6 +189,9 @@ method putBlock*(
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err) return failure(err)
if onBlock =? self.onBlockStored:
await onBlock(blk.cid)
else: else:
trace "Block already exists" trace "Block already exists"

View File

@ -11,6 +11,7 @@ import pkg/chronos
import pkg/datastore import pkg/datastore
import pkg/datastore/typedds import pkg/datastore/typedds
import pkg/libp2p/cid import pkg/libp2p/cid
import pkg/questionable
import ../blockstore import ../blockstore
import ../../clock import ../../clock
@ -103,5 +104,6 @@ func new*(
clock: clock, clock: clock,
postFixLen: postFixLen, postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes, quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl blockTtl: blockTtl,
onBlockStored: CidCallback.none
) )

View File

@ -32,6 +32,7 @@ asyncchecksuite "Block Advertising and Discovery":
peerStore: PeerCtxStore peerStore: PeerCtxStore
blockDiscovery: MockDiscovery blockDiscovery: MockDiscovery
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
wallet: WalletRef wallet: WalletRef
network: BlockExcNetwork network: BlockExcNetwork
localStore: CacheStore localStore: CacheStore
@ -68,11 +69,17 @@ asyncchecksuite "Block Advertising and Discovery":
pendingBlocks, pendingBlocks,
minPeersPerBlock = 1) minPeersPerBlock = 1)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -200,11 +207,17 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
pendingBlocks, pendingBlocks,
minPeersPerBlock = 1) minPeersPerBlock = 1)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
networkStore = NetworkStore.new(engine, localStore) networkStore = NetworkStore.new(engine, localStore)

View File

@ -74,30 +74,6 @@ asyncchecksuite "Test Discovery Engine":
await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should Advertise Haves":
var
localStore = CacheStore.new(blocks.mapIt(it))
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis)
haves = collect(initTable):
for cid in @[manifestBlock.cid, manifest.treeCid]:
{ cid: newFuture[void]() }
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if not haves[cid].finished:
haves[cid].complete
await discoveryEngine.start()
await allFuturesThrowing(
allFinished(toSeq(haves.values))).wait(5.seconds)
await discoveryEngine.stop()
test "Should queue discovery request": test "Should queue discovery request":
var var
localStore = CacheStore.new() localStore = CacheStore.new()
@ -191,36 +167,3 @@ asyncchecksuite "Test Discovery Engine":
reqs.complete() reqs.complete()
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should not request if there is already an inflight advertise request":
var
localStore = CacheStore.new()
discoveryEngine = DiscoveryEngine.new(
localStore,
peerStore,
network,
blockDiscovery,
pendingBlocks,
discoveryLoopSleep = 100.millis,
concurrentAdvReqs = 2)
reqs = newFuture[void]()
count = 0
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid
if count > 0:
check false
count.inc
await reqs # queue the request
await discoveryEngine.start()
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
await sleepAsync(200.millis)
reqs.complete()
await discoveryEngine.stop()

View File

@ -0,0 +1,106 @@
import std/sequtils
import std/random
import pkg/chronos
import pkg/libp2p/routing_record
import pkg/codexdht/discv5/protocol as discv5
import pkg/codex/blockexchange
import pkg/codex/stores
import pkg/codex/chunker
import pkg/codex/discovery
import pkg/codex/blocktype as bt
import pkg/codex/manifest
import ../../../asynctest
import ../../helpers
import ../../helpers/mockdiscovery
import ../../examples
asyncchecksuite "Advertiser":
var
blockDiscovery: MockDiscovery
localStore: BlockStore
advertiser: Advertiser
let
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 123.NBytes,
datasetSize = 234.NBytes)
manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
setup:
blockDiscovery = MockDiscovery.new()
localStore = CacheStore.new()
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
await advertiser.start()
teardown:
await advertiser.stop()
test "blockStored should queue manifest Cid for advertising":
(await localStore.putBlock(manifestBlk)).tryGet()
check:
manifestBlk.cid in advertiser.advertiseQueue
test "blockStored should queue tree Cid for advertising":
(await localStore.putBlock(manifestBlk)).tryGet()
check:
manifest.treeCid in advertiser.advertiseQueue
test "blockStored should not queue non-manifest non-tree CIDs for discovery":
let blk = bt.Block.example
(await localStore.putBlock(blk)).tryGet()
check:
blk.cid notin advertiser.advertiseQueue
test "Should not queue if there is already an inflight advertise request":
var
reqs = newFuture[void]()
manifestCount = 0
treeCount = 0
blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if cid == manifestBlk.cid:
inc manifestCount
if cid == manifest.treeCid:
inc treeCount
await reqs # queue the request
(await localStore.putBlock(manifestBlk)).tryGet()
(await localStore.putBlock(manifestBlk)).tryGet()
reqs.complete()
check eventually manifestCount == 1
check eventually treeCount == 1
test "Should advertise existing manifests and their trees":
let
newStore = CacheStore.new([manifestBlk])
await advertiser.stop()
advertiser = Advertiser.new(
newStore,
blockDiscovery
)
await advertiser.start()
check eventually manifestBlk.cid in advertiser.advertiseQueue
check eventually manifest.treeCid in advertiser.advertiseQueue
test "Stop should clear onBlockStored callback":
await advertiser.stop()
check:
localStore.onBlockStored.isNone()

View File

@ -78,11 +78,17 @@ asyncchecksuite "NetworkStore engine basic":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -113,11 +119,17 @@ asyncchecksuite "NetworkStore engine basic":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -139,6 +151,7 @@ asyncchecksuite "NetworkStore engine handlers":
network: BlockExcNetwork network: BlockExcNetwork
engine: BlockExcEngine engine: BlockExcEngine
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
peerCtx: BlockExcPeerCtx peerCtx: BlockExcPeerCtx
localStore: BlockStore localStore: BlockStore
blocks: seq[Block] blocks: seq[Block]
@ -176,11 +189,17 @@ asyncchecksuite "NetworkStore engine handlers":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
@ -390,51 +409,6 @@ asyncchecksuite "NetworkStore engine handlers":
discard await allFinished(pending) discard await allFinished(pending)
await allFuturesThrowing(cancellations.values().toSeq) await allFuturesThrowing(cancellations.values().toSeq)
test "resolveBlocks should queue manifest CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
manifest = Manifest.new(
treeCid = Cid.example,
blockSize = 123.NBytes,
datasetSize = 234.NBytes
)
let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
let blks = @[manifestBlk]
await engine.resolveBlocks(blks)
check:
manifestBlk.cid in engine.discovery.advertiseQueue
test "resolveBlocks should queue tree CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
tCid = Cid.example
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid))
await engine.resolveBlocks(@[delivery])
check:
tCid in engine.discovery.advertiseQueue
test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery":
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
let
blkCid = Cid.example
delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid))
await engine.resolveBlocks(@[delivery])
check:
blkCid notin engine.discovery.advertiseQueue
asyncchecksuite "Task Handler": asyncchecksuite "Task Handler":
var var
rng: Rng rng: Rng
@ -448,6 +422,7 @@ asyncchecksuite "Task Handler":
network: BlockExcNetwork network: BlockExcNetwork
engine: BlockExcEngine engine: BlockExcEngine
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
localStore: BlockStore localStore: BlockStore
peersCtx: seq[BlockExcPeerCtx] peersCtx: seq[BlockExcPeerCtx]
@ -481,11 +456,17 @@ asyncchecksuite "Task Handler":
blockDiscovery, blockDiscovery,
pendingBlocks) pendingBlocks)
advertiser = Advertiser.new(
localStore,
blockDiscovery
)
engine = BlockExcEngine.new( engine = BlockExcEngine.new(
localStore, localStore,
wallet, wallet,
network, network,
discovery, discovery,
advertiser,
peerStore, peerStore,
pendingBlocks) pendingBlocks)
peersCtx = @[] peersCtx = @[]

View File

@ -1,5 +1,6 @@
import ./engine/testengine import ./engine/testengine
import ./engine/testblockexc import ./engine/testblockexc
import ./engine/testpayments import ./engine/testpayments
import ./engine/testadvertiser
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}

View File

@ -420,16 +420,21 @@ method subscribeProofSubmission*(mock: MockMarket,
mock.subscriptions.onProofSubmitted.add(subscription) mock.subscriptions.onProofSubmitted.add(subscription)
return subscription return subscription
method queryPastStorageRequests*(market: MockMarket, method queryPastEvents*[T: MarketplaceEvent](
blocksAgo: int): market: MockMarket,
Future[seq[PastStorageRequest]] {.async.} = _: type T,
# MockMarket does not have the concept of blocks, so simply return all blocksAgo: int): Future[seq[T]] {.async.} =
# previous events
return market.requested.map(request => if T of StorageRequested:
PastStorageRequest(requestId: request.id, return market.requested.map(request =>
StorageRequested(requestId: request.id,
ask: request.ask, ask: request.ask,
expiry: request.expiry) expiry: request.expiry)
) )
elif T of SlotFilled:
return market.filled.map(slot =>
SlotFilled(requestId: slot.requestId, slotIndex: slot.slotIndex)
)
method unsubscribe*(subscription: RequestSubscription) {.async.} = method unsubscribe*(subscription: RequestSubscription) {.async.} =
subscription.market.subscriptions.onRequest.keepItIf(it != subscription) subscription.market.subscriptions.onRequest.keepItIf(it != subscription)

View File

@ -40,8 +40,9 @@ proc generateNodes*(
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt( it ))
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
advertiser = Advertiser.new(localStore, discovery)
blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, advertiser, peerStore, pendingBlocks)
networkStore = NetworkStore.new(engine, localStore) networkStore = NetworkStore.new(engine, localStore)
switch.mount(network) switch.mount(network)

View File

@ -82,6 +82,7 @@ template setupAndTearDown*() {.dirty.} =
peerStore: PeerCtxStore peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser
taskpool: Taskpool taskpool: Taskpool
let let
@ -109,7 +110,8 @@ template setupAndTearDown*() {.dirty.} =
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks) advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore) store = NetworkStore.new(engine, localStore)
taskpool = Taskpool.new(num_threads = countProcessors()) taskpool = Taskpool.new(num_threads = countProcessors())
node = CodexNodeRef.new( node = CodexNodeRef.new(
@ -120,8 +122,6 @@ template setupAndTearDown*() {.dirty.} =
discovery = blockDiscovery, discovery = blockDiscovery,
taskpool = taskpool) taskpool = taskpool)
await node.start()
teardown: teardown:
close(file) close(file)
await node.stop() await node.stop()

View File

@ -49,6 +49,9 @@ privateAccess(CodexNodeRef) # enable access to private fields
asyncchecksuite "Test Node - Basic": asyncchecksuite "Test Node - Basic":
setupAndTearDown() setupAndTearDown()
setup:
await node.start()
test "Fetch Manifest": test "Fetch Manifest":
let let
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)

View File

@ -15,6 +15,7 @@ import pkg/codex/utils
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../examples
type type
StoreProvider* = proc(): BlockStore {.gcsafe.} StoreProvider* = proc(): BlockStore {.gcsafe.}
@ -56,6 +57,16 @@ proc commonBlockStoreTests*(name: string,
(await store.putBlock(newBlock1)).tryGet() (await store.putBlock(newBlock1)).tryGet()
check (await store.hasBlock(newBlock1.cid)).tryGet() check (await store.hasBlock(newBlock1.cid)).tryGet()
test "putBlock raises onBlockStored":
var storedCid = Cid.example
proc onStored(cid: Cid) {.async.} =
storedCid = cid
store.onBlockStored = onStored.some()
(await store.putBlock(newBlock1)).tryGet()
check storedCid == newBlock1.cid
test "getBlock": test "getBlock":
(await store.putBlock(newBlock)).tryGet() (await store.putBlock(newBlock)).tryGet()
let blk = await store.getBlock(newBlock.cid) let blk = await store.getBlock(newBlock.cid)

View File

@ -324,7 +324,7 @@ ethersuite "On-Chain Market":
let slotId = request.slotId(slotIndex) let slotId = request.slotId(slotIndex)
check (await market.slotState(slotId)) == SlotState.Filled check (await market.slotState(slotId)) == SlotState.Filled
test "can query past events": test "can query past StorageRequested events":
var request1 = StorageRequest.example var request1 = StorageRequest.example
var request2 = StorageRequest.example var request2 = StorageRequest.example
request1.client = accounts[0] request1.client = accounts[0]
@ -335,21 +335,38 @@ ethersuite "On-Chain Market":
# `market.requestStorage` executes an `approve` tx before the # `market.requestStorage` executes an `approve` tx before the
# `requestStorage` tx, so that's two PoA blocks per `requestStorage` call (6 # `requestStorage` tx, so that's two PoA blocks per `requestStorage` call (6
# blocks for 3 calls). `fromBlock` and `toBlock` are inclusive, so to check # blocks for 3 calls). We don't need to check the `approve` for the first
# 6 blocks, we only need to check 5 "blocks ago". We don't need to check the # `requestStorage` call, so we only need to check 5 "blocks ago". "blocks
# `approve` for the first `requestStorage` call, so that's 1 less again = 4 # ago".
# "blocks ago".
proc getsPastRequest(): Future[bool] {.async.} = proc getsPastRequest(): Future[bool] {.async.} =
let reqs = await market.queryPastStorageRequests(5) let reqs = await market.queryPastEvents(StorageRequested, 5)
reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id] reqs.mapIt(it.requestId) == @[request.id, request1.id, request2.id]
check eventually await getsPastRequest() check eventually await getsPastRequest()
test "can query past SlotFilled events":
await market.requestStorage(request)
await market.fillSlot(request.id, 0.u256, proof, request.ask.collateral)
await market.fillSlot(request.id, 1.u256, proof, request.ask.collateral)
await market.fillSlot(request.id, 2.u256, proof, request.ask.collateral)
let slotId = request.slotId(slotIndex)
# `market.fill` executes an `approve` tx before the `fillSlot` tx, so that's
# two PoA blocks per `fillSlot` call (6 blocks for 3 calls). We don't need
# to check the `approve` for the first `fillSlot` call, so we only need to
# check 5 "blocks ago".
let events = await market.queryPastEvents(SlotFilled, 5)
check events == @[
SlotFilled(requestId: request.id, slotIndex: 0.u256),
SlotFilled(requestId: request.id, slotIndex: 1.u256),
SlotFilled(requestId: request.id, slotIndex: 2.u256),
]
test "past event query can specify negative `blocksAgo` parameter": test "past event query can specify negative `blocksAgo` parameter":
await market.requestStorage(request) await market.requestStorage(request)
check eventually ( check eventually (
(await market.queryPastStorageRequests(blocksAgo = -2)) == (await market.queryPastEvents(StorageRequested, blocksAgo = -2)) ==
(await market.queryPastStorageRequests(blocksAgo = 2)) (await market.queryPastEvents(StorageRequested, blocksAgo = 2))
) )