Mirror of https://github.com/logos-storage/logos-storage-nim.git (synced 2026-01-03 22:13:12 +00:00)

Merge branch 'master' into feat/persistent-availabilities

commit f9e5342637
.github/actions/nimbus-build-system/action.yml (vendored, 33 changes)
@@ -97,6 +97,33 @@ runs:
        # Set GCC-14 as the default
        sudo update-alternatives --set gcc /usr/bin/gcc-14

+   - name: Install ccache on Linux/Mac
+     if: inputs.os == 'linux' || inputs.os == 'macos'
+     uses: hendrikmuhs/ccache-action@v1.2
+     with:
+       create-symlink: true
+       key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }}
+       evict-old-files: 7d
+
+   - name: Install ccache on Windows
+     if: inputs.os == 'windows'
+     uses: hendrikmuhs/ccache-action@v1.2
+     with:
+       key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }}
+       evict-old-files: 7d
+
+   - name: Enable ccache on Windows
+     if: inputs.os == 'windows'
+     shell: ${{ inputs.shell }} {0}
+     run: |
+       CCACHE_DIR=$(dirname $(which ccache))/ccached
+       mkdir ${CCACHE_DIR}
+       ln -s $(which ccache) ${CCACHE_DIR}/gcc.exe
+       ln -s $(which ccache) ${CCACHE_DIR}/g++.exe
+       ln -s $(which ccache) ${CCACHE_DIR}/cc.exe
+       ln -s $(which ccache) ${CCACHE_DIR}/c++.exe
+       echo "export PATH=${CCACHE_DIR}:\$PATH" >> $HOME/.bash_profile # prefix path in MSYS2
+
    - name: Derive environment variables
      shell: ${{ inputs.shell }} {0}
      run: |

@@ -154,8 +181,11 @@ runs:
        llvm_bin_dir="${llvm_dir}/bin"
        llvm_lib_dir="${llvm_dir}/lib"
        echo "${llvm_bin_dir}" >> ${GITHUB_PATH}
+       # Make sure ccache has precedence (GITHUB_PATH is appending before)
+       echo "$(brew --prefix)/opt/ccache/libexec" >> ${GITHUB_PATH}
+       echo $PATH
        echo "LDFLAGS=${LDFLAGS} -L${libomp_lib_dir} -L${llvm_lib_dir} -Wl,-rpath,${llvm_lib_dir}" >> ${GITHUB_ENV}
-       NIMFLAGS="${NIMFLAGS} $(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=${llvm_bin_dir}/clang -DCMAKE_CXX_COMPILER=${llvm_bin_dir}/clang++' -d:LeopardExtraCompilerlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")"
+       NIMFLAGS="${NIMFLAGS} $(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release' -d:LeopardExtraCompilerFlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")"
        echo "NIMFLAGS=${NIMFLAGS}" >> $GITHUB_ENV
      fi

@@ -191,6 +221,7 @@ runs:
    - name: Build Nim and Codex dependencies
      shell: ${{ inputs.shell }} {0}
      run: |
        which gcc
        gcc --version
        make -j${ncpu} CI_CACHE=NimBinaries ${ARCH_OVERRIDE} QUICK_AND_DIRTY_COMPILER=1 update
        echo
.github/workflows/docker-reusable.yml (vendored, 4 changes)
@@ -94,11 +94,11 @@ jobs:
          - target:
              os: linux
              arch: amd64
-           builder: ubuntu-22.04
+           builder: ubuntu-24.04
          - target:
              os: linux
              arch: arm64
-           builder: ubuntu-22.04-arm
+           builder: ubuntu-24.04-arm

      name: Build ${{ matrix.target.os }}/${{ matrix.target.arch }}
      runs-on: ${{ matrix.builder }}
.github/workflows/docs.yml (vendored, 16 changes)
@@ -2,17 +2,17 @@ name: OpenAPI

on:
  push:
    branches:
      - 'master'
    tags:
      - "v*.*.*"
    paths:
-     - 'openapi.yaml'
-     - '.github/workflows/docs.yml'
+     - "openapi.yaml"
+     - ".github/workflows/docs.yml"
  pull_request:
    branches:
-     - '**'
+     - "**"
    paths:
-     - 'openapi.yaml'
-     - '.github/workflows/docs.yml'
+     - "openapi.yaml"
+     - ".github/workflows/docs.yml"

# Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages
permissions:

@@ -40,7 +40,7 @@ jobs:
  deploy:
    name: Deploy
    runs-on: ubuntu-latest
-   if: github.ref == 'refs/heads/master'
+   if: startsWith(github.ref, 'refs/tags/')
    steps:
      - name: Checkout
        uses: actions/checkout@v4
build.nims (57 changes)
@@ -2,25 +2,8 @@ mode = ScriptMode.Verbose

import std/os except commandLineParams

-const VendorPath = "vendor/nim-nat-traversal/vendor/libnatpmp-upstream"
-let
-  oldVersionFile = joinPath(VendorPath, "VERSION")
-  newVersionFile = joinPath(VendorPath, "VERSION_temp")
-
-proc renameFile(oldName, newName: string) =
-  if fileExists(oldName):
-    mvFile(oldName, newName)
-  else:
-    echo "File ", oldName, " does not exist"
-
### Helper functions
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
-  # This is a quick workaround to avoid VERSION file conflict on macOS
-  # More details here: https://github.com/codex-storage/nim-codex/issues/1059
-  if defined(macosx):
-    renameFile(oldVersionFile, newVersionFile)
-
  if not dirExists "build":
    mkDir "build"

@@ -30,25 +13,25 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
    for param in commandLineParams():
      extra_params &= " " & param
  else:
-    for i in 2..<paramCount():
+    for i in 2 ..< paramCount():
      extra_params &= " " & paramStr(i)

  let
    # Place build output in 'build' folder, even if name includes a longer path.
    outName = os.lastPathPart(name)
-    cmd = "nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir & name & ".nim"
-  try:
-    exec(cmd)
-  finally:
-    if defined(macosx):
-      renameFile(newVersionFile, oldVersionFile)
+    cmd =
+      "nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir &
+      name & ".nim"
+
+  exec(cmd)

proc test(name: string, srcDir = "tests/", params = "", lang = "c") =
  buildBinary name, srcDir, params
  exec "build/" & name

task codex, "build codex binary":
-  buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"
+  buildBinary "codex",
+    params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"

task toolsCirdl, "build tools/cirdl binary":
  buildBinary "tools/cirdl/cirdl"

@@ -60,7 +43,9 @@ task testContracts, "Build & run Codex Contract tests":
  test "testContracts"

task testIntegration, "Run integration tests":
-  buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
+  buildBinary "codex",
+    params =
+      "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
  test "testIntegration"
  # use params to enable logging from the integration test executable
  # test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &

@@ -109,15 +94,25 @@ task coverage, "generates code coverage report":

  var nimSrcs = " "
  for f in walkDirRec("codex", {pcFile}):
-    if f.endswith(".nim"): nimSrcs.add " " & f.absolutePath.quoteShell()
+    if f.endswith(".nim"):
+      nimSrcs.add " " & f.absolutePath.quoteShell()

  echo "======== Running Tests ======== "
-  test "coverage", srcDir = "tests/", params = " --nimcache:nimcache/coverage -d:release -d:codex_enable_proof_failures=true"
+  test "coverage",
+    srcDir = "tests/",
+    params =
+      " --nimcache:nimcache/coverage -d:release -d:codex_enable_proof_failures=true"
  exec("rm nimcache/coverage/*.c")
-  rmDir("coverage"); mkDir("coverage")
+  rmDir("coverage")
+  mkDir("coverage")
  echo " ======== Running LCOV ======== "
-  exec("lcov --capture --directory nimcache/coverage --output-file coverage/coverage.info")
-  exec("lcov --extract coverage/coverage.info --output-file coverage/coverage.f.info " & nimSrcs)
+  exec(
+    "lcov --capture --directory nimcache/coverage --output-file coverage/coverage.info"
+  )
+  exec(
+    "lcov --extract coverage/coverage.info --output-file coverage/coverage.f.info " &
+    nimSrcs
+  )
  echo " ======== Generating HTML coverage report ======== "
  exec("genhtml coverage/coverage.f.info --output-directory coverage/report ")
  echo " ======== Coverage report Done ======== "
@@ -398,7 +398,8 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy
        have = await e.address in b.localStore
        price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)

-      if e.wantType == WantType.WantHave:
+      case e.wantType
+      of WantType.WantHave:
        if have:
          presence.add(
            BlockPresence(
@@ -415,17 +416,19 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy
          peerCtx.peerWants.add(e)

        codex_block_exchange_want_have_lists_received.inc()
-      elif e.wantType == WantType.WantBlock:
+      of WantType.WantBlock:
        peerCtx.peerWants.add(e)
        schedulePeer = true
        codex_block_exchange_want_block_lists_received.inc()
    else: # Updating existing entry in peer wants
      # peer doesn't want this block anymore
      if e.cancel:
        trace "Canceling want for block", address = e.address
        peerCtx.peerWants.del(idx)
      else:
        # peer might want to ask for the same cid with
        # different want params
        trace "Updating want for block", address = e.address
        peerCtx.peerWants[idx] = e # update entry

  if presence.len > 0:
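
Aside: the move from an if/elif chain to case above means the compiler now enforces that every WantType variant is handled. A standalone sketch of the same pattern, with a simplified WantType standing in for the real blockexchange enum:

type WantType = enum
  WantHave
  WantBlock

proc describe(wantType: WantType): string =
  # With `case`, adding a new enum variant later becomes a compile
  # error here until it is handled, instead of silently falling through.
  case wantType
  of WantType.WantHave: "answer with a block-presence message"
  of WantType.WantBlock: "queue the block for delivery"

when isMainModule:
  echo describe(WantType.WantHave)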
@@ -11,8 +11,10 @@ import std/sequtils
import std/strutils
import std/os
import std/tables
+import std/cpuinfo

import pkg/chronos
+import pkg/taskpools
import pkg/presto
import pkg/libp2p
import pkg/confutils

@@ -107,7 +109,9 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} =
    quit QuitFailure

  let marketplace = Marketplace.new(marketplaceAddress, signer)
-  let market = OnChainMarket.new(marketplace, config.rewardRecipient)
+  let market = OnChainMarket.new(
+    marketplace, config.rewardRecipient, config.marketplaceRequestCacheSize
+  )
  let clock = OnChainClock.new(provider)

  var client: ?ClientInteractions

@@ -194,7 +198,18 @@ proc new*(
    .withTcpTransport({ServerFlags.ReuseAddr})
    .build()

-  var cache: CacheStore = nil
+  var
+    cache: CacheStore = nil
+    taskpool: Taskpool
+
+  try:
+    if config.numThreads == ThreadCount(0):
+      taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
+    else:
+      taskpool = Taskpool.new(numThreads = int(config.numThreads))
+    info "Threadpool started", numThreads = taskpool.numThreads
+  except CatchableError as exc:
+    raiseAssert("Failure in taskpool initialization:" & exc.msg)

  if config.cacheSize > 0'nb:
    cache = CacheStore.new(cacheSize = config.cacheSize)
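
For reference, a minimal sketch of the thread-count policy introduced above: 0 means auto-detect, clamped to 16 threads, and any explicit value is used verbatim. countProcessors comes from std/cpuinfo; only the helper name newWorkerPool is invented here:

import std/cpuinfo
import pkg/taskpools

proc newWorkerPool(configured: int): Taskpool =
  # configured == 0: size the pool from the CPU count, capped at 16
  if configured == 0:
    Taskpool.new(numThreads = min(countProcessors(), 16))
  else:
    Taskpool.new(numThreads = configured)

when isMainModule:
  let tp = newWorkerPool(0)
  echo "threads: ", tp.numThreads
  tp.shutdown()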
@@ -286,6 +301,7 @@ proc new*(
      engine = engine,
      discovery = discovery,
      prover = prover,
+      taskPool = taskpool,
    )

    restServer = RestServerRef
@@ -44,6 +44,7 @@ import ./utils
import ./nat
import ./utils/natutils

+from ./contracts/config import DefaultRequestCacheSize
from ./validationconfig import MaxSlots, ValidationGroups

export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig
@@ -51,7 +52,11 @@ export ValidationGroups, MaxSlots

export
  DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval,
-  DefaultNumberOfBlocksToMaintainPerInterval
+  DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize
+
+type ThreadCount* = distinct Natural
+
+proc `==`*(a, b: ThreadCount): bool {.borrow.}

proc defaultDataDir*(): string =
  let dataDir =
@@ -71,6 +76,7 @@ const

  DefaultDataDir* = defaultDataDir()
  DefaultCircuitDir* = defaultDataDir() / "circuits"
+  DefaultThreadCount* = ThreadCount(0)

type
  StartUpCmd* {.pure.} = enum
@@ -184,6 +190,13 @@ type
      name: "max-peers"
    .}: int

+    numThreads* {.
+      desc:
+        "Number of worker threads (\"0\" = use as many threads as there are CPU cores available)",
+      defaultValue: DefaultThreadCount,
+      name: "num-threads"
+    .}: ThreadCount
+
    agentString* {.
      defaultValue: "Codex",
      desc: "Node agent string which is used as identifier in network",
@@ -347,6 +360,16 @@ type
      name: "reward-recipient"
    .}: Option[EthAddress]

+    marketplaceRequestCacheSize* {.
+      desc:
+        "Maximum number of StorageRequests kept in memory." &
+        "Reduces fetching of StorageRequest data from the contract.",
+      defaultValue: DefaultRequestCacheSize,
+      defaultValueDesc: $DefaultRequestCacheSize,
+      name: "request-cache-size",
+      hidden
+    .}: uint16
+
  case persistenceCmd* {.defaultValue: noCmd, command.}: PersistenceCmd
  of PersistenceCmd.prover:
    circuitDir* {.
@@ -482,6 +505,13 @@ proc parseCmdArg*(
    quit QuitFailure
  ma

+proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} =
+  let count = parseInt(input)
+  if count != 0 and count < 2:
+    warn "Invalid number of threads", input = input
+    quit QuitFailure
+  ThreadCount(count)
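
The same validation rule, shown outside the confutils plumbing (parseThreadCount is a hypothetical stand-in for the parseCmdArg overload above): 0 selects auto-detection, and 1 is rejected because the pool needs at least one worker thread besides the thread waiting on its completion signal.

import std/strutils

type ThreadCount = distinct Natural

proc parseThreadCount(input: string): ThreadCount =
  let count = parseInt(input)
  # 0 = auto; 1 is rejected, 2 and above are taken literally
  if count != 0 and count < 2:
    raise newException(ValueError, "invalid number of threads: " & input)
  ThreadCount(count)

when isMainModule:
  echo int(parseThreadCount("0"))  # auto
  echo int(parseThreadCount("8"))  # explicit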
proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
  var res: SignedPeerRecord
  try:
@@ -579,6 +609,15 @@ proc readValue*(
    quit QuitFailure
  val = NBytes(value)

+proc readValue*(
+    r: var TomlReader, val: var ThreadCount
+) {.upraises: [SerializationError, IOError].} =
+  var str = r.readValue(string)
+  try:
+    val = parseCmdArg(ThreadCount, str)
+  except CatchableError as err:
+    raise newException(SerializationError, err.msg)
+
proc readValue*(
    r: var TomlReader, val: var Duration
) {.upraises: [SerializationError, IOError].} =
@@ -609,6 +648,9 @@ proc completeCmdArg*(T: type NBytes, val: string): seq[string] =
proc completeCmdArg*(T: type Duration, val: string): seq[string] =
  discard

+proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] =
+  discard
+
# silly chronicles, colors is a compile-time property
proc stripAnsi*(v: string): string =
  var
@@ -4,6 +4,8 @@ import pkg/questionable/results

export contractabi

+const DefaultRequestCacheSize* = 128.uint16
+
type
  MarketplaceConfig* = object
    collateral*: CollateralConfig
@@ -2,6 +2,7 @@ import std/strutils
import pkg/ethers
import pkg/upraises
import pkg/questionable
+import pkg/lrucache
import ../utils/exceptions
import ../logutils
import ../market
@@ -20,6 +21,7 @@ type
    signer: Signer
    rewardRecipient: ?Address
    configuration: ?MarketplaceConfig
+    requestCache: LruCache[string, StorageRequest]

  MarketSubscription = market.Subscription
  EventSubscription = ethers.Subscription
@@ -27,12 +29,22 @@ type
    eventSubscription: EventSubscription

func new*(
-    _: type OnChainMarket, contract: Marketplace, rewardRecipient = Address.none
+    _: type OnChainMarket,
+    contract: Marketplace,
+    rewardRecipient = Address.none,
+    requestCacheSize: uint16 = DefaultRequestCacheSize,
): OnChainMarket =
  without signer =? contract.signer:
    raiseAssert("Marketplace contract should have a signer")

-  OnChainMarket(contract: contract, signer: signer, rewardRecipient: rewardRecipient)
+  var requestCache = newLruCache[string, StorageRequest](int(requestCacheSize))
+
+  OnChainMarket(
+    contract: contract,
+    signer: signer,
+    rewardRecipient: rewardRecipient,
+    requestCache: requestCache,
+  )

proc raiseMarketError(message: string) {.raises: [MarketError].} =
  raise newException(MarketError, message)
@@ -112,9 +124,16 @@ method requestStorage(market: OnChainMarket, request: StorageRequest) {.async.}
method getRequest*(
    market: OnChainMarket, id: RequestId
): Future[?StorageRequest] {.async.} =
+  let key = $id
+
+  if market.requestCache.contains(key):
+    return some market.requestCache[key]
+
  convertEthersError:
    try:
-      return some await market.contract.getRequest(id)
+      let request = await market.contract.getRequest(id)
+      market.requestCache[key] = request
+      return some request
    except Marketplace_UnknownRequest:
      return none StorageRequest
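
getRequest above is a textbook cache-aside lookup. A minimal sketch with the same lrucache package; fetchFromContract is a placeholder for market.contract.getRequest:

import pkg/lrucache

type StorageRequest = object
  id: string

proc fetchFromContract(id: string): StorageRequest =
  StorageRequest(id: id) # stand-in for the on-chain call

var requestCache = newLruCache[string, StorageRequest](128)

proc getRequest(id: string): StorageRequest =
  if id in requestCache:
    return requestCache[id]      # hit: no contract round-trip
  result = fetchFromContract(id) # miss: ask the contract
  requestCache[id] = result      # populate on the way out

when isMainModule:
  discard getRequest("0xabc") # miss: fetches and caches
  discard getRequest("0xabc") # hit: served from memory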
@@ -14,7 +14,7 @@ proc raiseProviderError(message: string) {.raises: [ProviderError].} =

proc blockNumberAndTimestamp*(
    provider: Provider, blockTag: BlockTag
-): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} =
+): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} =
  without latestBlock =? await provider.getBlock(blockTag):
    raiseProviderError("Could not get latest block")

@@ -25,7 +25,7 @@ proc blockNumberAndTimestamp*(

proc binarySearchFindClosestBlock(
    provider: Provider, epochTime: int, low: UInt256, high: UInt256
-): Future[UInt256] {.async: (raises: [ProviderError]).} =
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low))
  let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high))
  if abs(lowTimestamp.truncate(int) - epochTime) <
@@ -39,7 +39,7 @@ proc binarySearchBlockNumberForEpoch(
    epochTime: UInt256,
    latestBlockNumber: UInt256,
    earliestBlockNumber: UInt256,
-): Future[UInt256] {.async: (raises: [ProviderError]).} =
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  var low = earliestBlockNumber
  var high = latestBlockNumber

@@ -65,7 +65,7 @@ proc binarySearchBlockNumberForEpoch(

proc blockNumberForEpoch*(
    provider: Provider, epochTime: SecondsSince1970
-): Future[UInt256] {.async: (raises: [ProviderError]).} =
+): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  let epochTimeUInt256 = epochTime.u256
  let (latestBlockNumber, latestBlockTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.latest)
@@ -118,6 +118,6 @@ proc blockNumberForEpoch*(

proc pastBlockTag*(
    provider: Provider, blocksAgo: int
-): Future[BlockTag] {.async: (raises: [ProviderError]).} =
+): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} =
  let head = await provider.getBlockNumber()
  return BlockTag.init(head - blocksAgo.abs.u256)
@@ -6,8 +6,10 @@ import pkg/nimcrypto
import pkg/ethers/fields
import pkg/questionable/results
import pkg/stew/byteutils
+import pkg/libp2p/[cid, multicodec]
import ../logutils
import ../utils/json
+from ../errors import mapFailure

export contractabi

@@ -29,7 +31,7 @@ type
    maxSlotLoss* {.serialize.}: uint64

  StorageContent* = object
-    cid* {.serialize.}: string
+    cid* {.serialize.}: Cid
    merkleRoot*: array[32, byte]

  Slot* = object
@@ -120,6 +122,9 @@ func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk =
func fromTuple(_: type StorageContent, tupl: tuple): StorageContent =
  StorageContent(cid: tupl[0], merkleRoot: tupl[1])

+func solidityType*(_: type Cid): string =
+  solidityType(seq[byte])
+
func solidityType*(_: type StorageContent): string =
  solidityType(StorageContent.fieldTypes)

@@ -129,6 +134,10 @@ func solidityType*(_: type StorageAsk): string =
func solidityType*(_: type StorageRequest): string =
  solidityType(StorageRequest.fieldTypes)

+# Note: it seems to be ok to ignore the vbuffer offset for now
+func encode*(encoder: var AbiEncoder, cid: Cid) =
+  encoder.write(cid.data.buffer)
+
func encode*(encoder: var AbiEncoder, content: StorageContent) =
  encoder.write(content.fieldValues)

@@ -141,8 +150,12 @@ func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) =
func encode*(encoder: var AbiEncoder, request: StorageRequest) =
  encoder.write(request.fieldValues)

-func encode*(encoder: var AbiEncoder, request: Slot) =
-  encoder.write(request.fieldValues)
+func encode*(encoder: var AbiEncoder, slot: Slot) =
+  encoder.write(slot.fieldValues)

+func decode*(decoder: var AbiDecoder, T: type Cid): ?!T =
+  let data = ?decoder.read(seq[byte])
+  Cid.init(data).mapFailure
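
With the two overloads above in scope, a Cid round-trips through nim-contract-abi like any built-in value: it travels as its raw byte buffer and is re-parsed with Cid.init on the way back. A hedged sketch using the library's documented encode/decode entry points (the flow here is illustrative, not lifted from the diff):

import pkg/questionable
import pkg/questionable/results
import pkg/contractabi
import pkg/libp2p/cid

proc roundTrips(cid: Cid): bool =
  let bytes = AbiEncoder.encode(cid)   # goes through encode(var AbiEncoder, Cid)
  without decoded =? AbiDecoder.decode(bytes, Cid), err:
    echo "decode failed: ", err.msg    # goes through decode(var AbiDecoder, Cid)
    return false
  decoded == cid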
func decode*(decoder: var AbiDecoder, T: type StorageContent): ?!T =
  let tupl = ?decoder.read(StorageContent.fieldTypes)
@@ -29,14 +29,18 @@ method release*(self: ErasureBackend) {.base, gcsafe.} =
  raiseAssert("not implemented!")

method encode*(
-    self: EncoderBackend, buffers, parity: var openArray[seq[byte]]
+    self: EncoderBackend,
+    buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen: int,
): Result[void, cstring] {.base, gcsafe.} =
  ## encode buffers using a backend
  ##
  raiseAssert("not implemented!")

method decode*(
-    self: DecoderBackend, buffers, parity, recovered: var openArray[seq[byte]]
+    self: DecoderBackend,
+    buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen, recoveredLen: int,
): Result[void, cstring] {.base, gcsafe.} =
  ## decode buffers using a backend
  ##
@@ -22,11 +22,13 @@ type
    decoder*: Option[LeoDecoder]

method encode*(
-    self: LeoEncoderBackend, data, parity: var openArray[seq[byte]]
+    self: LeoEncoderBackend,
+    data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen: int,
): Result[void, cstring] =
  ## Encode data using Leopard backend

-  if parity.len == 0:
+  if parityLen == 0:
    return ok()

  var encoder =
@@ -36,10 +38,12 @@ method encode*(
    else:
      self.encoder.get()

-  encoder.encode(data, parity)
+  encoder.encode(data, parity, dataLen, parityLen)

method decode*(
-    self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]]
+    self: LeoDecoderBackend,
+    data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen, recoveredLen: int,
): Result[void, cstring] =
  ## Decode data using given Leopard backend

@@ -50,7 +54,7 @@ method decode*(
    else:
      self.decoder.get()

-  decoder.decode(data, parity, recovered)
+  decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen)

method release*(self: LeoEncoderBackend) =
  if self.encoder.isSome:
@@ -12,12 +12,14 @@ import pkg/upraises
push:
  {.upraises: [].}

-import std/sequtils
-import std/sugar
+import std/[sugar, atomics, sequtils]

import pkg/chronos
+import pkg/chronos/threadsync
import pkg/chronicles
import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf
+import pkg/taskpools

import ../logutils
import ../manifest
@@ -28,6 +30,7 @@ import ../utils
import ../utils/asynciter
import ../indexingstrategy
import ../errors
+import ../utils/arrayutils

import pkg/stew/byteutils

@@ -68,6 +71,7 @@ type
    proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.}

  Erasure* = ref object
+    taskPool: Taskpool
    encoderProvider*: EncoderProvider
    decoderProvider*: DecoderProvider
    store*: BlockStore
@@ -87,6 +91,24 @@ type
    # provided.
    minSize*: NBytes

+  EncodeTask = object
+    success: Atomic[bool]
+    erasure: ptr Erasure
+    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    blockSize, blocksLen, parityLen: int
+    signal: ThreadSignalPtr
+
+  DecodeTask = object
+    success: Atomic[bool]
+    erasure: ptr Erasure
+    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    blockSize, blocksLen: int
+    parityLen, recoveredLen: int
+    signal: ThreadSignalPtr
+
func indexToPos(steps, idx, step: int): int {.inline.} =
  ## Convert an index to a position in the encoded
  ## dataset
@@ -269,6 +291,81 @@ proc init*(
    strategy: strategy,
  )

+proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
+  # Task suitable for running in taskpools - look, no GC!
+  let encoder =
+    task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  defer:
+    encoder.release()
+    discard task[].signal.fireSync()
+
+  if (
+    let res =
+      encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen)
+    res.isErr
+  ):
+    warn "Error from leopard encoder backend!", error = $res.error
+
+    task[].success.store(false)
+  else:
+    task[].success.store(true)
+
+proc encodeAsync*(
+    self: Erasure,
+    blockSize, blocksLen, parityLen: int,
+    data: ref seq[seq[byte]],
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+): Future[?!void] {.async: (raises: [CancelledError]).} =
+  without threadPtr =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+
+  defer:
+    threadPtr.close().expect("closing once works")
+
+  var blockData = createDoubleArray(blocksLen, blockSize)
+
+  for i in 0 ..< data[].len:
+    copyMem(blockData[i], addr data[i][0], blockSize)
+
+  defer:
+    freeDoubleArray(blockData, blocksLen)
+
+  ## Create an encode task with block data
+  var task = EncodeTask(
+    erasure: addr self,
+    blockSize: blockSize,
+    blocksLen: blocksLen,
+    parityLen: parityLen,
+    blocks: blockData,
+    parity: parity,
+    signal: threadPtr,
+  )
+
+  let t = addr task
+
+  doAssert self.taskPool.numThreads > 1,
+    "Must have at least one separate thread or signal will never be fired"
+  self.taskPool.spawn leopardEncodeTask(self.taskPool, t)
+  let threadFut = threadPtr.wait()
+
+  try:
+    await threadFut.join()
+  except CatchableError as exc:
+    try:
+      await threadFut
+    except AsyncError as asyncExc:
+      return failure(asyncExc.msg)
+    finally:
+      if exc of CancelledError:
+        raise (ref CancelledError) exc
+      else:
+        return failure(exc.msg)
+
+  if not t.success.load():
+    return failure("Leopard encoding failed")
+
+  success()
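
encodeAsync and decodeAsync share one offloading pattern: allocate shared buffers, spawn a GC-free task on the taskpool, and suspend on a ThreadSignalPtr until the worker fires it. A reduced sketch with a toy payload in place of the Leopard call (ToyTask, work, and runOffloaded are illustrative names):

import pkg/chronos
import pkg/chronos/threadsync
import pkg/taskpools

type ToyTask = object
  input, output: int
  signal: ThreadSignalPtr

proc work(task: ptr ToyTask) {.gcsafe.} =
  task[].output = task[].input * 2   # stand-in for the Leopard call
  discard task[].signal.fireSync()   # wake the awaiting async proc

proc runOffloaded(tp: Taskpool, x: int): Future[int] {.async.} =
  let signal = ThreadSignalPtr.new().expect("signal allocation works")
  defer:
    signal.close().expect("closing once works")
  var task = ToyTask(input: x, signal: signal)
  # As in the code above: the pool needs a second thread,
  # or the signal is never fired.
  tp.spawn work(addr task)
  await signal.wait()
  return task.output

when isMainModule:
  let tp = Taskpool.new(numThreads = 2)
  echo waitFor runOffloaded(tp, 21)  # -> 42
  tp.shutdown()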
proc encodeData(
    self: Erasure, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
@@ -276,7 +373,6 @@ proc encodeData(
  ##
  ## `manifest` - the manifest to encode
  ##
  logScope:
    steps = params.steps
    rounded_blocks = params.rounded
@@ -286,7 +382,6 @@

  var
    cids = seq[Cid].new()
-    encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
    emptyBlock = newSeq[byte](manifest.blockSize.int)

  cids[].setLen(params.blocksCount)
@@ -296,8 +391,7 @@
    # TODO: Don't allocate a new seq every time, allocate once and zero out
    var
      data = seq[seq[byte]].new() # number of blocks to encode
-      parityData =
-        newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
+      parity = createDoubleArray(params.ecM, manifest.blockSize.int)

    data[].setLen(params.ecK)
    # TODO: this is a tight blocking loop so we sleep here to allow
@@ -311,15 +405,25 @@
      trace "Unable to prepare data", error = err.msg
      return failure(err)

-    trace "Erasure coding data", data = data[].len, parity = parityData.len
+    trace "Erasure coding data", data = data[].len

-    if (let res = encoder.encode(data[], parityData); res.isErr):
-      trace "Unable to encode manifest!", error = $res.error
-      return failure($res.error)
+    try:
+      if err =? (
+        await self.encodeAsync(
+          manifest.blockSize.int, params.ecK, params.ecM, data, parity
+        )
+      ).errorOption:
+        return failure(err)
+    except CancelledError as exc:
+      raise exc
+    finally:
+      freeDoubleArray(parity, params.ecM)

    var idx = params.rounded + step
    for j in 0 ..< params.ecM:
-      without blk =? bt.Block.new(parityData[j]), error:
+      var innerPtr: ptr UncheckedArray[byte] = parity[][j]
+      without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)),
+        error:
        trace "Unable to create parity block", err = error.msg
        return failure(error)

@@ -356,8 +460,6 @@
  except CatchableError as exc:
    trace "Erasure coding encoding error", exc = exc.msg
    return failure(exc)
-  finally:
-    encoder.release()

proc encode*(
    self: Erasure,
@@ -381,6 +483,101 @@ proc encode*(

  return success encodedManifest

+proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
+  # Task suitable for running in taskpools - look, no GC!
+  let decoder =
+    task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  defer:
+    decoder.release()
+
+  if (
+    let res = decoder.decode(
+      task[].blocks,
+      task[].parity,
+      task[].recovered,
+      task[].blocksLen,
+      task[].parityLen,
+      task[].recoveredLen,
+    )
+    res.isErr
+  ):
+    warn "Error from leopard decoder backend!", error = $res.error
+    task[].success.store(false)
+  else:
+    task[].success.store(true)
+
+  discard task[].signal.fireSync()
+
+proc decodeAsync*(
+    self: Erasure,
+    blockSize, blocksLen, parityLen: int,
+    blocks, parity: ref seq[seq[byte]],
+    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+): Future[?!void] {.async: (raises: [CancelledError]).} =
+  without threadPtr =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+
+  defer:
+    threadPtr.close().expect("closing once works")
+
+  var
+    blocksData = createDoubleArray(blocksLen, blockSize)
+    parityData = createDoubleArray(parityLen, blockSize)
+
+  for i in 0 ..< blocks[].len:
+    if blocks[i].len > 0:
+      copyMem(blocksData[i], addr blocks[i][0], blockSize)
+    else:
+      blocksData[i] = nil
+
+  for i in 0 ..< parity[].len:
+    if parity[i].len > 0:
+      copyMem(parityData[i], addr parity[i][0], blockSize)
+    else:
+      parityData[i] = nil
+
+  defer:
+    freeDoubleArray(blocksData, blocksLen)
+    freeDoubleArray(parityData, parityLen)
+
+  ## Create a decode task with block data
+  var task = DecodeTask(
+    erasure: addr self,
+    blockSize: blockSize,
+    blocksLen: blocksLen,
+    parityLen: parityLen,
+    recoveredLen: blocksLen,
+    blocks: blocksData,
+    parity: parityData,
+    recovered: recovered,
+    signal: threadPtr,
+  )
+
+  # Hold the task pointer until the signal is received
+  let t = addr task
+  doAssert self.taskPool.numThreads > 1,
+    "Must have at least one separate thread or signal will never be fired"
+  self.taskPool.spawn leopardDecodeTask(self.taskPool, t)
+  let threadFut = threadPtr.wait()
+
+  try:
+    await threadFut.join()
+  except CatchableError as exc:
+    try:
+      await threadFut
+    except AsyncError as asyncExc:
+      return failure(asyncExc.msg)
+    finally:
+      if exc of CancelledError:
+        raise (ref CancelledError) exc
+      else:
+        return failure(exc.msg)
+
+  if not t.success.load():
+    return failure("Leopard decoding failed")
+
+  success()

proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
  ## Decode a protected manifest into its original
  ## manifest
@@ -388,7 +585,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
  ## `encoded` - the encoded (protected) manifest to
  ## be recovered
  ##
  logScope:
    steps = encoded.steps
    rounded_blocks = encoded.rounded
@@ -411,8 +607,7 @@
  var
    data = seq[seq[byte]].new()
    parityData = seq[seq[byte]].new()
-    recovered =
-      newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
+    recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)

  data[].setLen(encoded.ecK) # set len to K
  parityData[].setLen(encoded.ecM) # set len to M
@@ -430,15 +625,26 @@
        continue

    trace "Erasure decoding data"

-    if (let err = decoder.decode(data[], parityData[], recovered); err.isErr):
-      trace "Unable to decode data!", err = $err.error
-      return failure($err.error)
+    try:
+      if err =? (
+        await self.decodeAsync(
+          encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
+        )
+      ).errorOption:
+        return failure(err)
+    except CancelledError as exc:
+      raise exc
+    finally:
+      freeDoubleArray(recovered, encoded.ecK)

    for i in 0 ..< encoded.ecK:
      let idx = i * encoded.steps + step
      if data[i].len <= 0 and not cids[idx].isEmpty:
-        without blk =? bt.Block.new(recovered[i]), error:
+        var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
+
+        without blk =? bt.Block.new(
+          innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
+        ), error:
          trace "Unable to create block!", exc = error.msg
          return failure(error)

@@ -490,10 +696,13 @@ proc new*(
    store: BlockStore,
    encoderProvider: EncoderProvider,
    decoderProvider: DecoderProvider,
+    taskPool: Taskpool,
): Erasure =
  ## Create a new Erasure instance for encoding and decoding manifests
  ##

  Erasure(
-    store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider
+    store: store,
+    encoderProvider: encoderProvider,
+    decoderProvider: decoderProvider,
+    taskPool: taskPool,
  )
@@ -63,7 +63,6 @@ proc encode*(manifest: Manifest): ?!seq[byte] =
  #   optional ErasureInfo erasure = 7; # erasure coding info
  #   optional filename: ?string = 8; # original filename
  #   optional mimetype: ?string = 9; # original mimetype
-  #   optional uploadedAt: ?int64 = 10; # original uploadedAt
  # }
  # ```
  #
@@ -102,9 +101,6 @@
  if manifest.mimetype.isSome:
    header.write(9, manifest.mimetype.get())

-  if manifest.uploadedAt.isSome:
-    header.write(10, manifest.uploadedAt.get().uint64)
-
  pbNode.write(1, header) # set the treeCid as the data field
  pbNode.finish()

@@ -135,7 +131,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest =
    verifiableStrategy: uint32
    filename: string
    mimetype: string
-    uploadedAt: uint64

  # Decode `Header` message
  if pbNode.getField(1, pbHeader).isErr:
@@ -169,9 +164,6 @@
  if pbHeader.getField(9, mimetype).isErr:
    return failure("Unable to decode `mimetype` from manifest!")

-  if pbHeader.getField(10, uploadedAt).isErr:
-    return failure("Unable to decode `uploadedAt` from manifest!")
-
  let protected = pbErasureInfo.buffer.len > 0
  var verifiable = false
  if protected:
@@ -211,7 +203,6 @@

  var filenameOption = if filename.len == 0: string.none else: filename.some
  var mimetypeOption = if mimetype.len == 0: string.none else: mimetype.some
-  var uploadedAtOption = if uploadedAt == 0: int64.none else: uploadedAt.int64.some

  let self =
    if protected:
@@ -229,7 +220,6 @@
        strategy = StrategyType(protectedStrategy),
        filename = filenameOption,
        mimetype = mimetypeOption,
-        uploadedAt = uploadedAtOption,
      )
    else:
      Manifest.new(
@@ -241,7 +231,6 @@
        codec = codec.MultiCodec,
        filename = filenameOption,
        mimetype = mimetypeOption,
-        uploadedAt = uploadedAtOption,
      )

  ?self.verify()
@@ -38,7 +38,6 @@ type Manifest* = ref object of RootObj
  version: CidVersion # Cid version
  filename {.serialize.}: ?string # The filename of the content uploaded (optional)
  mimetype {.serialize.}: ?string # The mimetype of the content uploaded (optional)
-  uploadedAt {.serialize.}: ?int64 # The UTC creation timestamp in seconds
  case protected {.serialize.}: bool # Protected datasets have erasure coded info
  of true:
    ecK: int # Number of blocks to encode
@@ -131,8 +130,6 @@ func filename*(self: Manifest): ?string =
func mimetype*(self: Manifest): ?string =
  self.mimetype

-func uploadedAt*(self: Manifest): ?int64 =
-  self.uploadedAt
############################################################
# Operations on block list
############################################################
@@ -165,14 +162,11 @@ func verify*(self: Manifest): ?!void =

  return success()

-func cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} =
-  self.treeCid.success
-
func `==`*(a, b: Manifest): bool =
  (a.treeCid == b.treeCid) and (a.datasetSize == b.datasetSize) and
    (a.blockSize == b.blockSize) and (a.version == b.version) and (a.hcodec == b.hcodec) and
    (a.codec == b.codec) and (a.protected == b.protected) and (a.filename == b.filename) and
-    (a.mimetype == b.mimetype) and (a.uploadedAt == b.uploadedAt) and (
+    (a.mimetype == b.mimetype) and (
      if a.protected:
        (a.ecK == b.ecK) and (a.ecM == b.ecM) and (a.originalTreeCid == b.originalTreeCid) and
          (a.originalDatasetSize == b.originalDatasetSize) and
@@ -202,9 +196,6 @@ func `$`*(self: Manifest): string =
  if self.mimetype.isSome:
    result &= ", mimetype: " & $self.mimetype

-  if self.uploadedAt.isSome:
-    result &= ", uploadedAt: " & $self.uploadedAt
-
  result &= (
    if self.protected:
      ", ecK: " & $self.ecK & ", ecM: " & $self.ecM & ", originalTreeCid: " &
@@ -236,7 +227,6 @@ func new*(
    protected = false,
    filename: ?string = string.none,
    mimetype: ?string = string.none,
-    uploadedAt: ?int64 = int64.none,
): Manifest =
  T(
    treeCid: treeCid,
@@ -248,7 +238,6 @@
    protected: protected,
    filename: filename,
    mimetype: mimetype,
-    uploadedAt: uploadedAt,
  )

func new*(
@@ -278,7 +267,6 @@ func new*(
    protectedStrategy: strategy,
    filename: manifest.filename,
    mimetype: manifest.mimetype,
-    uploadedAt: manifest.uploadedAt,
  )

func new*(T: type Manifest, manifest: Manifest): Manifest =
@@ -296,7 +284,6 @@ func new*(T: type Manifest, manifest: Manifest): Manifest =
    protected: false,
    filename: manifest.filename,
    mimetype: manifest.mimetype,
-    uploadedAt: manifest.uploadedAt,
  )

func new*(
@@ -314,7 +301,6 @@ func new*(
    strategy = SteppedStrategy,
    filename: ?string = string.none,
    mimetype: ?string = string.none,
-    uploadedAt: ?int64 = int64.none,
): Manifest =
  Manifest(
    treeCid: treeCid,
@@ -331,7 +317,6 @@
    protectedStrategy: strategy,
    filename: filename,
    mimetype: mimetype,
-    uploadedAt: uploadedAt,
  )

func new*(
@@ -374,7 +359,6 @@ func new*(
    verifiableStrategy: strategy,
    filename: manifest.filename,
    mimetype: manifest.mimetype,
-    uploadedAt: manifest.uploadedAt,
  )

func new*(T: type Manifest, data: openArray[byte]): ?!Manifest =
codex/node.nim (103 changes)
@@ -15,6 +15,7 @@ import std/strformat
import std/sugar
import times

+import pkg/taskpools
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
@@ -70,6 +71,7 @@ type
    contracts*: Contracts
    clock*: Clock
    storage*: Contracts
+    taskpool: Taskpool

  CodexNodeRef* = ref CodexNode

@@ -235,8 +237,9 @@ proc streamEntireDataset(
  # Retrieve, decode and save to the local store all EC groups
  proc erasureJob(): Future[?!void] {.async.} =
    # Spawn an erasure decoding job
-    let erasure =
-      Erasure.new(self.networkStore, leoEncoderProvider, leoDecoderProvider)
+    let erasure = Erasure.new(
+      self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+    )
    without _ =? (await erasure.decode(manifest)), error:
      error "Unable to erasure decode manifest", manifestCid, exc = error.msg
      return failure(error)
@@ -267,6 +270,65 @@ proc retrieve*(

  await self.streamEntireDataset(manifest, cid)

+proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
+  if err =? (await self.networkStore.delBlock(cid)).errorOption:
+    error "Error deleting block", cid, err = err.msg
+    return failure(err)
+
+  trace "Deleted block", cid
+  return success()
+
+proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
+  # Deletion is a strictly local operation
+  var store = self.networkStore.localStore
+
+  if not (await cid in store):
+    # As per the contract for delete*, an absent dataset is not an error.
+    return success()
+
+  without manifestBlock =? await store.getBlock(cid), err:
+    return failure(err)
+
+  without manifest =? Manifest.decode(manifestBlock), err:
+    return failure(err)
+
+  let runtimeQuota = initDuration(milliseconds = 100)
+  var lastIdle = getTime()
+  for i in 0 ..< manifest.blocksCount:
+    if (getTime() - lastIdle) >= runtimeQuota:
+      await idleAsync()
+      lastIdle = getTime()
+
+    if err =? (await store.delBlock(manifest.treeCid, i)).errorOption:
+      # The contract for delBlock is fuzzy, but we assume that if the block is
+      # simply missing we won't get an error. This is a best effort operation and
+      # can simply be retried.
+      error "Failed to delete block within dataset", index = i, err = err.msg
+      return failure(err)
+
+  if err =? (await store.delBlock(cid)).errorOption:
+    error "Error deleting manifest block", err = err.msg
+
+  success()
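
The quota-and-yield idiom in deleteEntireDataset keeps a long, purely local loop from starving other chronos futures. The shape of it, with a placeholder body (the local idleAsync here mirrors the helper the node uses):

import std/times
import pkg/chronos

proc idleAsync() {.async.} =
  await sleepAsync(0.milliseconds)  # give the dispatcher a turn

proc longLocalJob(iterations: int) {.async.} =
  let runtimeQuota = initDuration(milliseconds = 100)
  var lastIdle = getTime()
  for i in 0 ..< iterations:
    if (getTime() - lastIdle) >= runtimeQuota:
      await idleAsync()
      lastIdle = getTime()
    discard i * i  # stand-in for delBlock(treeCid, i)

when isMainModule:
  waitFor longLocalJob(1_000_000)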
+proc delete*(
+    self: CodexNodeRef, cid: Cid
+): Future[?!void] {.async: (raises: [CatchableError]).} =
+  ## Deletes a whole dataset, if Cid is a Manifest Cid, or a single block, if Cid is a block Cid,
+  ## from the underlying block store. This is a strictly local operation.
+  ##
+  ## Missing blocks in dataset deletes are ignored.
+  ##
+
+  without isManifest =? cid.isManifest, err:
+    trace "Bad content type for CID:", cid = cid, err = err.msg
+    return failure(err)
+
+  if not isManifest:
+    return await self.deleteSingleBlock(cid)
+
+  await self.deleteEntireDataset(cid)

proc store*(
    self: CodexNodeRef,
    stream: LPStream,
@@ -332,7 +394,6 @@ proc store*(
    codec = dataCodec,
    filename = filename,
    mimetype = mimetype,
-    uploadedAt = now().utc.toTime.toUnix.some,
  )

  without manifestBlk =? await self.storeManifest(manifest), err:
@@ -403,8 +464,9 @@ proc setupRequest(
    return failure error

  # Erasure code the dataset according to provided parameters
-  let erasure =
-    Erasure.new(self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider)
+  let erasure = Erasure.new(
+    self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+  )

  without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
    trace "Unable to erasure code dataset"
@@ -439,10 +501,7 @@ proc setupRequest(
      collateralPerByte: collateralPerByte,
      maxSlotLoss: tolerance,
    ),
-    content: StorageContent(
-      cid: $manifestBlk.cid, # TODO: why string?
-      merkleRoot: verifyRoot,
-    ),
+    content: StorageContent(cid: manifestBlk.cid, merkleRoot: verifyRoot),
    expiry: expiry,
  )

@@ -499,16 +558,14 @@ proc onStore(
  ## store data in local storage
  ##

+  let cid = request.content.cid
+
  logScope:
-    cid = request.content.cid
+    cid = $cid
    slotIdx = slotIdx

  trace "Received a request to store a slot"

-  without cid =? Cid.init(request.content.cid).mapFailure, err:
-    trace "Unable to parse Cid", cid
-    return failure(err)
-
  without manifest =? (await self.fetchManifest(cid)), err:
    trace "Unable to fetch manifest for cid", cid, err = err.msg
@@ -578,7 +635,7 @@ proc onProve(
  ##

  let
-    cidStr = slot.request.content.cid
+    cidStr = $slot.request.content.cid
    slotIdx = slot.slotIndex.truncate(Natural)

  logScope:
@@ -627,14 +684,9 @@ proc onProve(
  failure "Prover not enabled"

proc onExpiryUpdate(
-    self: CodexNodeRef, rootCid: string, expiry: SecondsSince1970
+    self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
-  without cid =? Cid.init(rootCid):
-    trace "Unable to parse Cid", cid
-    let error = newException(CodexError, "Unable to parse Cid")
-    return failure(error)
-
-  return await self.updateExpiry(cid, expiry)
+  return await self.updateExpiry(rootCid, expiry)

proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: UInt256) =
  # TODO: remove data from local storage
@@ -657,7 +709,7 @@ proc start*(self: CodexNodeRef) {.async.} =
      self.onStore(request, slot, onBatch)

    hostContracts.sales.onExpiryUpdate = proc(
-      rootCid: string, expiry: SecondsSince1970
+      rootCid: Cid, expiry: SecondsSince1970
    ): Future[?!void] =
      self.onExpiryUpdate(rootCid, expiry)

@@ -724,12 +776,16 @@ proc stop*(self: CodexNodeRef) {.async.} =
  if not self.networkStore.isNil:
    await self.networkStore.close

+  if not self.taskpool.isNil:
+    self.taskpool.shutdown()
+
proc new*(
    T: type CodexNodeRef,
    switch: Switch,
    networkStore: NetworkStore,
    engine: BlockExcEngine,
    discovery: Discovery,
+    taskpool: Taskpool,
    prover = Prover.none,
    contracts = Contracts.default,
): CodexNodeRef =
@@ -742,5 +798,6 @@ proc new*(
    engine: engine,
    prover: prover,
    discovery: discovery,
+    taskPool: taskpool,
    contracts: contracts,
  )
|
||||
trace "Handling file upload"
|
||||
var bodyReader = request.getBodyReader()
|
||||
if bodyReader.isErr():
|
||||
return RestApiResponse.error(Http500)
|
||||
return RestApiResponse.error(Http500, msg = bodyReader.error())
|
||||
|
||||
# Attempt to handle `Expect` header
|
||||
# some clients (curl), wait 1000ms
|
||||
@ -215,10 +215,13 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
var mimetype = request.headers.getString(ContentTypeHeader).some
|
||||
|
||||
if mimetype.get() != "":
|
||||
let mimetypeVal = mimetype.get()
|
||||
var m = newMimetypes()
|
||||
let extension = m.getExt(mimetype.get(), "")
|
||||
let extension = m.getExt(mimetypeVal, "")
|
||||
if extension == "":
|
||||
return RestApiResponse.error(Http422, "The MIME type is not valid.")
|
||||
return RestApiResponse.error(
|
||||
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
|
||||
)
|
||||
else:
|
||||
mimetype = string.none
|
||||
|
||||
@ -256,13 +259,19 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
finally:
|
||||
await reader.closeWait()
|
||||
|
||||
trace "Something went wrong error"
|
||||
return RestApiResponse.error(Http500)
|
||||
|
||||
router.api(MethodGet, "/api/codex/v1/data") do() -> RestApiResponse:
|
||||
let json = await formatManifestBlocks(node)
|
||||
return RestApiResponse.response($json, contentType = "application/json")
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("GET,DELETE", corsOrigin)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.api(MethodGet, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
@ -279,6 +288,27 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
|
||||
await node.retrieveCid(cid.get(), local = true, resp = resp)
|
||||
|
||||
router.api(MethodDelete, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
## Deletes either a single block or an entire dataset
|
||||
## from the local node. Does nothing and returns 200
|
||||
## if the dataset is not locally available.
|
||||
##
|
||||
var headers = buildCorsHeaders("DELETE", allowedOrigin)
|
||||
|
||||
if cid.isErr:
|
||||
return RestApiResponse.error(Http400, $cid.error(), headers = headers)
|
||||
|
||||
if err =? (await node.delete(cid.get())).errorOption:
|
||||
return RestApiResponse.error(Http500, err.msg, headers = headers)
|
||||
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("DELETE", corsOrigin)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.api(MethodPost, "/api/codex/v1/data/{cid}/network") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
|
||||
@@ -1,6 +1,7 @@
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
+import pkg/libp2p/cid

import ../market
import ../clock
@@ -30,7 +31,7 @@ type
  OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
    gcsafe, upraises: []
  .}
-  OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {.
+  OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {.
    gcsafe, upraises: []
  .}
  OnClear* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].}
@@ -189,7 +189,7 @@ proc getCellHashes*[T, H](
      blkIdx = blkIdx
      pos = i

-    trace "Getting block CID for tree at index"
+    trace "Getting block CID for tree at index", index = blkIdx
    without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root,
      err:
      error "Failed to get block CID for tree at index", err = err.msg
@@ -57,6 +57,17 @@ proc putLeafMetadata*(
    (md.some, res),
  )

+proc delLeafMetadata*(
+    self: RepoStore, treeCid: Cid, index: Natural
+): Future[?!void] {.async.} =
+  without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
+    return failure(err)
+
+  if err =? (await self.metaDs.delete(key)).errorOption:
+    return failure(err)
+
+  success()
+
proc getLeafMetadata*(
    self: RepoStore, treeCid: Cid, index: Natural
): Future[?!LeafMetadata] {.async.} =
@@ -205,9 +216,6 @@ proc storeBlock*(
proc tryDeleteBlock*(
    self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low
): Future[?!DeleteResult] {.async.} =
-  if cid.isEmpty:
-    return success(DeleteResult(kind: InUse))
-
  without metaKey =? createBlockExpirationMetadataKey(cid), err:
    return failure(err)
@@ -186,13 +186,13 @@ method putBlock*(

  return success()

-method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
-  ## Delete a block from the blockstore when block refCount is 0 or block is expired
-  ##
-
+proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} =
  logScope:
    cid = cid

  if cid.isEmpty:
    return success(Deleted)

  trace "Attempting to delete a block"

  without res =? await self.tryDeleteBlock(cid, self.clock.now()), err:
@@ -205,12 +205,28 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =

  if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
    return failure(err)
-  elif res.kind == InUse:
-    trace "Block in use, refCount > 0 and not expired"
-  else:
-    trace "Block not found in store"
-
-  return success()
+
+  success(res.kind)
+
+method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
+  ## Delete a block from the blockstore when block refCount is 0 or block is expired
+  ##
+
+  logScope:
+    cid = cid
+
+  without outcome =? await self.delBlockInternal(cid), err:
+    return failure(err)
+
+  case outcome
+  of InUse:
+    failure("Directly deleting a block that is part of a dataset is not allowed.")
+  of NotFound:
+    trace "Block not found, ignoring"
+    success()
+  of Deleted:
+    trace "Block already deleted"
+    success()
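
delBlockInternal reports what happened and the public delBlock decides which outcomes count as errors. The three-way split, reduced to a standalone sketch (DeleteResultKind mirrors the repostore enum; interpret is illustrative):

type DeleteResultKind = enum
  Deleted
  InUse
  NotFound

proc interpret(outcome: DeleteResultKind): string =
  case outcome
  of InUse: "error: block is part of a dataset, direct delete refused"
  of NotFound: "ok: nothing to delete"
  of Deleted: "ok: block removed (or already gone)"

when isMainModule:
  for o in [Deleted, InUse, NotFound]:
    echo interpret(o)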
method delBlock*(
|
||||
self: RepoStore, treeCid: Cid, index: Natural
|
||||
@ -221,12 +237,19 @@ method delBlock*(
|
||||
else:
|
||||
return failure(err)
|
||||
|
||||
if err =? (await self.delLeafMetadata(treeCid, index)).errorOption:
|
||||
error "Failed to delete leaf metadata, block will remain on disk.", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if err =?
|
||||
(await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption:
|
||||
if not (err of BlockNotFoundError):
|
||||
return failure(err)
|
||||
|
||||
await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0
|
||||
without _ =? await self.delBlockInternal(leafMd.blkCid), err:
|
||||
return failure(err)
|
||||
|
||||
success()
|
||||
|
||||
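Editor's note: the refactor splits the low-level deletion logic into `delBlockInternal`, which reports a `DeleteResultKind`, while the public `delBlock` enforces the policy that dataset blocks may only be removed through their tree. A hedged sketch of the resulting caller-facing semantics, assuming a populated `repo: RepoStore` and CIDs set up as in the tests further below:

```nim
# Sketch only; `repo`, `blkCid` and `treeCid` are assumed to exist.
# Direct deletion of a block that belongs to a dataset now fails:
let res = await repo.delBlock(blkCid)
assert res.isErr
assert res.error().msg ==
  "Directly deleting a block that is part of a dataset is not allowed."

# Deleting through the dataset decrements the refCount and removes the
# block once no dataset references it anymore:
(await repo.delBlock(treeCid, 0.Natural)).tryGet()
```
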
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
  ## Check if the block exists in the blockstore

@ -295,6 +318,18 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
  let queryKey = ?createBlockExpirationMetadataQueryKey()
  success Query.init(queryKey, offset = offset, limit = maxNumber)

proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} =
  ## Returns the reference count for a block. If the count is zero,
  ## the block is eligible for garbage collection.
  ##
  without key =? createBlockExpirationMetadataKey(cid), err:
    return failure(err)

  without md =? await get[BlockMetadata](self.metaDs, key), err:
    return failure(err)

  return success(md.refCount)

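Editor's note: `blockRefCount` exposes how many datasets still reference a block; a count of zero marks the block as an orphan that garbage collection may reclaim. A minimal sketch, assuming a `repo: RepoStore` and a stored block `blk`:

```nim
# Sketch: freshly stored blocks start as orphans (refCount == 0).
let refCount = (await repo.blockRefCount(blk.cid)).tryGet()
if refCount == 0.Natural:
  echo "block is eligible for garbage collection"
```
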
method getBlockExpirations*(
    self: RepoStore, maxNumber: int, offset: int
): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =

@ -110,7 +110,7 @@ method readOnce*(
    raise newLPStreamReadError(error)

  trace "Reading bytes from store stream",
    manifestCid = self.manifest.cid.get(),
    manifestCid = self.manifest.treeCid,
    numBlocks = self.manifest.blocksCount,
    blockNum,
    blkCid = blk.cid,

25
codex/utils/arrayutils.nim
Normal file
@ -0,0 +1,25 @@
import std/sequtils

proc createDoubleArray*(
    outerLen, innerLen: int
): ptr UncheckedArray[ptr UncheckedArray[byte]] =
  # Allocate outer array
  result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](allocShared0(
    sizeof(ptr UncheckedArray[byte]) * outerLen
  ))

  # Allocate each inner array
  for i in 0 ..< outerLen:
    result[i] = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * innerLen))

proc freeDoubleArray*(
    arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int
) =
  # Free each inner array
  for i in 0 ..< outerLen:
    if not arr[i].isNil:
      deallocShared(arr[i])

  # Free outer array
  if not arr.isNil:
    deallocShared(arr)
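Editor's note: `createDoubleArray` allocates a C-style array of `outerLen` shared buffers of `innerLen` bytes each (used further below to pass block data across threads in the erasure coder), and `freeDoubleArray` releases both levels of the allocation. A hedged usage sketch:

```nim
import pkg/codex/utils/arrayutils

const
  outerLen = 4    # number of buffers, e.g. parity shards (illustrative values)
  innerLen = 1024 # bytes per buffer

let buffers = createDoubleArray(outerLen, innerLen)
# Buffers are zero-initialised (allocShared0) and usable from any thread.
buffers[0][0] = 0xFF'u8
# Always free the inner buffers and the outer array when done.
freeDoubleArray(buffers, outerLen)
```
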
@ -9,6 +9,34 @@ if [[ -n "${ENV_PATH}" ]]; then
  set +a
fi

# Bootstrap node from URL
if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then
  BOOTSTRAP_NODE_URL="${BOOTSTRAP_NODE_URL}/api/codex/v1/spr"
  WAIT=${BOOTSTRAP_NODE_URL_WAIT:-300}
  SECONDS=0
  SLEEP=1
  # Run and retry if fail
  while (( SECONDS < WAIT )); do
    SPR=$(curl -s -f -m 5 -H 'Accept: text/plain' "${BOOTSTRAP_NODE_URL}")
    # Check if exit code is 0 and returned value is not empty
    if [[ $? -eq 0 && -n "${SPR}" ]]; then
      export CODEX_BOOTSTRAP_NODE="${SPR}"
      echo "Bootstrap node: CODEX_BOOTSTRAP_NODE=${CODEX_BOOTSTRAP_NODE}"
      break
    else
      # Sleep and check again
      echo "Can't get SPR from ${BOOTSTRAP_NODE_URL} - Retry in $SLEEP seconds / $((WAIT - SECONDS))"
      sleep $SLEEP
    fi
  done
fi

# Stop Codex run if unable to get SPR
if [[ -n "${BOOTSTRAP_NODE_URL}" && -z "${CODEX_BOOTSTRAP_NODE}" ]]; then
  echo "Unable to get SPR from ${BOOTSTRAP_NODE_URL} in ${BOOTSTRAP_NODE_URL_WAIT} seconds - Stop Codex run"
  exit 1
fi

# Parameters
if [[ -z "${CODEX_NAT}" ]]; then
  if [[ "${NAT_IP_AUTO}" == "true" && -z "${NAT_PUBLIC_IP_AUTO}" ]]; then

@ -379,12 +379,6 @@ components:
          nullable: true
          description: "The original mimetype of the uploaded content (optional)"
          example: image/png
        uploadedAt:
          type: integer
          format: int64
          nullable: true
          description: "The UTC upload timestamp in seconds"
          example: 1729244192

    Space:
      type: object

@ -96,9 +96,9 @@ asyncchecksuite "Block Advertising and Discovery":
    await engine.stop()

  test "Should advertise both manifests and trees":
  test "Should advertise trees":
    let
      cids = @[manifest.cid.tryGet, manifest.treeCid]
      cids = @[manifest.treeCid]
      advertised = initTable.collect:
        for cid in cids:
          {cid: newFuture[void]()}

@ -8,6 +8,7 @@ import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/sales
import pkg/codex/merkletree
import pkg/codex/manifest
import ../examples

export examples

@ -36,8 +37,8 @@ proc example*(_: type SignedState): SignedState =
proc example*(_: type Pricing): Pricing =
  Pricing(address: EthAddress.example, price: uint32.rand.u256)

proc example*(_: type bt.Block): bt.Block =
  let length = rand(4096)
proc example*(_: type bt.Block, size: int = 4096): bt.Block =
  let length = rand(size)
  let bytes = newSeqWith(length, rand(uint8))
  bt.Block.new(bytes).tryGet()

@ -51,6 +52,15 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx =
proc example*(_: type Cid): Cid =
  bt.Block.example.cid

proc example*(_: type Manifest): Manifest =
  Manifest.new(
    treeCid = Cid.example,
    blockSize = 256.NBytes,
    datasetSize = 4096.NBytes,
    filename = "example.txt".some,
    mimetype = "text/plain".some,
  )

proc example*(_: type MultiHash, mcodec = Sha256HashCodec): MultiHash =
  let bytes = newSeqWith(256, rand(uint8))
  MultiHash.digest($mcodec, bytes).tryGet()

@ -85,30 +85,31 @@ proc makeWantList*(
  )

proc storeDataGetManifest*(
    store: BlockStore, chunker: Chunker
    store: BlockStore, blocks: seq[Block]
): Future[Manifest] {.async.} =
  var cids = newSeq[Cid]()

  while (let chunk = await chunker.getBytes(); chunk.len > 0):
    let blk = Block.new(chunk).tryGet()
    cids.add(blk.cid)
  for blk in blocks:
    (await store.putBlock(blk)).tryGet()

  let
    tree = CodexTree.init(cids).tryGet()
    (manifest, tree) = makeManifestAndTree(blocks).tryGet()
    treeCid = tree.rootCid.tryGet()
    manifest = Manifest.new(
      treeCid = treeCid,
      blockSize = NBytes(chunker.chunkSize),
      datasetSize = NBytes(chunker.offset),
    )

  for i in 0 ..< tree.leavesCount:
    let proof = tree.getProof(i).tryGet()
    (await store.putCidAndProof(treeCid, i, cids[i], proof)).tryGet()
    (await store.putCidAndProof(treeCid, i, blocks[i].cid, proof)).tryGet()

  return manifest

proc storeDataGetManifest*(
    store: BlockStore, chunker: Chunker
): Future[Manifest] {.async.} =
  var blocks = newSeq[Block]()

  while (let chunk = await chunker.getBytes(); chunk.len > 0):
    blocks.add(Block.new(chunk).tryGet())

  return await storeDataGetManifest(store, blocks)

proc makeRandomBlocks*(
    datasetSize: int, blockSize: NBytes
): Future[seq[Block]] {.async.} =

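Editor's note: after this refactor `storeDataGetManifest` is overloaded. The new base variant takes pre-built blocks, so tests can construct datasets deterministically (for instance via `makeRandomBlocks`), while the original chunker variant just chunks and delegates. A hedged sketch of both entry points, assuming a `store: BlockStore`:

```nim
# Variant 1: from pre-built blocks (pairs naturally with makeRandomBlocks).
let blocks = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
let manifest1 = await storeDataGetManifest(store, blocks)

# Variant 2: from a chunker, which internally builds the blocks and delegates.
let chunker = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
let manifest2 = await storeDataGetManifest(store, chunker)
```
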
@ -6,6 +6,7 @@ import pkg/chronos
import pkg/codex/codextypes
import pkg/codex/chunker
import pkg/codex/stores
import pkg/taskpools

import ../../asynctest

@ -118,6 +119,7 @@ template setupAndTearDown*() {.dirty.} =
      engine = engine,
      prover = Prover.none,
      discovery = blockDiscovery,
      taskpool = Taskpool.new(),
    )

  teardown:

@ -75,10 +75,9 @@ asyncchecksuite "Test Node - Host contracts":
    let
      manifestBlock =
        bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)

      manifestCid = manifestBlock.cid
      manifestCidStr = $(manifestCid)

    (await localStore.putBlock(manifestBlock)).tryGet()

@ -99,7 +98,7 @@ asyncchecksuite "Test Node - Host contracts":
      expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
      expiryUpdateCallback = !sales.onExpiryUpdate

    (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()
    (await expiryUpdateCallback(manifestCid, expectedExpiry)).tryGet()

    for index in 0 ..< manifest.blocksCount:
      let

@ -116,7 +115,7 @@ asyncchecksuite "Test Node - Host contracts":
  test "onStore callback":
    let onStore = !sales.onStore
    var request = StorageRequest.example
    request.content.cid = $verifiableBlock.cid
    request.content.cid = verifiableBlock.cid
    request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256
    var fetchedBytes: uint = 0

@ -12,6 +12,7 @@ import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/taskpools

import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5

@ -37,6 +38,7 @@ import ../examples
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ../slots/helpers

import ./helpers

@ -66,7 +68,7 @@ asyncchecksuite "Test Node - Basic":
    # https://github.com/codex-storage/nim-codex/issues/699
    let
      cstore = CountingStore.new(engine, localStore)
      node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
      node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new())
      missingCid =
        Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()

@ -137,7 +139,8 @@ asyncchecksuite "Test Node - Basic":

  test "Setup purchase request":
    let
      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
      erasure =
        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
      manifest = await storeDataGetManifest(localStore, chunker)
      manifestBlock =
        bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()

@ -164,5 +167,30 @@ asyncchecksuite "Test Node - Basic":

    check:
      (await verifiableBlock.cid in localStore) == true
      request.content.cid == $verifiableBlock.cid
      request.content.cid == verifiableBlock.cid
      request.content.merkleRoot == builder.verifyRoot.get.toBytes

  test "Should delete a single block":
    let randomBlock = bt.Block.new("Random block".toBytes).tryGet()
    (await localStore.putBlock(randomBlock)).tryGet()
    check (await randomBlock.cid in localStore) == true

    (await node.delete(randomBlock.cid)).tryGet()
    check (await randomBlock.cid in localStore) == false

  test "Should delete an entire dataset":
    let
      blocks = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
      manifest = await storeDataGetManifest(localStore, blocks)
      manifestBlock = (await store.storeManifest(manifest)).tryGet()
      manifestCid = manifestBlock.cid

    check await manifestCid in localStore
    for blk in blocks:
      check await blk.cid in localStore

    (await node.delete(manifestCid)).tryGet()

    check not await manifestCid in localStore
    for blk in blocks:
      check not (await blk.cid in localStore)

@ -36,7 +36,7 @@ checksuite "sales state 'filled'":
    market.requestEnds[request.id] = 321
    onExpiryUpdatePassedExpiry = -1
    let onExpiryUpdate = proc(
        rootCid: string, expiry: SecondsSince1970
        rootCid: Cid, expiry: SecondsSince1970
    ): Future[?!void] {.async.} =
      onExpiryUpdatePassedExpiry = expiry
      return success()

@ -47,7 +47,9 @@ asyncchecksuite "Sales - start":
      pricePerBytePerSecond: 1.u256,
      collateralPerByte: 1.u256,
    ),
    content: StorageContent(cid: "some cid"),
    content: StorageContent(
      cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
    ),
    expiry: (getTime() + initDuration(hours = 1)).toUnix.u256,
  )

@ -65,7 +67,7 @@ asyncchecksuite "Sales - start":
      return success()

    sales.onExpiryUpdate = proc(
        rootCid: string, expiry: SecondsSince1970
        rootCid: Cid, expiry: SecondsSince1970
    ): Future[?!void] {.async.} =
      return success()

@ -161,7 +163,9 @@ asyncchecksuite "Sales":
      pricePerBytePerSecond: minPricePerBytePerSecond,
      collateralPerByte: 1.u256,
    ),
    content: StorageContent(cid: "some cid"),
    content: StorageContent(
      cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet
    ),
    expiry: (getTime() + initDuration(hours = 1)).toUnix.u256,
  )

@ -184,7 +188,7 @@ asyncchecksuite "Sales":
      return success()

    sales.onExpiryUpdate = proc(
        rootCid: string, expiry: SecondsSince1970
        rootCid: Cid, expiry: SecondsSince1970
    ): Future[?!void] {.async.} =
      return success()

@ -15,9 +15,7 @@ import pkg/codex/rng

import ../helpers

proc storeManifest*(
    store: BlockStore, manifest: Manifest
): Future[?!bt.Block] {.async.} =
proc makeManifestBlock*(manifest: Manifest): ?!bt.Block =
  without encodedVerifiable =? manifest.encode(), err:
    trace "Unable to encode manifest"
    return failure(err)

@ -26,6 +24,15 @@ proc storeManifest*(
    trace "Unable to create block from manifest"
    return failure(error)

  success blk

proc storeManifest*(
    store: BlockStore, manifest: Manifest
): Future[?!bt.Block] {.async.} =
  without blk =? makeManifestBlock(manifest), err:
    trace "Unable to create manifest block", err = err.msg
    return failure(err)

  if err =? (await store.putBlock(blk)).errorOption:
    trace "Unable to store manifest block", cid = blk.cid, err = err.msg
    return failure(err)

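Editor's note: splitting `makeManifestBlock` out of `storeManifest` lets tests derive a manifest's block (and hence its CID) without touching a store; the REST API tests further below use this to delete a dataset that was never persisted. A hedged sketch, assuming a `manifest: Manifest` and a `store: BlockStore`:

```nim
# Pure: encode the manifest into a block, no store involved.
let manifestBlk = makeManifestBlock(manifest).tryGet()
echo "manifest cid: ", manifestBlk.cid

# Effectful: encode and persist in one step (delegates to makeManifestBlock).
let stored = (await store.storeManifest(manifest)).tryGet()
```
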
@ -12,9 +12,11 @@ import pkg/datastore
import pkg/codex/stores/cachestore
import pkg/codex/chunker
import pkg/codex/stores
import pkg/codex/stores/repostore/operations
import pkg/codex/blocktype as bt
import pkg/codex/clock
import pkg/codex/utils/asynciter
import pkg/codex/merkletree/codex

import ../../asynctest
import ../helpers

@ -354,6 +356,119 @@ asyncchecksuite "RepoStore":
    check has.isOk
    check has.get

  test "should set the reference count for orphan blocks to 0":
    let blk = Block.example(size = 200)
    (await repo.putBlock(blk)).tryGet()
    check (await repo.blockRefCount(blk.cid)).tryGet() == 0.Natural

  test "should not allow non-orphan blocks to be deleted directly":
    let
      repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb)
      dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
      blk = dataset[0]
      (manifest, tree) = makeManifestAndTree(dataset).tryGet()
      treeCid = tree.rootCid.tryGet()
      proof = tree.getProof(0).tryGet()

    (await repo.putBlock(blk)).tryGet()
    (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet()

    let err = (await repo.delBlock(blk.cid)).error()
    check err.msg ==
      "Directly deleting a block that is part of a dataset is not allowed."

  test "should allow non-orphan blocks to be deleted by dataset reference":
    let
      repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb)
      dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
      blk = dataset[0]
      (manifest, tree) = makeManifestAndTree(dataset).tryGet()
      treeCid = tree.rootCid.tryGet()
      proof = tree.getProof(0).tryGet()

    (await repo.putBlock(blk)).tryGet()
    (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet()

    (await repo.delBlock(treeCid, 0.Natural)).tryGet()
    check not (await blk.cid in repo)

  test "should not delete a non-orphan block until it is deleted from all parent datasets":
    let
      repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb)
      blockPool = await makeRandomBlocks(datasetSize = 768, blockSize = 256'nb)

    let
      dataset1 = @[blockPool[0], blockPool[1]]
      dataset2 = @[blockPool[1], blockPool[2]]

    let sharedBlock = blockPool[1]

    let
      (manifest1, tree1) = makeManifestAndTree(dataset1).tryGet()
      treeCid1 = tree1.rootCid.tryGet()
      (manifest2, tree2) = makeManifestAndTree(dataset2).tryGet()
      treeCid2 = tree2.rootCid.tryGet()

    (await repo.putBlock(sharedBlock)).tryGet()
    check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 0.Natural

    let
      proof1 = tree1.getProof(1).tryGet()
      proof2 = tree2.getProof(0).tryGet()

    (await repo.putCidAndProof(treeCid1, 1, sharedBlock.cid, proof1)).tryGet()
    check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural

    (await repo.putCidAndProof(treeCid2, 0, sharedBlock.cid, proof2)).tryGet()
    check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 2.Natural

    (await repo.delBlock(treeCid1, 1.Natural)).tryGet()
    check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural
    check (await sharedBlock.cid in repo)

    (await repo.delBlock(treeCid2, 0.Natural)).tryGet()
    check not (await sharedBlock.cid in repo)

  test "should clear leaf metadata when block is deleted from dataset":
    let
      repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb)
      dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
      blk = dataset[0]
      (manifest, tree) = makeManifestAndTree(dataset).tryGet()
      treeCid = tree.rootCid.tryGet()
      proof = tree.getProof(1).tryGet()

    (await repo.putBlock(blk)).tryGet()
    (await repo.putCidAndProof(treeCid, 0.Natural, blk.cid, proof)).tryGet()

    discard (await repo.getLeafMetadata(treeCid, 0.Natural)).tryGet()

    (await repo.delBlock(treeCid, 0.Natural)).tryGet()

    let err = (await repo.getLeafMetadata(treeCid, 0.Natural)).error()
    check err of BlockNotFoundError

  test "should not fail when reinserting and deleting a previously deleted block (bug #1108)":
    let
      repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 1000'nb)
      dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
      blk = dataset[0]
      (manifest, tree) = makeManifestAndTree(dataset).tryGet()
      treeCid = tree.rootCid.tryGet()
      proof = tree.getProof(1).tryGet()

    (await repo.putBlock(blk)).tryGet()
    (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet()

    (await repo.delBlock(treeCid, 0.Natural)).tryGet()
    (await repo.putBlock(blk)).tryGet()
    (await repo.delBlock(treeCid, 0.Natural)).tryGet()

commonBlockStoreTests(
  "RepoStore Sql backend",
  proc(): BlockStore =

@ -1,5 +1,6 @@
import std/sequtils
import std/sugar
import std/times

import pkg/chronos
import pkg/questionable/results

@ -11,6 +12,8 @@ import pkg/codex/blocktype as bt
import pkg/codex/rng
import pkg/codex/utils
import pkg/codex/indexingstrategy
import pkg/taskpools
import pkg/codex/utils/arrayutils

import ../asynctest
import ./helpers

@ -27,6 +30,7 @@ suite "Erasure encode/decode":
  var erasure: Erasure
  let repoTmp = TempLevelDb.new()
  let metaTmp = TempLevelDb.new()
  var taskpool: Taskpool

  setup:
    let

@ -35,12 +39,14 @@ suite "Erasure encode/decode":
      rng = Rng.instance()
      chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
      store = RepoStore.new(repoDs, metaDs)
    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
    taskpool = Taskpool.new()
    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
    manifest = await storeDataGetManifest(store, chunker)

  teardown:
    await repoTmp.destroyDb()
    await metaTmp.destroyDb()
    taskpool.shutdown()

  proc encode(buffers, parity: int): Future[Manifest] {.async.} =
    let encoded =

@ -212,7 +218,7 @@ suite "Erasure encode/decode":
      let present = await store.hasBlock(manifest.treeCid, d)
      check present.tryGet()

  test "handles edge case of 0 parity blocks":
  test "Handles edge case of 0 parity blocks":
    const
      buffers = 20
      parity = 0

@ -221,6 +227,43 @@ suite "Erasure encode/decode":

    discard (await erasure.decode(encoded)).tryGet()

  test "Should concurrently encode/decode multiple datasets":
    const iterations = 2

    let
      datasetSize = 1.MiBs
      ecK = 10.Natural
      ecM = 10.Natural

    var encodeTasks = newSeq[Future[?!Manifest]]()
    var decodeTasks = newSeq[Future[?!Manifest]]()
    var manifests = newSeq[Manifest]()
    for i in 0 ..< iterations:
      let
        # create random data and store it
        blockSize = rng.sample(@[1, 2, 4, 8, 16, 32, 64].mapIt(it.KiBs))
        chunker = RandomChunker.new(rng, size = datasetSize, chunkSize = blockSize)
        manifest = await storeDataGetManifest(store, chunker)
      manifests.add(manifest)
      # encode the data concurrently
      encodeTasks.add(erasure.encode(manifest, ecK, ecM))
    # wait for all encoding tasks to finish
    let encodeResults = await allFinished(encodeTasks)
    # decode the data concurrently
    for i in 0 ..< encodeResults.len:
      decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet()))
    # wait for all decoding tasks to finish
    let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures

    for j in 0 ..< decodeTasks.len:
      let
        decoded = decodeResults[j].read().tryGet()
        encoded = encodeResults[j].read().tryGet()
      check:
        decoded.treeCid == manifests[j].treeCid
        decoded.treeCid == encoded.originalTreeCid
        decoded.blocksCount == encoded.originalBlocksCount

  test "Should handle verifiable manifests":
    const
      buffers = 20

@ -259,3 +302,73 @@ suite "Erasure encode/decode":
      decoded.treeCid == manifest.treeCid
      decoded.treeCid == encoded.originalTreeCid
      decoded.blocksCount == encoded.originalBlocksCount

  test "Should complete encode/decode task when cancelled":
    let
      blocksLen = 10000
      parityLen = 10
      data = seq[seq[byte]].new()
      chunker = RandomChunker.new(
        rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
      )

    data[].setLen(blocksLen)

    for i in 0 ..< blocksLen:
      let chunk = await chunker.getBytes()
      shallowCopy(data[i], @(chunk))

    let
      parity = createDoubleArray(parityLen, BlockSize.int)
      paritySeq = seq[seq[byte]].new()
      recovered = createDoubleArray(blocksLen, BlockSize.int)
      cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
      cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)

    paritySeq[].setLen(parityLen)
    defer:
      freeDoubleArray(parity, parityLen)
      freeDoubleArray(cancelledTaskParity, parityLen)
      freeDoubleArray(recovered, blocksLen)
      freeDoubleArray(cancelledTaskRecovered, blocksLen)

    for i in 0 ..< parityLen:
      paritySeq[i] = cast[seq[byte]](parity[i])

    # call encodeAsync to get the parity
    let encFut =
      await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity)
    check encFut.isOk

    let decFut = await erasure.decodeAsync(
      BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
    )
    check decFut.isOk

    # call encodeAsync and cancel the task
    let encodeFut = erasure.encodeAsync(
      BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
    )
    encodeFut.cancel()

    try:
      discard await encodeFut
    except CatchableError as exc:
      check exc of CancelledError
    finally:
      for i in 0 ..< parityLen:
        check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)

    # call decodeAsync and cancel the task
    let decodeFut = erasure.decodeAsync(
      BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
    )
    decodeFut.cancel()

    try:
      discard await decodeFut
    except CatchableError as exc:
      check exc of CancelledError
    finally:
      for i in 0 ..< blocksLen:
        check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)

@ -13,7 +13,7 @@ type MockProvider* = ref object of Provider

method getBlock*(
    provider: MockProvider, tag: BlockTag
): Future[?Block] {.async: (raises: [ProviderError]).} =
): Future[?Block] {.async: (raises: [ProviderError, CancelledError]).} =
  try:
    if tag == BlockTag.latest:
      if latestBlock =? provider.latest:

@ -12,7 +12,7 @@ type MockProvider = ref object of Provider

method getChainId*(
    provider: MockProvider
): Future[UInt256] {.async: (raises: [ProviderError]).} =
): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} =
  return provider.chainId

proc configFactory(): CodexConf =

@ -3,6 +3,8 @@ import std/importutils
import pkg/chronos
import pkg/ethers/erc20
import codex/contracts
import pkg/libp2p/cid
import pkg/lrucache
import ../ethertest
import ./examples
import ./time

@ -591,3 +593,13 @@ ethersuite "On-Chain Market":
    let expectedPayout = request.expectedPayout(filledAt, requestEnd.u256)
    check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot)
    check endBalanceReward == (startBalanceReward + expectedPayout)

  test "the request is added in cache after the first access":
    await market.requestStorage(request)

    check market.requestCache.contains($request.id) == false
    discard await market.getRequest(request.id)

    check market.requestCache.contains($request.id) == true
    let cacheValue = market.requestCache[$request.id]
    check cacheValue == request

@ -57,7 +57,7 @@ proc example*(_: type StorageRequest): StorageRequest =
      maxSlotLoss: 2, # 2 slots can be freed without data considered to be lost
    ),
    content: StorageContent(
      cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob",
      cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet,
      merkleRoot: array[32, byte].example,
    ),
    expiry: (60 * 60).u256, # 1 hour

@ -86,8 +86,7 @@ proc example(_: type G2Point): G2Point =
proc example*(_: type Groth16Proof): Groth16Proof =
  Groth16Proof(a: G1Point.example, b: G2Point.example, c: G1Point.example)

proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} =
  # doAssert blocks >= 3, "must be more than 3 blocks"
proc example*(_: type RandomChunker, blocks: int): Future[seq[byte]] {.async.} =
  let rng = Rng.instance()
  let chunker = RandomChunker.new(
    rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize

@ -95,7 +94,7 @@ proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} =
  var data: seq[byte]
  while (let moar = await chunker.getBytes(); moar != []):
    data.add moar
  return byteutils.toHex(data)
  return data

proc example*(_: type RandomChunker): Future[string] {.async.} =
  await RandomChunker.example(3)

@ -44,6 +44,9 @@ proc upload*(client: CodexClient, contents: string): ?!Cid =
  assert response.status == "200 OK"
  Cid.init(response.body).mapFailure

proc upload*(client: CodexClient, bytes: seq[byte]): ?!Cid =
  client.upload(string.fromBytes(bytes))

proc download*(client: CodexClient, cid: Cid, local = false): ?!string =
  let response = client.http.get(
    client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")

@ -83,6 +86,16 @@ proc downloadBytes*(

  success bytes

proc delete*(client: CodexClient, cid: Cid): ?!void =
  let
    url = client.baseurl & "/data/" & $cid
    response = client.http.delete(url)

  if response.status != "204 No Content":
    return failure(response.status)

  success()

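Editor's note: the new `delete` helper wraps the REST `DELETE /data/{cid}` endpoint and treats anything other than `204 No Content` as a failure. A hedged usage sketch, assuming a running node behind `client: CodexClient`:

```nim
# Upload, then delete; delete succeeds on 204 No Content.
let cid = client.upload("some file contents").tryGet()
client.delete(cid).tryGet()

# A subsequent local download should now fail with 404 Not Found.
let response = client.downloadRaw($cid, local = true)
assert response.status == "404 Not Found"
```
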
proc list*(client: CodexClient): ?!RestContentList =
  let url = client.baseurl & "/data"
  let response = client.http.get(url)

@ -312,3 +325,6 @@ proc downloadRaw*(client: CodexClient, cid: string, local = false): Response =
    client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"),
    httpMethod = HttpGet,
  )

proc deleteRaw*(client: CodexClient, cid: string): Response =
  return client.http.request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete)

@ -4,6 +4,7 @@ from pkg/libp2p import Cid
import pkg/codex/contracts/marketplace as mp
import pkg/codex/periods
import pkg/codex/utils/json
from pkg/codex/utils import roundUp, divUp
import ./multinodes
import ../contracts/time
import ../contracts/deployment

@ -45,11 +46,14 @@ template marketplacesuite*(name: string, body: untyped) =
    proc periods(p: int): uint64 =
      p.uint64 * period

    proc slotSize(blocks: int): UInt256 =
      (DefaultBlockSize * blocks.NBytes).Natural.u256
    proc slotSize(blocks, nodes, tolerance: int): UInt256 =
      let ecK = nodes - tolerance
      let blocksRounded = roundUp(blocks, ecK)
      let blocksPerSlot = divUp(blocksRounded, ecK)
      (DefaultBlockSize * blocksPerSlot.NBytes).Natural.u256

    proc datasetSize(blocks, nodes, tolerance: int): UInt256 =
      (nodes + tolerance).u256 * slotSize(blocks)
      return nodes.u256 * slotSize(blocks, nodes, tolerance)

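Editor's note: the new `slotSize` accounts for erasure coding. With `ecK = nodes - tolerance` data shards, the block count is first rounded up to a multiple of `ecK` and then divided by it, so each slot holds `divUp(roundUp(blocks, ecK), ecK)` blocks; `datasetSize` is now `nodes` slots (parity already included), not `nodes + tolerance`. A worked sketch under these definitions, with illustrative values:

```nim
# Worked example: blocks = 8, nodes = 3, tolerance = 1.
let
  ecK = 3 - 1                               # 2 data shards per slot group
  blocksRounded = roundUp(8, ecK)           # 8 (already a multiple of 2)
  blocksPerSlot = divUp(blocksRounded, ecK) # 4 blocks per slot
# slotSize    = DefaultBlockSize * 4
# datasetSize = 3 * slotSize (all 3 nodes, parity included)
```
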
    proc createAvailabilities(
        datasetSize: UInt256,

@ -50,7 +50,7 @@ marketplacesuite "Bug #821 - node crashes during erasure coding":
    check eventually(requestId.isSome, timeout = expiry.int * 1000)

    let request = await marketplace.getRequest(requestId.get)
    let cidFromRequest = Cid.init(request.content.cid).get()
    let cidFromRequest = request.content.cid
    let downloaded = await clientApi.downloadBytes(cidFromRequest, local = true)
    check downloaded.isOk
    check downloaded.get.toHex == data.toHex

@ -114,7 +114,7 @@ marketplacesuite "Marketplace":
    await ethProvider.advanceTime(duration)

    # Checking that the hosting node received reward for at least the time between <expiry;end>
    let slotSize = slotSize(blocks)
    let slotSize = slotSize(blocks, ecNodes, ecTolerance)
    let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize
    check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >=
      (duration - 5 * 60) * pricePerSlotPerSecond * ecNodes.u256

@ -246,7 +246,7 @@ marketplacesuite "Marketplace payouts":

    await advanceToNextPeriod()

    let slotSize = slotSize(blocks)
    let slotSize = slotSize(blocks, ecNodes, ecTolerance)
    let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize

    check eventually (

@ -1,7 +1,6 @@
from std/times import inMilliseconds
import pkg/questionable
import pkg/codex/logutils
import pkg/stew/byteutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers

@ -60,8 +59,7 @@ marketplacesuite "Hosts submit regular proofs":
    let purchase = client0.getPurchase(purchaseId).get
    check purchase.error == none string

    let request = purchase.request.get
    let slotSize = request.ask.slotSize
    let slotSize = slotSize(blocks, ecNodes, ecTolerance)

    check eventually(
      client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000

@ -47,6 +47,8 @@ twonodessuite "Purchasing":
    ).get

    let request = client1.getPurchase(id).get.request.get

    check request.content.cid.data.buffer.len > 0
    check request.ask.duration == 100.u256
    check request.ask.pricePerBytePerSecond == 1.u256
    check request.ask.proofProbability == 3.u256

@ -1,9 +1,13 @@
import std/httpclient
import std/sequtils
from pkg/libp2p import `==`
import std/strformat
from pkg/libp2p import `==`, `$`, Cid
import pkg/codex/units
import pkg/codex/manifest
import ./twonodes
import ../examples
import ../codex/examples
import ../codex/slots/helpers
import json

twonodessuite "REST API":

@ -35,7 +39,7 @@ twonodessuite "REST API":
    check:
      space.totalBlocks == 2
      space.quotaMaxBytes == 8589934592.NBytes
      space.quotaUsedBytes == 65598.NBytes
      space.quotaUsedBytes == 65592.NBytes
      space.quotaReservedBytes == 12.NBytes

  test "node lists local files", twoNodesConfig:

@ -145,18 +149,19 @@ twonodessuite "REST API":
    check responseBefore.body ==
      "Invalid parameters: `tolerance` cannot be greater than `nodes`"

  test "request storage succeeds if nodes and tolerance within range", twoNodesConfig:
    let data = await RandomChunker.example(blocks = 2)
    let cid = client1.upload(data).get
    let duration = 100.u256
    let pricePerBytePerSecond = 1.u256
    let proofProbability = 3.u256
    let expiry = 30.uint
    let collateralPerByte = 1.u256
    let ecParams = @[(3, 1), (5, 2)]

    for ecParam in ecParams:
      let (nodes, tolerance) = ecParam
  for ecParams in @[
    (minBlocks: 2, nodes: 3, tolerance: 1), (minBlocks: 3, nodes: 5, tolerance: 2)
  ]:
    let (minBlocks, nodes, tolerance) = ecParams
    test "request storage succeeds if nodes and tolerance within range " &
      fmt"({minBlocks=}, {nodes=}, {tolerance=})", twoNodesConfig:
      let data = await RandomChunker.example(blocks = minBlocks)
      let cid = client1.upload(data).get
      let duration = 100.u256
      let pricePerBytePerSecond = 1.u256
      let proofProbability = 3.u256
      let expiry = 30.uint
      let collateralPerByte = 1.u256

      var responseBefore = client1.requestStorageRaw(
        cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,

@ -201,7 +206,7 @@ twonodessuite "REST API":
    let response = client1.uploadRaw("some file contents", headers)

    check response.status == "422 Unprocessable Entity"
    check response.body == "The MIME type is not valid."
    check response.body == "The MIME type 'hello/world' is not valid."

  test "node retrieves the metadata", twoNodesConfig:
    let headers = newHttpHeaders(

@ -228,8 +233,6 @@ twonodessuite "REST API":
    check manifest["filename"].getStr() == "example.txt"
    check manifest.hasKey("mimetype") == true
    check manifest["mimetype"].getStr() == "text/plain"
    check manifest.hasKey("uploadedAt") == true
    check manifest["uploadedAt"].getInt() > 0

  test "node sets the headers for download", twoNodesConfig:
    let headers = newHttpHeaders(

@ -262,3 +265,24 @@ twonodessuite "REST API":
    check localResponse.headers.hasKey("Content-Disposition") == true
    check localResponse.headers["Content-Disposition"] ==
      "attachment; filename=\"example.txt\""

  test "should delete a dataset when requested", twoNodesConfig:
    let cid = client1.upload("some file contents").get

    var response = client1.downloadRaw($cid, local = true)
    check response.body == "some file contents"

    client1.delete(cid).get

    response = client1.downloadRaw($cid, local = true)
    check response.status == "404 Not Found"

  test "should return 204 when attempting delete of non-existing block", twoNodesConfig:
    let response = client1.deleteRaw($(Cid.example()))
    check response.status == "204 No Content"

  test "should return 204 when attempting delete of non-existing dataset",
    twoNodesConfig:
    let cid = Manifest.example().makeManifestBlock().get.cid
    let response = client1.deleteRaw($cid)
    check response.status == "204 No Content"

@ -88,7 +88,7 @@ twonodessuite "Uploads and downloads":
    let cid = a.upload(data).get
    let response = b.download(cid).get
    check:
      response == data
      @response.mapIt(it.byte) == data

  for run in 0 .. 10:
    await transferTest(client1, client2)

@ -1,4 +1,3 @@
import std/os
import std/macros
import pkg/questionable
import ./multinodes

2
vendor/codex-contracts-eth
vendored
@ -1 +1 @@
Subproject commit e74d3397a133eaf1eb95d9ce59f56747a7c8c30b
Subproject commit 0f2012b1442c404605c8ba9dcae2f4e53058cd2c
2
vendor/nim-ethers
vendored
@ -1 +1 @@
Subproject commit 1cfccb9695fa47860bf7ef3d75da9019096a3933
Subproject commit d2b11a865796a55296027f8ffba68398035ad435
2
vendor/nim-leopard
vendored
@ -1 +1 @@
Subproject commit 3e09d8113f874f3584c3fe93818541b2ff9fb9c3
Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f
2
vendor/nim-nat-traversal
vendored
@ -1 +1 @@
Subproject commit 5e4059746e9095e1731b02eeaecd62a70fbe664d
Subproject commit 6508ce75060878dfcdfa21f94721672c69a1823b