mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
Purging of local datasets (#1103)
* feat(codex-node): add dataset deletion API to Codex node * feat(api): add deletion of local datasets to API * fix: logging, remove garbage, drop some CORS headers from DELETE request * fix: change empty response return code to 204 instead of 200 * fix: add time-based idling to avoid locking up the node during deletes, fix API status code * fix: uncomment commented tests committed by accident * fix: return correct code when missing CID is a Manifest CID; add back CORS headers * fix: remove lingering echo
This commit is contained in:
parent
11888e78d7
commit
bbe1f09cd7
34
build.nims
34
build.nims
@ -4,7 +4,6 @@ import std/os except commandLineParams
|
||||
|
||||
### Helper functions
|
||||
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
|
||||
|
||||
if not dirExists "build":
|
||||
mkDir "build"
|
||||
|
||||
@ -14,13 +13,15 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
|
||||
for param in commandLineParams():
|
||||
extra_params &= " " & param
|
||||
else:
|
||||
for i in 2..<paramCount():
|
||||
for i in 2 ..< paramCount():
|
||||
extra_params &= " " & paramStr(i)
|
||||
|
||||
let
|
||||
# Place build output in 'build' folder, even if name includes a longer path.
|
||||
outName = os.lastPathPart(name)
|
||||
cmd = "nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir & name & ".nim"
|
||||
cmd =
|
||||
"nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir &
|
||||
name & ".nim"
|
||||
|
||||
exec(cmd)
|
||||
|
||||
@ -29,7 +30,8 @@ proc test(name: string, srcDir = "tests/", params = "", lang = "c") =
|
||||
exec "build/" & name
|
||||
|
||||
task codex, "build codex binary":
|
||||
buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"
|
||||
buildBinary "codex",
|
||||
params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"
|
||||
|
||||
task toolsCirdl, "build tools/cirdl binary":
|
||||
buildBinary "tools/cirdl/cirdl"
|
||||
@ -41,7 +43,9 @@ task testContracts, "Build & run Codex Contract tests":
|
||||
test "testContracts"
|
||||
|
||||
task testIntegration, "Run integration tests":
|
||||
buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
|
||||
buildBinary "codex",
|
||||
params =
|
||||
"-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
|
||||
test "testIntegration"
|
||||
# use params to enable logging from the integration test executable
|
||||
# test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
|
||||
@ -90,15 +94,25 @@ task coverage, "generates code coverage report":
|
||||
|
||||
var nimSrcs = " "
|
||||
for f in walkDirRec("codex", {pcFile}):
|
||||
if f.endswith(".nim"): nimSrcs.add " " & f.absolutePath.quoteShell()
|
||||
if f.endswith(".nim"):
|
||||
nimSrcs.add " " & f.absolutePath.quoteShell()
|
||||
|
||||
echo "======== Running Tests ======== "
|
||||
test "coverage", srcDir = "tests/", params = " --nimcache:nimcache/coverage -d:release -d:codex_enable_proof_failures=true"
|
||||
test "coverage",
|
||||
srcDir = "tests/",
|
||||
params =
|
||||
" --nimcache:nimcache/coverage -d:release -d:codex_enable_proof_failures=true"
|
||||
exec("rm nimcache/coverage/*.c")
|
||||
rmDir("coverage"); mkDir("coverage")
|
||||
rmDir("coverage")
|
||||
mkDir("coverage")
|
||||
echo " ======== Running LCOV ======== "
|
||||
exec("lcov --capture --directory nimcache/coverage --output-file coverage/coverage.info")
|
||||
exec("lcov --extract coverage/coverage.info --output-file coverage/coverage.f.info " & nimSrcs)
|
||||
exec(
|
||||
"lcov --capture --directory nimcache/coverage --output-file coverage/coverage.info"
|
||||
)
|
||||
exec(
|
||||
"lcov --extract coverage/coverage.info --output-file coverage/coverage.f.info " &
|
||||
nimSrcs
|
||||
)
|
||||
echo " ======== Generating HTML coverage report ======== "
|
||||
exec("genhtml coverage/coverage.f.info --output-directory coverage/report ")
|
||||
echo " ======== Coverage report Done ======== "
|
||||
|
||||
@ -267,6 +267,65 @@ proc retrieve*(
|
||||
|
||||
await self.streamEntireDataset(manifest, cid)
|
||||
|
||||
proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
|
||||
if err =? (await self.networkStore.delBlock(cid)).errorOption:
|
||||
error "Error deleting block", cid, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
trace "Deleted block", cid
|
||||
return success()
|
||||
|
||||
proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
|
||||
# Deletion is a strictly local operation
|
||||
var store = self.networkStore.localStore
|
||||
|
||||
if not (await cid in store):
|
||||
# As per the contract for delete*, an absent dataset is not an error.
|
||||
return success()
|
||||
|
||||
without manifestBlock =? await store.getBlock(cid), err:
|
||||
return failure(err)
|
||||
|
||||
without manifest =? Manifest.decode(manifestBlock), err:
|
||||
return failure(err)
|
||||
|
||||
let runtimeQuota = initDuration(milliseconds = 100)
|
||||
var lastIdle = getTime()
|
||||
for i in 0 ..< manifest.blocksCount:
|
||||
if (getTime() - lastIdle) >= runtimeQuota:
|
||||
await idleAsync()
|
||||
lastIdle = getTime()
|
||||
|
||||
if err =? (await store.delBlock(manifest.treeCid, i)).errorOption:
|
||||
# The contract for delBlock is fuzzy, but we assume that if the block is
|
||||
# simply missing we won't get an error. This is a best effort operation and
|
||||
# can simply be retried.
|
||||
error "Failed to delete block within dataset", index = i, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if err =? (await store.delBlock(cid)).errorOption:
|
||||
error "Error deleting manifest block", err = err.msg
|
||||
|
||||
success()
|
||||
|
||||
proc delete*(
|
||||
self: CodexNodeRef, cid: Cid
|
||||
): Future[?!void] {.async: (raises: [CatchableError]).} =
|
||||
## Deletes a whole dataset, if Cid is a Manifest Cid, or a single block, if Cid a block Cid,
|
||||
## from the underlying block store. This is a strictly local operation.
|
||||
##
|
||||
## Missing blocks in dataset deletes are ignored.
|
||||
##
|
||||
|
||||
without isManifest =? cid.isManifest, err:
|
||||
trace "Bad content type for CID:", cid = cid, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if not isManifest:
|
||||
return await self.deleteSingleBlock(cid)
|
||||
|
||||
await self.deleteEntireDataset(cid)
|
||||
|
||||
proc store*(
|
||||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
|
||||
@ -238,6 +238,15 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
let json = await formatManifestBlocks(node)
|
||||
return RestApiResponse.response($json, contentType = "application/json")
|
||||
|
||||
router.api(MethodOptions, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("GET,DELETE", corsOrigin)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.api(MethodGet, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
@ -254,6 +263,27 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
|
||||
|
||||
await node.retrieveCid(cid.get(), local = true, resp = resp)
|
||||
|
||||
router.api(MethodDelete, "/api/codex/v1/data/{cid}") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
## Deletes either a single block or an entire dataset
|
||||
## from the local node. Does nothing and returns 200
|
||||
## if the dataset is not locally available.
|
||||
##
|
||||
var headers = buildCorsHeaders("DELETE", allowedOrigin)
|
||||
|
||||
if cid.isErr:
|
||||
return RestApiResponse.error(Http400, $cid.error(), headers = headers)
|
||||
|
||||
if err =? (await node.delete(cid.get())).errorOption:
|
||||
return RestApiResponse.error(Http500, err.msg, headers = headers)
|
||||
|
||||
if corsOrigin =? allowedOrigin:
|
||||
resp.setCorsHeaders("DELETE", corsOrigin)
|
||||
|
||||
resp.status = Http204
|
||||
await resp.sendBody("")
|
||||
|
||||
router.api(MethodPost, "/api/codex/v1/data/{cid}/network") do(
|
||||
cid: Cid, resp: HttpResponseRef
|
||||
) -> RestApiResponse:
|
||||
|
||||
@ -8,6 +8,7 @@ import pkg/codex/stores
|
||||
import pkg/codex/blocktype as bt
|
||||
import pkg/codex/sales
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/manifest
|
||||
import ../examples
|
||||
|
||||
export examples
|
||||
@ -51,6 +52,15 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx =
|
||||
proc example*(_: type Cid): Cid =
|
||||
bt.Block.example.cid
|
||||
|
||||
proc example*(_: type Manifest): Manifest =
|
||||
Manifest.new(
|
||||
treeCid = Cid.example,
|
||||
blockSize = 256.NBytes,
|
||||
datasetSize = 4096.NBytes,
|
||||
filename = "example.txt".some,
|
||||
mimetype = "text/plain".some,
|
||||
)
|
||||
|
||||
proc example*(_: type MultiHash, mcodec = Sha256HashCodec): MultiHash =
|
||||
let bytes = newSeqWith(256, rand(uint8))
|
||||
MultiHash.digest($mcodec, bytes).tryGet()
|
||||
|
||||
@ -85,30 +85,31 @@ proc makeWantList*(
|
||||
)
|
||||
|
||||
proc storeDataGetManifest*(
|
||||
store: BlockStore, chunker: Chunker
|
||||
store: BlockStore, blocks: seq[Block]
|
||||
): Future[Manifest] {.async.} =
|
||||
var cids = newSeq[Cid]()
|
||||
|
||||
while (let chunk = await chunker.getBytes(); chunk.len > 0):
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
cids.add(blk.cid)
|
||||
for blk in blocks:
|
||||
(await store.putBlock(blk)).tryGet()
|
||||
|
||||
let
|
||||
tree = CodexTree.init(cids).tryGet()
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(chunker.chunkSize),
|
||||
datasetSize = NBytes(chunker.offset),
|
||||
)
|
||||
|
||||
for i in 0 ..< tree.leavesCount:
|
||||
let proof = tree.getProof(i).tryGet()
|
||||
(await store.putCidAndProof(treeCid, i, cids[i], proof)).tryGet()
|
||||
(await store.putCidAndProof(treeCid, i, blocks[i].cid, proof)).tryGet()
|
||||
|
||||
return manifest
|
||||
|
||||
proc storeDataGetManifest*(
|
||||
store: BlockStore, chunker: Chunker
|
||||
): Future[Manifest] {.async.} =
|
||||
var blocks = newSeq[Block]()
|
||||
|
||||
while (let chunk = await chunker.getBytes(); chunk.len > 0):
|
||||
blocks.add(Block.new(chunk).tryGet())
|
||||
|
||||
return await storeDataGetManifest(store, blocks)
|
||||
|
||||
proc makeRandomBlocks*(
|
||||
datasetSize: int, blockSize: NBytes
|
||||
): Future[seq[Block]] {.async.} =
|
||||
|
||||
@ -37,6 +37,7 @@ import ../examples
|
||||
import ../helpers
|
||||
import ../helpers/mockmarket
|
||||
import ../helpers/mockclock
|
||||
import ../slots/helpers
|
||||
|
||||
import ./helpers
|
||||
|
||||
@ -166,3 +167,28 @@ asyncchecksuite "Test Node - Basic":
|
||||
(await verifiableBlock.cid in localStore) == true
|
||||
request.content.cid == $verifiableBlock.cid
|
||||
request.content.merkleRoot == builder.verifyRoot.get.toBytes
|
||||
|
||||
test "Should delete a single block":
|
||||
let randomBlock = bt.Block.new("Random block".toBytes).tryGet()
|
||||
(await localStore.putBlock(randomBlock)).tryGet()
|
||||
check (await randomBlock.cid in localStore) == true
|
||||
|
||||
(await node.delete(randomBlock.cid)).tryGet()
|
||||
check (await randomBlock.cid in localStore) == false
|
||||
|
||||
test "Should delete an entire dataset":
|
||||
let
|
||||
blocks = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
|
||||
manifest = await storeDataGetManifest(localStore, blocks)
|
||||
manifestBlock = (await store.storeManifest(manifest)).tryGet()
|
||||
manifestCid = manifestBlock.cid
|
||||
|
||||
check await manifestCid in localStore
|
||||
for blk in blocks:
|
||||
check await blk.cid in localStore
|
||||
|
||||
(await node.delete(manifestCid)).tryGet()
|
||||
|
||||
check not await manifestCid in localStore
|
||||
for blk in blocks:
|
||||
check not (await blk.cid in localStore)
|
||||
|
||||
@ -15,9 +15,7 @@ import pkg/codex/rng
|
||||
|
||||
import ../helpers
|
||||
|
||||
proc storeManifest*(
|
||||
store: BlockStore, manifest: Manifest
|
||||
): Future[?!bt.Block] {.async.} =
|
||||
proc makeManifestBlock*(manifest: Manifest): ?!bt.Block =
|
||||
without encodedVerifiable =? manifest.encode(), err:
|
||||
trace "Unable to encode manifest"
|
||||
return failure(err)
|
||||
@ -26,6 +24,15 @@ proc storeManifest*(
|
||||
trace "Unable to create block from manifest"
|
||||
return failure(error)
|
||||
|
||||
success blk
|
||||
|
||||
proc storeManifest*(
|
||||
store: BlockStore, manifest: Manifest
|
||||
): Future[?!bt.Block] {.async.} =
|
||||
without blk =? makeManifestBlock(manifest), err:
|
||||
trace "Unable to create manifest block", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if err =? (await store.putBlock(blk)).errorOption:
|
||||
trace "Unable to store manifest block", cid = blk.cid, err = err.msg
|
||||
return failure(err)
|
||||
|
||||
@ -86,6 +86,16 @@ proc downloadBytes*(
|
||||
|
||||
success bytes
|
||||
|
||||
proc delete*(client: CodexClient, cid: Cid): ?!void =
|
||||
let
|
||||
url = client.baseurl & "/data/" & $cid
|
||||
response = client.http.delete(url)
|
||||
|
||||
if response.status != "204 No Content":
|
||||
return failure(response.status)
|
||||
|
||||
success()
|
||||
|
||||
proc list*(client: CodexClient): ?!RestContentList =
|
||||
let url = client.baseurl & "/data"
|
||||
let response = client.http.get(url)
|
||||
@ -284,3 +294,6 @@ proc downloadRaw*(client: CodexClient, cid: string, local = false): Response =
|
||||
client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"),
|
||||
httpMethod = HttpGet,
|
||||
)
|
||||
|
||||
proc deleteRaw*(client: CodexClient, cid: string): Response =
|
||||
return client.http.request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete)
|
||||
|
||||
@ -1,10 +1,13 @@
|
||||
import std/httpclient
|
||||
import std/sequtils
|
||||
import std/strformat
|
||||
from pkg/libp2p import `==`
|
||||
from pkg/libp2p import `==`, `$`, Cid
|
||||
import pkg/codex/units
|
||||
import pkg/codex/manifest
|
||||
import ./twonodes
|
||||
import ../examples
|
||||
import ../codex/examples
|
||||
import ../codex/slots/helpers
|
||||
import json
|
||||
|
||||
twonodessuite "REST API":
|
||||
@ -263,3 +266,24 @@ twonodessuite "REST API":
|
||||
check localResponse.headers.hasKey("Content-Disposition") == true
|
||||
check localResponse.headers["Content-Disposition"] ==
|
||||
"attachment; filename=\"example.txt\""
|
||||
|
||||
test "should delete a dataset when requested", twoNodesConfig:
|
||||
let cid = client1.upload("some file contents").get
|
||||
|
||||
var response = client1.downloadRaw($cid, local = true)
|
||||
check response.body == "some file contents"
|
||||
|
||||
client1.delete(cid).get
|
||||
|
||||
response = client1.downloadRaw($cid, local = true)
|
||||
check response.status == "404 Not Found"
|
||||
|
||||
test "should return 200 when attempting delete of non-existing block", twoNodesConfig:
|
||||
let response = client1.deleteRaw($(Cid.example()))
|
||||
check response.status == "204 No Content"
|
||||
|
||||
test "should return 200 when attempting delete of non-existing dataset",
|
||||
twoNodesConfig:
|
||||
let cid = Manifest.example().makeManifestBlock().get.cid
|
||||
let response = client1.deleteRaw($cid)
|
||||
check response.status == "204 No Content"
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
import std/os
|
||||
import std/macros
|
||||
import pkg/questionable
|
||||
import ./multinodes
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user