mirror of https://github.com/logos-storage/logos-storage-nim.git
synced 2026-02-20 21:43:14 +00:00

chore!: Finish renaming Codex to Logos Storage (#1399)

parent 4068bcb2ed
commit ab413bdfcf
.github/workflows/ci-reusable.yml (2 changes, vendored)
@@ -52,7 +52,7 @@ jobs:
 - name: Integration tests
   if: matrix.tests == 'integration' || matrix.tests == 'all'
   env:
-    CODEX_INTEGRATION_TEST_INCLUDES: ${{ matrix.includes }}
+    STORAGE_INTEGRATION_TEST_INCLUDES: ${{ matrix.includes }}
   run: make -j${ncpu} DEBUG=${{ runner.debug }} testIntegration

 - name: Upload integration tests log files
.github/workflows/ci.yml (2 changes, vendored)
@@ -49,7 +49,7 @@ jobs:
 uses: arnetheduck/nph-action@v1
 with:
   version: latest
-  options: "codex/ tests/"
+  options: "storage/ tests/"
   fail: true
   suggest: true
.gitignore (4 changes, vendored)
@@ -38,7 +38,7 @@ nimble.paths
 .env

 .update.timestamp
-codex.nims
+storage.nims
 nimbus-build-system.paths
 docker/hostdatadir
 docker/prometheus-data
@@ -50,3 +50,5 @@ data/

 tests/cbindings/data-dir
 tests/cbindings/downloaded_hello.txt
+
+nimbledeps
.nph.toml (5 changes, new file)
@@ -0,0 +1,5 @@
+# Add to default exclusions (recommended - doesn't lose the defaults)
+extend-exclude = [
+  # "tests/fixtures",
+  "vendor",
+]
Makefile (18 changes)
@@ -181,11 +181,11 @@ endif

 coverage:
 	$(MAKE) NIMFLAGS="$(NIMFLAGS) --lineDir:on --passC:-fprofile-arcs --passC:-ftest-coverage --passL:-fprofile-arcs --passL:-ftest-coverage" test
-	cd nimcache/release/testCodex && rm -f *.c
+	cd nimcache/release/testStorage && rm -f *.c
 	mkdir -p coverage
-	lcov --capture --keep-going --directory nimcache/release/testCodex --output-file coverage/coverage.info
-	shopt -s globstar && ls $$(pwd)/codex/{*,**/*}.nim
-	shopt -s globstar && lcov --extract coverage/coverage.info --keep-going $$(pwd)/codex/{*,**/*}.nim --output-file coverage/coverage.f.info
+	lcov --capture --keep-going --directory nimcache/release/testStorage --output-file coverage/coverage.info
+	shopt -s globstar && ls $$(pwd)/storage/{*,**/*}.nim
+	shopt -s globstar && lcov --extract coverage/coverage.info --keep-going $$(pwd)/storage/{*,**/*}.nim --output-file coverage/coverage.f.info
 	echo -e $(BUILD_MSG) "coverage/report/index.html"
 	genhtml coverage/coverage.f.info --keep-going --output-directory coverage/report

@@ -237,7 +237,7 @@ nph/%: build-nph

 format:
 	$(NPH) *.nim
-	$(NPH) codex/
+	$(NPH) storage/
 	$(NPH) tests/
 	$(NPH) library/

@@ -267,15 +267,15 @@ libstorage:

 ifeq ($(STATIC), 1)
 	echo -e $(BUILD_MSG) "build/$@.a" && \
-		$(ENV_SCRIPT) nim libstorageStatic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) codex.nims
+		$(ENV_SCRIPT) nim libstorageStatic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) storage.nims
 else ifeq ($(detected_OS),Windows)
 	echo -e $(BUILD_MSG) "build/$@.dll" && \
-		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) codex.nims
+		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) storage.nims
 else ifeq ($(detected_OS),macOS)
 	echo -e $(BUILD_MSG) "build/$@.dylib" && \
-		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) codex.nims
+		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) storage.nims
 else
 	echo -e $(BUILD_MSG) "build/$@.so" && \
-		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) codex.nims
+		$(ENV_SCRIPT) nim libstorageDynamic $(NIM_PARAMS) $(LIBSTORAGE_PARAMS) storage.nims
 endif
 endif # "variables.mk" was not included
README.md (25 changes)
@@ -1,6 +1,6 @@
-# Logos Storage Decentralized Engine
+# Logos Storage Filesharing Client

-> The Logos Storage project aims to create a decentralized engine that allows persisting data in p2p networks.
+> The Logos Storage project aims to create a filesharing client that allows sharing data privately in p2p networks.

 > WARNING: This project is under active development and is considered pre-alpha.

@@ -11,17 +11,17 @@
 [](https://github.com/logos-storage/logos-storage-nim/actions/workflows/docker.yml?query=branch%3Amaster)
 [](https://codecov.io/gh/logos-storage/logos-storage-nim)
 [](https://discord.gg/CaJTh24ddQ)

 ## Build and Run

 For detailed instructions on preparing to build logos-storage-nim see [*Build Logos Storage*](https://docs.codex.storage/learn/build).

 To build the project, clone it and run:

 ```bash
 make update && make
 # Tip: use -j{ncpu} for parallel execution, e.g.:
 # make -j12 update && make -j12
 ```

 The executable will be placed in the `build` directory under the project root.
@@ -41,12 +41,7 @@ It is possible to configure a Logos Storage node in several ways:

 The order of priority is the same as above: CLI options --> Environment variables --> Configuration file.

-Please check [documentation](https://docs.codex.storage/learn/run#configuration) for more information.
-
-## Guides
-
-To get acquainted with Logos Storage, consider:
-* running the simple [Logos Storage Two-Client Test](https://docs.codex.storage/learn/local-two-client-test) for a start, and;
+Please check `build/storage --help` for more information.

 ## API
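The same priority chain is visible in code further down this diff, where `StorageConf.load` stacks a TOML file as the secondary source beneath environment variables (prefix `storage`) and CLI options. A minimal sketch of that wiring, assuming confutils' `load` semantics and the fields shown in this diff:

```nim
# Sketch of the configuration priority described above; mirrors the
# StorageConf.load call that appears later in this diff (simplified, untested).
import pkg/confutils
import pkg/questionable # provides the =? unwrap used below

let config = StorageConf.load(
  version = storageFullVersion,
  envVarsPrefix = "storage", # env vars (e.g. STORAGE_*) override the config file
  secondarySources = proc(
      config: StorageConf, sources: auto
  ) {.gcsafe, raises: [ConfigurationError].} =
    # Lowest priority: values read from the TOML configuration file.
    if configFile =? config.configFile:
      sources.addConfigFile(Toml, configFile)
)
# CLI options parsed by confutils take precedence over both sources above.
```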
@@ -54,8 +49,9 @@ The client exposes a REST API that can be used to interact with the clients. Ove

 ## Bindings

-Logos Storage provides a C API that can be wrapped by other languages. The bindings is located in the `library` folder.
-Currently, only a Go binding is included.
+Logos Storage provides a C API that can be wrapped by other languages. The C API bindings are located in the `library` folder.
+
+Currently, only Go bindings are provided in this repo. However, Rust bindings for Logos Storage can be found at https://github.com/nipsysdev/storage-rust-bindings.

 ### Build the C library
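The exported entry points in `library/` follow a simple pattern that any C FFI can consume; a minimal sketch of the `{.exportc, dynlib.}` idiom used by the libstorage module in this diff (the function below is a made-up example, not part of the real API):

```nim
# Hypothetical example of a C-callable entry point in the libstorage style.
proc storage_hello(): cstring {.exportc, dynlib.} =
  # Visible to C (and hence Go/Rust wrappers) as: const char* storage_hello(void);
  cstring("hello from libstorage")
```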
@@ -71,7 +67,8 @@ See https://github.com/logos-storage/logos-storage-go-bindings-example.

 ### Static vs Dynamic build

-By default, Logos Storage builds a dynamic library (`libstorage.so`), which you can load at runtime.
+By default, Logos Storage builds a dynamic library (`libstorage.so`/`libstorage.dylib`/`libstorage.dll`), which you can load at runtime.

 If you prefer a static library (`libstorage.a`), set the `STATIC` flag:

 ```bash
@@ -62,15 +62,15 @@ proc test(name: string, outName = name, srcDir = "tests/", params = "", lang = "
   exec "build/" & outName

 task storage, "build logos storage binary":
-  buildBinary "codex",
+  buildBinary "storage",
     outname = "storage",
     params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"

 task testStorage, "Build & run Logos Storage tests":
-  test "testCodex", outName = "testStorage"
+  test "testStorage", outName = "testStorage"

 task testIntegration, "Run integration tests":
-  buildBinary "codex",
+  buildBinary "storage",
     outName = "storage",
     params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE"
   test "testIntegration"
@@ -110,7 +110,7 @@ task coverage, "generates code coverage report":
   echo " *****************************************************************"

   var nimSrcs = " "
-  for f in walkDirRec("codex", {pcFile}):
+  for f in walkDirRec("storage", {pcFile}):
     if f.endswith(".nim"):
       nimSrcs.add " " & f.absolutePath.quoteShell()
@@ -1,2 +0,0 @@
-const ContentIdsExts =
-  [multiCodec("codex-root"), multiCodec("codex-manifest"), multiCodec("codex-block")]
@@ -1,2 +0,0 @@
-const CodecExts =
-  [("codex-manifest", 0xCD01), ("codex-block", 0xCD02), ("codex-root", 0xCD03)]
@@ -1,25 +0,0 @@
-## Logos Storage
-## Copyright (c) 2022 Status Research & Development GmbH
-## Licensed under either of
-##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
-##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
-## at your option.
-## This file may not be copied, modified, or distributed except according to
-## those terms.
-
-const
-  # Namespaces
-  CodexMetaNamespace* = "meta" # meta info stored here
-  CodexRepoNamespace* = "repo" # repository namespace, blocks and manifests are subkeys
-  CodexBlockTotalNamespace* = CodexMetaNamespace & "/total"
-    # number of blocks in the repo
-  CodexBlocksNamespace* = CodexRepoNamespace & "/blocks" # blocks namespace
-  CodexManifestNamespace* = CodexRepoNamespace & "/manifests" # manifest namespace
-  CodexBlocksTtlNamespace* = # Cid TTL
-    CodexMetaNamespace & "/ttl"
-  CodexBlockProofNamespace* = # Cid and Proof
-    CodexMetaNamespace & "/proof"
-  CodexDhtNamespace* = "dht" # Dht namespace
-  CodexDhtProvidersNamespace* = # Dht providers namespace
-    CodexDhtNamespace & "/providers"
-  CodexQuotaNamespace* = CodexMetaNamespace & "/quota" # quota's namespace
config.nims (10 changes)
@@ -93,11 +93,11 @@ else:
   --warningAsError:
     "ProveField:on"
   --define:
-    "libp2p_multicodec_exts:../../../codex/multicodec_exts.nim"
+    "libp2p_multicodec_exts:../../../storage/multicodec_exts.nim"
   --define:
-    "libp2p_multihash_exts:../../../codex/multihash_exts.nim"
+    "libp2p_multihash_exts:../../../storage/multihash_exts.nim"
   --define:
-    "libp2p_contentids_exts:../../../codex/contentids_exts.nim"
+    "libp2p_contentids_exts:../../../storage/contentids_exts.nim"

 when (NimMajor, NimMinor) >= (1, 4):
   --warning:
@@ -142,8 +142,8 @@ switch("warning", "LockLevel:off")

 switch("define", "libp2p_pki_schemes=secp256k1")
 #TODO this infects everything in this folder, ideally it would only
-# apply to codex.nim, but since codex.nims is used for other purpose
-# we can't use it. And codex.cfg doesn't work
+# apply to storage.nim, but since storage.nims is used for other purpose
+# we can't use it. And storage.cfg doesn't work
 switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic],textlines[dynamic]")

 # Workaround for assembler incompatibility between constantine and secp256k1
@@ -37,7 +37,7 @@

 ## Environment variables

-We can configure Codex using [Environment variables](../README#environment-variables) and [docker-compose.yaml](docker-compose.yaml) file can be useful as an example.
+We can configure Logos Storage using [Environment variables](../README#environment-variables); the [docker-compose.yaml](docker-compose.yaml) file can be useful as an example.

 We also added a temporary environment variable `NAT_IP_AUTO` to the entrypoint, which is set to `false` for releases and `true` for regular builds. That approach is useful for Dist-Tests.
 ```shell
@@ -51,7 +51,7 @@
 2. The docker image can then be minified using [slim](https://github.com/slimtoolkit/slim). Install slim on your path and then run:
    ```shell
    slim # brings up interactive prompt
-   >>> build --target status-im/codexsetup --http-probe-off true
+   >>> build --target status-im/logosstoragesetup --http-probe-off true
    ```
-3. This should output an image with name `status-im/codexsetup.slim`
+3. This should output an image with name `status-im/logosstoragesetup.slim`
 4. We can then bring up the image using `docker-compose up -d`.
@@ -16,7 +16,7 @@ sequenceDiagram
   participant C as C API (libstorage.h)
   participant Ctx as StorageContext
   participant Thr as Worker Thread
-  participant Eng as CodexServer
+  participant Eng as StorageServer

   App->>Go: Start
   Go->>C: storage_start_node
@@ -39,7 +39,7 @@ import ./storage_thread_requests/requests/node_download_request
 import ./storage_thread_requests/requests/node_storage_request
 import ./ffi_types

-from ../codex/conf import codexVersion
+from ../storage/conf import storageVersion, storageRevision

 logScope:
   topics = "libstorage"
@@ -125,12 +125,12 @@ proc storage_new(
 proc storage_version(ctx: ptr StorageContext): ptr cchar {.dynlib, exportc.} =
   initializeLibrary()

-  return asNewCString(conf.codexVersion)
+  return asNewCString(conf.storageVersion)

 proc storage_revision(ctx: ptr StorageContext): ptr cchar {.dynlib, exportc.} =
   initializeLibrary()

-  return asNewCString(conf.codexRevision)
+  return asNewCString(conf.storageRevision)

 proc storage_repo(
     ctx: ptr StorageContext, callback: StorageCallback, userData: pointer
@@ -16,7 +16,7 @@ import taskpools/channels_spsc_single
 import ./ffi_types
 import ./storage_thread_requests/[storage_thread_request]

-from ../codex/codex import CodexServer
+from ../storage/storage import StorageServer

 logScope:
   topics = "libstorage"
@@ -124,7 +124,7 @@ proc sendRequestToStorageThread*(
   ok()

 proc runStorage(ctx: ptr StorageContext) {.async: (raises: []).} =
-  var storage: CodexServer
+  var storage: StorageServer

   while true:
     try:
@@ -10,11 +10,11 @@ import chronos
 import chronicles
 import codexdht/discv5/spr
 import ../../alloc
-import ../../../codex/conf
-import ../../../codex/rest/json
-import ../../../codex/node
+import ../../../storage/conf
+import ../../../storage/rest/json
+import ../../../storage/node

-from ../../../codex/codex import CodexServer, node
+from ../../../storage/storage import StorageServer, node

 logScope:
   topics = "libstorage libstoragedebug"
@@ -47,7 +47,7 @@ proc destroyShared(self: ptr NodeDebugRequest) =
   deallocShared(self)

 proc getDebug(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let node = storage[].node
   let table = RestRoutingTable.init(node.discovery.protocol.routingTable)
@@ -64,7 +64,7 @@ proc getDebug(
   return ok($json)

 proc getPeer(
-    storage: ptr CodexServer, peerId: cstring
+    storage: ptr StorageServer, peerId: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   when storage_enable_api_debug_peers:
     let node = storage[].node
@@ -88,7 +88,7 @@ proc getPeer(
   return err("Failed to get peer: peer debug API is disabled")

 proc updateLogLevel(
-    storage: ptr CodexServer, logLevel: cstring
+    storage: ptr StorageServer, logLevel: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   try:
     {.gcsafe.}:
@@ -99,7 +99,7 @@ proc updateLogLevel(
   return ok("")

 proc process*(
-    self: ptr NodeDebugRequest, storage: ptr CodexServer
+    self: ptr NodeDebugRequest, storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -21,12 +21,12 @@ import chronicles
 import libp2p/stream/[lpstream]
 import serde/json as serde
 import ../../alloc
-import ../../../codex/units
-import ../../../codex/codextypes
+import ../../../storage/units
+import ../../../storage/storagetypes

-from ../../../codex/codex import CodexServer, node
-from ../../../codex/node import retrieve, fetchManifest
-from ../../../codex/rest/json import `%`, RestContent
+from ../../../storage/storage import StorageServer, node
+from ../../../storage/node import retrieve, fetchManifest
+from ../../../storage/rest/json import `%`, RestContent
 from libp2p import Cid, init, `$`

 logScope:
@@ -80,7 +80,7 @@ proc destroyShared(self: ptr NodeDownloadRequest) =
   deallocShared(self)

 proc init(
-    storage: ptr CodexServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool
+    storage: ptr StorageServer, cCid: cstring = "", chunkSize: csize_t = 0, local: bool
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Init a new session to download the file identified by cid.
   ##
@@ -114,7 +114,7 @@ proc init(
   return ok("")

 proc chunk(
-    storage: ptr CodexServer, cCid: cstring = "", onChunk: OnChunkHandler
+    storage: ptr StorageServer, cCid: cstring = "", onChunk: OnChunkHandler
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Download the next chunk of the file identified by cid.
   ## The chunk is passed to the onChunk handler.
@@ -164,7 +164,7 @@ proc chunk(
   return ok("")

 proc streamData(
-    storage: ptr CodexServer,
+    storage: ptr StorageServer,
     stream: LPStream,
     onChunk: OnChunkHandler,
     chunkSize: csize_t,
@@ -207,7 +207,7 @@ proc streamData(
   return ok("")

 proc stream(
-    storage: ptr CodexServer,
+    storage: ptr StorageServer,
     cCid: cstring,
     chunkSize: csize_t,
     local: bool,
@@ -251,7 +251,7 @@ proc stream(
   return ok("")

 proc cancel(
-    storage: ptr CodexServer, cCid: cstring
+    storage: ptr StorageServer, cCid: cstring
 ): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
   ## Cancel the download session identified by cid.
   ## This operation is not supported when using the stream mode,
@@ -279,7 +279,7 @@ proc cancel(
   return ok("")

 proc manifest(
-    storage: ptr CodexServer, cCid: cstring
+    storage: ptr StorageServer, cCid: cstring
 ): Future[Result[string, string]] {.raises: [], async: (raises: []).} =
   let cid = Cid.init($cCid)
   if cid.isErr:
@@ -296,7 +296,7 @@ proc manifest(
   return err("Failed to fetch manifest: download cancelled.")

 proc process*(
-    self: ptr NodeDownloadRequest, storage: ptr CodexServer, onChunk: OnChunkHandler
+    self: ptr NodeDownloadRequest, storage: ptr StorageServer, onChunk: OnChunkHandler
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -5,11 +5,11 @@ import chronos
 import chronicles
 import confutils
 import codexdht/discv5/spr
-import ../../../codex/conf
-import ../../../codex/rest/json
-import ../../../codex/node
+import ../../../storage/conf
+import ../../../storage/rest/json
+import ../../../storage/node

-from ../../../codex/codex import CodexServer, config, node
+from ../../../storage/storage import StorageServer, config, node

 logScope:
   topics = "libstorage libstorageinfo"
@@ -31,12 +31,12 @@ proc destroyShared(self: ptr NodeInfoRequest) =
   deallocShared(self)

 proc getRepo(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   return ok($(storage[].config.dataDir))

 proc getSpr(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let spr = storage[].node.discovery.dhtRecord
   if spr.isNone:
@@ -45,12 +45,12 @@ proc getSpr(
   return ok(spr.get.toURI)

 proc getPeerId(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   return ok($storage[].node.switch.peerInfo.peerId)

 proc process*(
-    self: ptr NodeInfoRequest, storage: ptr CodexServer
+    self: ptr NodeInfoRequest, storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -17,12 +17,12 @@ import libp2p
 import json_serialization
 import json_serialization/std/[options, net]
 import ../../alloc
-import ../../../codex/conf
-import ../../../codex/utils
-import ../../../codex/utils/[keyutils, fileutils]
-import ../../../codex/units
+import ../../../storage/conf
+import ../../../storage/utils
+import ../../../storage/utils/[keyutils, fileutils]
+import ../../../storage/units

-from ../../../codex/codex import CodexServer, new, start, stop, close
+from ../../../storage/storage import StorageServer, new, start, stop, close

 logScope:
   topics = "libstorage libstoragelifecycle"
@@ -89,16 +89,16 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =

 proc createStorage(
     configJson: cstring
-): Future[Result[CodexServer, string]] {.async: (raises: []).} =
-  var conf: CodexConf
+): Future[Result[StorageServer, string]] {.async: (raises: []).} =
+  var conf: StorageConf

   try:
-    conf = CodexConf.load(
-      version = codexFullVersion,
+    conf = StorageConf.load(
+      version = storageFullVersion,
       envVarsPrefix = "storage",
       cmdLine = @[],
       secondarySources = proc(
-          config: CodexConf, sources: auto
+          config: StorageConf, sources: auto
       ) {.gcsafe, raises: [ConfigurationError].} =
         if configJson.len > 0:
           sources.addConfigFileContent(Json, $(configJson))
@@ -149,14 +149,14 @@ proc createStorage(

   let server =
     try:
-      CodexServer.new(conf, pk)
+      StorageServer.new(conf, pk)
     except Exception as exc:
       return err("Failed to create Storage: " & exc.msg)

   return ok(server)

 proc process*(
-    self: ptr NodeLifecycleRequest, storage: ptr CodexServer
+    self: ptr NodeLifecycleRequest, storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -8,9 +8,9 @@ import chronos
 import chronicles
 import libp2p
 import ../../alloc
-import ../../../codex/node
+import ../../../storage/node

-from ../../../codex/codex import CodexServer, node
+from ../../../storage/storage import StorageServer, node

 logScope:
   topics = "libstorage libstoragep2p"
@@ -40,7 +40,7 @@ proc destroyShared(self: ptr NodeP2PRequest) =
   deallocShared(self)

 proc connect(
-    storage: ptr CodexServer, peerId: cstring, peerAddresses: seq[cstring] = @[]
+    storage: ptr StorageServer, peerId: cstring, peerAddresses: seq[cstring] = @[]
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let node = storage[].node
   let res = PeerId.init($peerId)
@@ -81,7 +81,7 @@ proc connect(
   return ok("")

 proc process*(
-    self: ptr NodeP2PRequest, storage: ptr CodexServer
+    self: ptr NodeP2PRequest, storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -14,12 +14,12 @@ import chronicles
 import libp2p/stream/[lpstream]
 import serde/json as serde
 import ../../alloc
-import ../../../codex/units
-import ../../../codex/manifest
-import ../../../codex/stores/repostore
+import ../../../storage/units
+import ../../../storage/manifest
+import ../../../storage/stores/repostore

-from ../../../codex/codex import CodexServer, node, repoStore
-from ../../../codex/node import
+from ../../../storage/storage import StorageServer, node, repoStore
+from ../../../storage/node import
   iterateManifests, fetchManifest, fetchDatasetAsyncTask, delete, hasLocalBlock
 from libp2p import Cid, init, `$`

@@ -61,7 +61,7 @@ type ManifestWithCid = object
   manifest {.serialize.}: Manifest

 proc list(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   var manifests = newSeq[ManifestWithCid]()
   proc onManifest(cid: Cid, manifest: Manifest) {.raises: [], gcsafe.} =
@@ -78,7 +78,7 @@ proc list(
   return ok(serde.toJson(manifests))

 proc delete(
-    storage: ptr CodexServer, cCid: cstring
+    storage: ptr StorageServer, cCid: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let cid = Cid.init($cCid)
   if cid.isErr:
@@ -97,7 +97,7 @@ proc delete(
   return ok("")

 proc fetch(
-    storage: ptr CodexServer, cCid: cstring
+    storage: ptr StorageServer, cCid: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let cid = Cid.init($cCid)
   if cid.isErr:
@@ -116,7 +116,7 @@ proc fetch(
   return err("Failed to fetch the data: download cancelled.")

 proc space(
-    storage: ptr CodexServer
+    storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let repoStore = storage[].repoStore
   let space = StorageSpace(
@@ -128,7 +128,7 @@ proc space(
   return ok(serde.toJson(space))

 proc exists(
-    storage: ptr CodexServer, cCid: cstring
+    storage: ptr StorageServer, cCid: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   let cid = Cid.init($cCid)
   if cid.isErr:
@@ -142,7 +142,7 @@ proc exists(
   return err("Failed to check the data existence: operation cancelled.")

 proc process*(
-    self: ptr NodeStorageRequest, storage: ptr CodexServer
+    self: ptr NodeStorageRequest, storage: ptr StorageServer
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
     destroyShared(self)
@@ -24,11 +24,11 @@ import questionable/results
 import faststreams/inputs
 import libp2p/stream/[bufferstream, lpstream]
 import ../../alloc
-import ../../../codex/units
-import ../../../codex/codextypes
+import ../../../storage/units
+import ../../../storage/storagetypes

-from ../../../codex/codex import CodexServer, node
-from ../../../codex/node import store
+from ../../../storage/storage import StorageServer, node
+from ../../../storage/node import store
 from libp2p import Cid, `$`

 logScope:
@@ -86,7 +86,7 @@ proc destroyShared(self: ptr NodeUploadRequest) =
   deallocShared(self)

 proc init(
-    storage: ptr CodexServer, filepath: cstring = "", chunkSize: csize_t = 0
+    storage: ptr StorageServer, filepath: cstring = "", chunkSize: csize_t = 0
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Init a new session upload and return its ID.
   ## The session contains the future corresponding to the
@@ -156,7 +156,7 @@ proc init(
   return ok(sessionId)

 proc chunk(
-    storage: ptr CodexServer, sessionId: cstring, chunk: seq[byte]
+    storage: ptr StorageServer, sessionId: cstring, chunk: seq[byte]
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Upload a chunk of data to the session identified by sessionId.
   ## The chunk is pushed to the BufferStream of the session.
@@ -205,7 +205,7 @@ proc chunk(
   return ok("")

 proc finalize(
-    storage: ptr CodexServer, sessionId: cstring
+    storage: ptr StorageServer, sessionId: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Finalize the upload session identified by sessionId.
   ## This closes the BufferStream and waits for the `node.store` future
@@ -242,7 +242,7 @@ proc finalize(
   session.fut.cancelSoon()

 proc cancel(
-    storage: ptr CodexServer, sessionId: cstring
+    storage: ptr StorageServer, sessionId: cstring
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Cancel the upload session identified by sessionId.
   ## This cancels the `node.store` future and removes the session
@@ -293,7 +293,7 @@ proc streamFile(
   return err("Failed to stream the file: " & $e.msg)

 proc file(
-    storage: ptr CodexServer, sessionId: cstring, onProgress: OnProgressHandler
+    storage: ptr StorageServer, sessionId: cstring, onProgress: OnProgressHandler
 ): Future[Result[string, string]] {.async: (raises: []).} =
   ## Starts the file upload for the session identified by sessionId.
   ## Will call finalize when done and return the CID of the uploaded file.
@@ -333,7 +333,7 @@ proc file(

 proc process*(
     self: ptr NodeUploadRequest,
-    storage: ptr CodexServer,
+    storage: ptr StorageServer,
     onUploadProgress: OnProgressHandler = nil,
 ): Future[Result[string, string]] {.async: (raises: []).} =
   defer:
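The doc comments above describe a session-based upload protocol: `init` creates a session, `chunk` feeds bytes into the session's `BufferStream`, and `finalize`/`cancel` resolve or abort the pending `node.store` future. A self-contained sketch of that state machine, with a plain buffer standing in for the stream and store plumbing (all names below are illustrative):

```nim
import std/tables

type UploadSession = object
  buf: seq[byte] # stands in for the session's BufferStream

var sessions: Table[string, UploadSession]

proc initSession(): string =
  # Real code returns a generated session ID and keeps the node.store future.
  result = "session-" & $sessions.len
  sessions[result] = UploadSession()

proc pushChunk(id: string, data: seq[byte]) =
  sessions[id].buf.add data # real code pushes into the BufferStream

proc finalizeSession(id: string): seq[byte] =
  # Real code closes the stream, awaits node.store, and returns the CID.
  result = sessions[id].buf
  sessions.del id

proc cancelSession(id: string) =
  # Real code cancels the node.store future before dropping the session.
  sessions.del id
```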
@@ -14,7 +14,7 @@ import ./requests/node_upload_request
 import ./requests/node_download_request
 import ./requests/node_storage_request

-from ../../codex/codex import CodexServer
+from ../../storage/storage import StorageServer

 type RequestType* {.pure.} = enum
   LIFECYCLE
@@ -89,7 +89,7 @@ proc handleRes[T: string | void | seq[byte]](
 proc process*(
     T: type StorageThreadRequest,
     request: ptr StorageThreadRequest,
-    storage: ptr CodexServer,
+    storage: ptr StorageServer,
 ) {.async: (raises: []).} =
   ## Processes the request in the Logos Storage thread.
   ## Dispatch to the appropriate request handler based on reqType.
@@ -12,7 +12,7 @@ Use the `--metrics-address` and `--metrics-port` flags to adjust the address

 Metrics are useful to monitor the health of the process and should aid in identifying and debugging potential issues that would be hard to notice otherwise.

-All Logos Storage metrics should be prefixed with the `codex_` prefix to be able to differentiate from metrics exposed by other subsystems. For example libp2p generally prefixed with the `libp2p_` prefix.
+All Logos Storage metrics should be prefixed with `storage_` to differentiate them from metrics exposed by other subsystems; for example, libp2p metrics are generally prefixed with `libp2p_`.

 Metrics can be added on an as-needed basis; however, keep in mind the potential overhead they might introduce. In particular, be careful with labels, as they will generate as many metrics as there are labels for a specific collector. If a metric or a set of metrics is expensive, it is usually advisable to put them behind a compile-time flag.
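A condensed sketch of this naming convention, using nim-metrics' `declareCounter` just as the renamed collectors elsewhere in this diff do (the metric below is a made-up example):

```nim
import pkg/metrics

# The storage_ prefix separates our metrics from e.g. libp2p_ ones.
declareCounter(storage_example_requests_total, "example requests served")

proc handleRequest() =
  storage_example_requests_total.inc()
```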
@@ -20,7 +20,7 @@ Metrics can be added on an as-needed basis

 The exposed metrics can be aggregated by the [Prometheus](https://prometheus.io/) monitoring system and additionally graphed through [Grafana](https://grafana.com/).

-This directory contains both the default `prometheus.yml` config file as well as a basic `codex-grafana-dashboard.json` file that can be augmented with additional panels and metrics on an as needed basis.
+This directory contains both the default `prometheus.yml` config file and a basic `storage-grafana-dashboard.json` file that can be augmented with additional panels and metrics on an as-needed basis.

 Additionally, please consider installing the [node_exporter](https://github.com/prometheus/node_exporter) agent to collect machine-level metrics such as overall memory, process, networking, disc IO, etc.
@@ -34,7 +34,7 @@ This will take you to the import page.

-Use either one of the presented methods (upload json, load from a url or copy paste the json into the text-box), to upload the `codex-grafana-dashboard.json` file.
+Use any one of the presented methods (upload the json, load it from a url, or copy-paste the json into the text box) to upload the `storage-grafana-dashboard.json` file.

 Finally, you'll be presented with the following screen where you can change the name and the `UID` of the imported dashboard. This is only necessary if there is already a dashboard with the same name or `UID`.
@@ -2,7 +2,7 @@ global:
   scrape_interval: 12s

 scrape_configs:
-  - job_name: "codex"
+  - job_name: "storage"
     static_configs:
       - targets: ['127.0.0.1:8008']
   - job_name: "node_exporter"
@@ -30,7 +30,7 @@ let
 in stdenv.mkDerivation rec {
   pname = "storage";

-  version = "${tools.findKeyValue "version = \"([0-9]+\.[0-9]+\.[0-9]+)\"" ../codex.nimble}-${revision}";
+  version = "${tools.findKeyValue "version = \"([0-9]+\.[0-9]+\.[0-9]+)\"" ../storage.nimble}-${revision}";

   inherit src;
@@ -17,7 +17,7 @@ in
   package = mkOption {
     type = types.package;
     default = pkgs.callPackage ./default.nix { src = self; inherit circomCompatPkg; };
-    defaultText = literalExpression "pkgs.codex";
+    defaultText = literalExpression "pkgs.storage";
     description = mdDoc "Package to use as Nim Logos Storage node.";
   };
@@ -171,7 +171,7 @@ components:
         type: string
         nullable: true
         description: "The original name of the uploaded content (optional)"
-        example: codex.png
+        example: storage.png
       mimetype:
         type: string
         nullable: true
@@ -287,7 +287,7 @@ paths:
           description: The content disposition used to send the filename.
           schema:
             type: string
-          example: 'attachment; filename="codex.png"'
+          example: 'attachment; filename="storage.png"'
       requestBody:
         content:
           application/octet-stream:
@@ -18,36 +18,36 @@ import pkg/confutils/toml/std/uri as confTomlUri
 import pkg/toml_serialization
 import pkg/libp2p

-import ./codex/conf
-import ./codex/codex
-import ./codex/logutils
-import ./codex/units
-import ./codex/utils/keyutils
-import ./codex/codextypes
+import ./storage/conf
+import ./storage/storage
+import ./storage/logutils
+import ./storage/units
+import ./storage/utils/keyutils
+import ./storage/storagetypes

-export codex, conf, libp2p, chronos, logutils
+export storage, conf, libp2p, chronos, logutils

 when isMainModule:
   import std/os
   import pkg/confutils/defs
-  import ./codex/utils/fileutils
+  import ./storage/utils/fileutils

   logScope:
-    topics = "codex"
+    topics = "storage"

   when defined(posix):
     import system/ansi_c

-  type CodexStatus {.pure.} = enum
+  type StorageStatus {.pure.} = enum
     Stopped
     Stopping
     Running

-  let config = CodexConf.load(
-    version = codexFullVersion,
+  let config = StorageConf.load(
+    version = storageFullVersion,
     envVarsPrefix = "storage",
     secondarySources = proc(
-        config: CodexConf, sources: auto
+        config: StorageConf, sources: auto
     ) {.gcsafe, raises: [ConfigurationError].} =
       if configFile =? config.configFile:
         sources.addConfigFile(Toml, configFile)
@@ -81,7 +81,7 @@ when isMainModule:
   trace "Repo dir initialized", dir = config.dataDir / "repo"

   var
-    state: CodexStatus
+    state: StorageStatus
     shutdown: Future[void]

   let
@@ -94,7 +94,7 @@ when isMainModule:
     privateKey = setupKey(keyPath).expect("Should setup private key!")
     server =
       try:
-        CodexServer.new(config, privateKey)
+        StorageServer.new(config, privateKey)
       except Exception as exc:
         error "Failed to start Logos Storage", msg = exc.msg
         quit QuitFailure
@@ -102,7 +102,7 @@ when isMainModule:
   ## Ctrl+C handling
   proc doShutdown() =
     shutdown = server.shutdown()
-    state = CodexStatus.Stopping
+    state = StorageStatus.Stopping

     notice "Stopping Logos Storage"

@@ -142,8 +142,8 @@ when isMainModule:
   # had a chance to start (currently you'll get a SIGSEGV if you try to).
   quit QuitFailure

-  state = CodexStatus.Running
-  while state == CodexStatus.Running:
+  state = StorageStatus.Running
+  while state == StorageStatus.Running:
     try:
       # poll chronos
       chronos.poll()
@@ -28,9 +28,9 @@ import ../../logutils
 import ../../manifest

 logScope:
-  topics = "codex discoveryengine advertiser"
+  topics = "storage discoveryengine advertiser"

-declareGauge(codex_inflight_advertise, "inflight advertise requests")
+declareGauge(storage_inflight_advertise, "inflight advertise requests")

 const
   DefaultConcurrentAdvertRequests = 10
@@ -106,11 +106,11 @@ proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =

       let request = b.discovery.provide(cid)
       b.inFlightAdvReqs[cid] = request
-      codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
+      storage_inflight_advertise.set(b.inFlightAdvReqs.len.int64)

       defer:
         b.inFlightAdvReqs.del(cid)
-        codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
+        storage_inflight_advertise.set(b.inFlightAdvReqs.len.int64)

       await request
     except CancelledError:
@@ -31,9 +31,9 @@ import ../../logutils
 import ../../manifest

 logScope:
-  topics = "codex discoveryengine"
+  topics = "storage discoveryengine"

-declareGauge(codex_inflight_discovery, "inflight discovery requests")
+declareGauge(storage_inflight_discovery, "inflight discovery requests")

 const
   DefaultConcurrentDiscRequests = 10
@@ -114,11 +114,11 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
       if haves.len < b.minPeersPerBlock:
         let request = b.discovery.find(cid)
         b.inFlightDiscReqs[cid] = request
-        codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
+        storage_inflight_discovery.set(b.inFlightDiscReqs.len.int64)

         defer:
           b.inFlightDiscReqs.del(cid)
-          codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
+          storage_inflight_discovery.set(b.inFlightDiscReqs.len.int64)

         if (await request.withTimeout(DefaultDiscoveryTimeout)) and
             peers =? (await request).catch:
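The rename in the two engine modules above is mechanical and repeats across the codebase: the chronicles topic and the metric collector both move to the `storage` prefix while call sites keep their shape. A condensed sketch of the pattern (illustrative module, not taken from the commit):

```nim
import pkg/chronicles
import pkg/metrics

logScope:
  topics = "storage discoveryengine" # was "codex discoveryengine"

declareGauge(storage_inflight_discovery, "inflight discovery requests")

proc trackInflight(count: int) =
  storage_inflight_discovery.set(count.int64) # same call, renamed collector
  trace "Tracking in-flight requests", count
```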
@@ -44,39 +44,41 @@ import ./pendingblocks
 export peers, pendingblocks, discovery

 logScope:
-  topics = "codex blockexcengine"
+  topics = "storage blockexcengine"

 declareCounter(
-  codex_block_exchange_want_have_lists_sent, "codex blockexchange wantHave lists sent"
+  storage_block_exchange_want_have_lists_sent,
+  "storage blockexchange wantHave lists sent",
 )
 declareCounter(
-  codex_block_exchange_want_have_lists_received,
-  "codex blockexchange wantHave lists received",
+  storage_block_exchange_want_have_lists_received,
+  "storage blockexchange wantHave lists received",
 )
 declareCounter(
-  codex_block_exchange_want_block_lists_sent, "codex blockexchange wantBlock lists sent"
+  storage_block_exchange_want_block_lists_sent,
+  "storage blockexchange wantBlock lists sent",
 )
 declareCounter(
-  codex_block_exchange_want_block_lists_received,
-  "codex blockexchange wantBlock lists received",
+  storage_block_exchange_want_block_lists_received,
+  "storage blockexchange wantBlock lists received",
 )
-declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sent")
+declareCounter(storage_block_exchange_blocks_sent, "storage blockexchange blocks sent")
 declareCounter(
-  codex_block_exchange_blocks_received, "codex blockexchange blocks received"
+  storage_block_exchange_blocks_received, "storage blockexchange blocks received"
 )
 declareCounter(
-  codex_block_exchange_spurious_blocks_received,
-  "codex blockexchange unrequested/duplicate blocks received",
+  storage_block_exchange_spurious_blocks_received,
+  "storage blockexchange unrequested/duplicate blocks received",
 )
 declareCounter(
-  codex_block_exchange_discovery_requests_total,
+  storage_block_exchange_discovery_requests_total,
   "Total number of peer discovery requests sent",
 )
 declareCounter(
-  codex_block_exchange_peer_timeouts_total, "Total number of peer activity timeouts"
+  storage_block_exchange_peer_timeouts_total, "Total number of peer activity timeouts"
 )
 declareCounter(
-  codex_block_exchange_requests_failed_total,
+  storage_block_exchange_requests_failed_total,
   "Total number of block requests that failed after exhausting retries",
 )

@@ -166,7 +168,7 @@ proc sendWantHave(
   let toAsk = addresses.filterIt(it notin p.peerHave)
   trace "Sending wantHave request", toAsk, peer = p.id
   await self.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave)
-  codex_block_exchange_want_have_lists_sent.inc()
+  storage_block_exchange_want_have_lists_sent.inc()

 proc sendWantBlock(
     self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx
@@ -175,7 +177,7 @@ proc sendWantBlock(
   await self.network.request.sendWantList(
     blockPeer.id, addresses, wantType = WantType.WantBlock
   ) # we want this remote to send us a block
-  codex_block_exchange_want_block_lists_sent.inc()
+  storage_block_exchange_want_block_lists_sent.inc()

 proc sendBatchedWantList(
     self: BlockExcEngine,
@@ -297,7 +299,7 @@ proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledErr
 proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
   if self.lastDiscRequest + DiscoveryRateLimit < Moment.now():
     trace "Searching for new peers for", cid = cid
-    codex_block_exchange_discovery_requests_total.inc()
+    storage_block_exchange_discovery_requests_total.inc()
     self.lastDiscRequest = Moment.now() # always refresh before calling await!
     self.discovery.queueFindBlocksReq(@[cid])
   else:
@@ -333,7 +335,7 @@ proc downloadInternal(

       if self.pendingBlocks.retriesExhausted(address):
         trace "Error retries exhausted"
-        codex_block_exchange_requests_failed_total.inc()
+        storage_block_exchange_requests_failed_total.inc()
         handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
         break

@@ -415,7 +417,7 @@ proc downloadInternal(
       else:
         # If the peer timed out, retries immediately.
         trace "Peer timed out during block request", peer = scheduledPeer.id
-        codex_block_exchange_peer_timeouts_total.inc()
+        storage_block_exchange_peer_timeouts_total.inc()
         await self.network.dropPeer(scheduledPeer.id)
         # Evicts peer immediately or we may end up picking it again in the
         # next retry.
@@ -426,7 +428,7 @@ proc downloadInternal(
     await handle.cancelAndWait()
   except RetriesExhaustedError as exc:
     warn "Retries exhausted for block", address, exc = exc.msg
-    codex_block_exchange_requests_failed_total.inc()
+    storage_block_exchange_requests_failed_total.inc()
     if not handle.finished:
       handle.fail(exc)
   finally:
@@ -690,7 +692,7 @@ proc blocksDeliveryHandler*(
     # Unknown peers and unrequested blocks are dropped with a warning.
     if not allowSpurious and (peerCtx == nil or not peerCtx.blockReceived(bd.address)):
       warn "Dropping unrequested or duplicate block received from peer"
-      codex_block_exchange_spurious_blocks_received.inc()
+      storage_block_exchange_spurious_blocks_received.inc()
       continue

     if err =? self.validateBlockDelivery(bd).errorOption:
@@ -729,7 +731,7 @@ proc blocksDeliveryHandler*(
       discard
     lastIdle = Moment.now()

-  codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
+  storage_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)

   if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption:
     warn "Error resolving blocks", err = err.msg
@@ -789,11 +791,11 @@ proc wantListHandler*(
         BlockPresence(address: e.address, `type`: BlockPresenceType.DontHave)
       )

-      codex_block_exchange_want_have_lists_received.inc()
+      storage_block_exchange_want_have_lists_received.inc()
     of WantType.WantBlock:
       peerCtx.wantedBlocks.incl(e.address)
       schedulePeer = true
-      codex_block_exchange_want_block_lists_received.inc()
+      storage_block_exchange_want_block_lists_received.inc()
   else: # Updating existing entry in peer wants
     # peer doesn't want this block anymore
     if e.cancel:
@@ -902,7 +904,7 @@ proc taskHandler*(
     continue

   await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
-  codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
+  storage_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
   # Drops the batch from the peer's set of wanted blocks; i.e. assumes that after
   # we send the blocks, then the peer no longer wants them, so we don't need to
   # re-send them. Note that the send might still fail down the line and we will
@@ -22,14 +22,15 @@ import ../../blocktype
 import ../../logutils

 logScope:
-  topics = "codex pendingblocks"
+  topics = "storage pendingblocks"

 declareGauge(
-  codex_block_exchange_pending_block_requests,
-  "codex blockexchange pending block requests",
+  storage_block_exchange_pending_block_requests,
+  "storage blockexchange pending block requests",
 )
 declareGauge(
-  codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us"
+  storage_block_exchange_retrieval_time_us,
+  "storage blockexchange block retrieval time us",
 )

 const
@@ -53,7 +54,7 @@ type
   lastInclusion*: Moment # time at which we last included a block into our wantlist

 proc updatePendingBlockGauge(p: PendingBlocksManager) =
-  codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
+  storage_block_exchange_pending_block_requests.set(p.blocks.len.int64)

 proc getWantHandle*(
     self: PendingBlocksManager, address: BlockAddress, requested: ?PeerId = PeerId.none
@@ -123,7 +124,7 @@ proc resolve*(

     blockReq.handle.complete(bd.blk)

-    codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
+    storage_block_exchange_retrieval_time_us.set(retrievalDurationUs)
   else:
     trace "Block handle already finished", address = bd.address
@@ -27,10 +27,10 @@ import ./networkpeer
 export networkpeer

 logScope:
-  topics = "codex blockexcnetwork"
+  topics = "storage blockexcnetwork"

 const
-  Codec* = "/codex/blockexc/1.0.0"
+  Codec* = "/storage/blockexc/1.0.0"
   DefaultMaxInflight* = 100

 type
@@ -19,7 +19,7 @@ import ../../logutils
 import ../../utils/trackedfutures

 logScope:
-  topics = "codex blockexcnetworkpeer"
+  topics = "storage blockexcnetworkpeer"

 const DefaultYieldInterval = 50.millis
@@ -25,7 +25,7 @@ import ./peercontext
 export peercontext

 logScope:
-  topics = "codex peerctxstore"
+  topics = "storage peerctxstore"

 type
   PeerCtxStore* = ref object of RootObj
@@ -25,9 +25,9 @@ import ./utils
 import ./errors
 import ./logutils
 import ./utils/json
-import ./codextypes
+import ./storagetypes

-export errors, logutils, units, codextypes
+export errors, logutils, units, storagetypes

 type
   Block* = ref object of RootObj
@@ -35,7 +35,7 @@ import pkg/questionable
 import pkg/questionable/results
 import pkg/stew/base64

-import ./codextypes
+import ./storagetypes
 import ./discovery
 import ./logutils
 import ./stores
@@ -46,7 +46,7 @@ import ./utils/natutils

 from ./blockexchange/engine/pendingblocks import DefaultBlockRetries

-export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig
+export units, net, storagetypes, logutils, completeCmdArg, parseCmdArg, NatConfig

 export
   DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
@@ -90,7 +90,7 @@ type
   repoSQLite = "sqlite"
   repoLevelDb = "leveldb"

-  CodexConf* = object
+  StorageConf* = object
     configFile* {.
       desc: "Loads the configuration from a TOML file",
       defaultValueDesc: "none",
@@ -277,19 +277,19 @@ type
     desc: "Logs to file", defaultValue: string.none, name: "log-file", hidden
   .}: Option[string]

-func defaultAddress*(conf: CodexConf): IpAddress =
+func defaultAddress*(conf: StorageConf): IpAddress =
   result = static parseIpAddress("127.0.0.1")

 func defaultNatConfig*(): NatConfig =
   result = NatConfig(hasExtIp: false, nat: NatStrategy.NatAny)

-proc getCodexVersion(): string =
+proc getStorageVersion(): string =
   let tag = strip(staticExec("git describe --tags --abbrev=0"))
   if tag.isEmptyOrWhitespace:
     return "untagged build"
   return tag

-proc getCodexRevision(): string =
+proc getStorageRevision(): string =
   # using a slice in a static context breaks nimsuggest for some reason
   var res = strip(staticExec("git rev-parse --short HEAD"))
   return res
@@ -298,12 +298,12 @@ proc getNimBanner(): string =
   staticExec("nim --version | grep Version")

 const
-  codexVersion* = getCodexVersion()
-  codexRevision* = getCodexRevision()
+  storageVersion* = getStorageVersion()
+  storageRevision* = getStorageRevision()
   nimBanner* = getNimBanner()

-  codexFullVersion* =
-    "Storage version: " & codexVersion & "\p" & "Storage revision: " & codexRevision &
+  storageFullVersion* =
+    "Storage version: " & storageVersion & "\p" & "Storage revision: " & storageRevision &
     "\p"

 proc parseCmdArg*(
@@ -533,7 +533,7 @@ proc updateLogLevel*(logLevel: string) {.raises: [ValueError].} =
   if not setTopicState(topicName, settings.state, settings.logLevel):
     warn "Unrecognized logging topic", topic = topicName

-proc setupLogging*(conf: CodexConf) =
+proc setupLogging*(conf: StorageConf) =
   when defaultChroniclesStream.outputs.type.arity != 3:
     warn "Logging configuration options not enabled in the current build"
   else:
@@ -597,7 +597,7 @@ proc setupLogging*(conf: CodexConf) =
   else:
     defaultChroniclesStream.outputs[0].writer = writer

-proc setupMetrics*(config: CodexConf) =
+proc setupMetrics*(config: StorageConf) =
   if config.metricsEnabled:
     let metricsAddress = config.metricsAddress
     notice "Starting metrics HTTP server",
storage/contentids_exts.nim (5 changes, new file)
@@ -0,0 +1,5 @@
+const ContentIdsExts = [
+  multiCodec("storage-root"),
+  multiCodec("storage-manifest"),
+  multiCodec("storage-block"),
+]
@@ -32,7 +32,7 @@ export discv5
 # much more elegantly.

 logScope:
-  topics = "codex discovery"
+  topics = "storage discovery"

 type Discovery* = ref object of RootObj
   protocol*: discv5.Protocol # dht protocol
@@ -20,8 +20,8 @@ import pkg/questionable/results
 export results

 type
-  CodexError* = object of CatchableError # base codex error
-  CodexResult*[T] = Result[T, ref CodexError]
+  StorageError* = object of CatchableError # base Storage error
+  StorageResult*[T] = Result[T, ref StorageError]

   FinishedFailed*[T] = tuple[success: seq[Future[T]], failure: seq[Future[T]]]

@@ -37,7 +37,7 @@ template mapFailure*[T, V, E](
 )

 template mapFailure*[T, V](exp: Result[T, V]): Result[T, ref CatchableError] =
-  mapFailure(exp, CodexError)
+  mapFailure(exp, StorageError)

 # TODO: using a template here, causes bad codegen
 func toFailure*[T](exp: Option[T]): Result[T, ref CatchableError] {.inline.} =
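The renamed types keep the standard Nim base-error idiom: concrete failures derive from `StorageError`, so callers can catch the whole family at once. A small self-contained sketch (the derived error and proc are hypothetical):

```nim
type
  StorageError = object of CatchableError # base Storage error, as above
  BlockNotFoundError = object of StorageError # hypothetical derived error

proc mustFind(found: bool) {.raises: [BlockNotFoundError].} =
  if not found:
    raise newException(BlockNotFoundError, "block not found")

try:
  mustFind(false)
except StorageError as e: # catches BlockNotFoundError too
  echo "storage failure: ", e.msg
```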
@@ -13,24 +13,24 @@
 ## 6. Remove need to [avoid importing or exporting `toJson`, `%`, `%*` to prevent
 ##    conflicts](https://github.com/logos-storage/logos-storage-nim/pull/645#issuecomment-1838834467)
 ##
-## When declaring a new type, one should consider importing the `codex/logutils`
+## When declaring a new type, one should consider importing the `storage/logutils`
 ## module, and specifying `formatIt`. If textlines log output and json log output
 ## need to be different, overload `formatIt` and specify a `LogFormat`. If json
 ## serialization is needed, it can be declared with a `%` proc. `logutils`
 ## imports and exports `nim-serde` which handles the de/serialization, examples
-## below. **Only `codex/logutils` needs to be imported.**
+## below. **Only `storage/logutils` needs to be imported.**
 ##
-## Using `logutils` in the Codex codebase:
-## - Instead of importing `pkg/chronicles`, import `pkg/codex/logutils`
+## Using `logutils` in the Storage codebase:
+## - Instead of importing `pkg/chronicles`, import `pkg/storage/logutils`
 ##   - most of `chronicles` is exported by `logutils`
 ## - Instead of importing `std/json`, import `pkg/serde/json`
 ##   - `std/json` is exported by `serde` which is exported by `logutils`
 ## - Instead of importing `pkg/nim-json-serialization`, import
-##   `pkg/serde/json` or use codex-specific overloads by importing `utils/json`
+##   `pkg/serde/json` or use storage-specific overloads by importing `utils/json`
 ##   - one of the goals is to remove the use of `nim-json-serialization`
 ##
 ## ```nim
-## import pkg/codex/logutils
+## import pkg/storage/logutils
 ##
 ## type
 ##   BlockAddress* = object
@@ -71,7 +71,7 @@ func mimetype*(self: Manifest): ?string =
 ############################################################

 func isManifest*(cid: Cid): ?!bool =
-  success (ManifestCodec == ?cid.contentType().mapFailure(CodexError))
+  success (ManifestCodec == ?cid.contentType().mapFailure(StorageError))

 func isManifest*(mc: MultiCodec): ?!bool =
   success mc == ManifestCodec
@@ -1,4 +1,5 @@
 import ./merkletree/merkletree
 import ./merkletree/coders
+import ./merkletree/coders

 export merkletree, coders
@@ -22,14 +22,14 @@ import pkg/merkletree
 import ../utils
 import ../rng
 import ../errors
-import ../codextypes
+import ../storagetypes

 from ../utils/digest import digestBytes

 export merkletree

 logScope:
-  topics = "codex merkletree"
+  topics = "storage merkletree"

 type
   ByteTreeKey* {.pure.} = enum
@@ -50,7 +50,6 @@ type

 func getProof*(self: StorageMerkleTree, index: int): ?!StorageMerkleProof =
   var proof = StorageMerkleProof(mcodec: self.mcodec)

   ?self.getProof(index, proof)

   success proof
2
storage/multicodec_exts.nim
Normal file
2
storage/multicodec_exts.nim
Normal file
@ -0,0 +1,2 @@
|
||||
const CodecExts =
|
||||
[("storage-manifest", 0xCD01), ("storage-block", 0xCD02), ("storage-root", 0xCD03)]
|
||||
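The new file maps the renamed codec names to fixed code points. A small, illustrative lookup over the same table (`codeFor` is a hypothetical helper, not part of the commit):

```nim
const CodecExts =
  [("storage-manifest", 0xCD01), ("storage-block", 0xCD02), ("storage-root", 0xCD03)]

# Hypothetical helper: resolve a codec name to its numeric code.
proc codeFor(name: string): int =
  for (n, code) in CodecExts:
    if n == name:
      return code
  -1

assert codeFor("storage-manifest") == 0xCD01
assert codeFor("unknown") == -1
```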
25
storage/namespaces.nim
Normal file
25
storage/namespaces.nim
Normal file
@ -0,0 +1,25 @@
## Logos Storage
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

const
# Namespaces
StorageMetaNamespace* = "meta" # meta info stored here
StorageRepoNamespace* = "repo" # repository namespace, blocks and manifests are subkeys
StorageBlockTotalNamespace* = StorageMetaNamespace & "/total"
# number of blocks in the repo
StorageBlocksNamespace* = StorageRepoNamespace & "/blocks" # blocks namespace
StorageManifestNamespace* = StorageRepoNamespace & "/manifests" # manifest namespace
StorageBlocksTtlNamespace* = # Cid TTL
StorageMetaNamespace & "/ttl"
StorageBlockProofNamespace* = # Cid and Proof
StorageMetaNamespace & "/proof"
StorageDhtNamespace* = "dht" # Dht namespace
StorageDhtProvidersNamespace* = # Dht providers namespace
StorageDhtNamespace & "/providers"
StorageQuotaNamespace* = StorageMetaNamespace & "/quota" # quota's namespace
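Since the constants are built by string concatenation, the resulting key paths can be read off directly; a self-contained check using non-exported copies of the same definitions:

```nim
const
  StorageMetaNamespace = "meta"
  StorageRepoNamespace = "repo"
  StorageBlocksNamespace = StorageRepoNamespace & "/blocks"
  StorageBlocksTtlNamespace = StorageMetaNamespace & "/ttl"

# The composed namespaces resolve to plain slash-separated paths.
assert StorageBlocksNamespace == "repo/blocks"
assert StorageBlocksTtlNamespace == "meta/ttl"
```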
@ -412,7 +412,7 @@ proc nattedAddress*(
if ipPart.isSome and port.isSome:
# Try to setup NAT mapping for the address
let (newIP, tcp, udp) =
setupAddress(natConfig, ipPart.get, port.get, udpPort, "codex")
setupAddress(natConfig, ipPart.get, port.get, udpPort, "storage")
if newIP.isSome:
# NAT mapping successful - add discovery address with mapped UDP port
discoveryAddrs.add(getMultiAddrWithIPAndUDPPort(newIP.get, udp.get))
@ -45,7 +45,7 @@ import ./utils/trackedfutures
export logutils

logScope:
topics = "codex node"
topics = "storage node"

const
DefaultFetchBatch = 1024
@ -53,7 +53,7 @@ const
BatchRefillThreshold = 0.75 # Refill when 75% of window completes

type
CodexNode* = object
StorageNode* = object
switch: Switch
networkId: PeerId
networkStore: NetworkStore
@ -63,27 +63,27 @@ type
taskPool: Taskpool
trackedFutures: TrackedFutures

CodexNodeRef* = ref CodexNode
StorageNodeRef* = ref StorageNode

OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].}
BatchProc* =
proc(blocks: seq[bt.Block]): Future[?!void] {.async: (raises: [CancelledError]).}
OnBlockStoredProc = proc(chunk: seq[byte]): void {.gcsafe, raises: [].}

func switch*(self: CodexNodeRef): Switch =
func switch*(self: StorageNodeRef): Switch =
return self.switch

func blockStore*(self: CodexNodeRef): BlockStore =
func blockStore*(self: StorageNodeRef): BlockStore =
return self.networkStore

func engine*(self: CodexNodeRef): BlockExcEngine =
func engine*(self: StorageNodeRef): BlockExcEngine =
return self.engine

func discovery*(self: CodexNodeRef): Discovery =
func discovery*(self: StorageNodeRef): Discovery =
return self.discovery

proc storeManifest*(
self: CodexNodeRef, manifest: Manifest
self: StorageNodeRef, manifest: Manifest
): Future[?!bt.Block] {.async.} =
without encodedVerifiable =? manifest.encode(), err:
trace "Unable to encode manifest"
@ -100,7 +100,7 @@ proc storeManifest*(
success blk

proc fetchManifest*(
self: CodexNodeRef, cid: Cid
self: StorageNodeRef, cid: Cid
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Fetch and decode a manifest block
##
@ -124,18 +124,18 @@ proc fetchManifest*(

return manifest.success

proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
## Find peer using the discovery service from the given CodexNode
proc findPeer*(self: StorageNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
## Find peer using the discovery service from the given StorageNode
##
return await self.discovery.findPeer(peerId)

proc connect*(
self: CodexNodeRef, peerId: PeerId, addrs: seq[MultiAddress]
self: StorageNodeRef, peerId: PeerId, addrs: seq[MultiAddress]
): Future[void] =
self.switch.connect(peerId, addrs)

proc updateExpiry*(
self: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970
self: StorageNodeRef, manifestCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async: (raises: [CancelledError]).} =
without manifest =? await self.fetchManifest(manifestCid), error:
trace "Unable to fetch manifest for cid", manifestCid
@ -158,7 +158,7 @@ proc updateExpiry*(
return success()

proc fetchBatched*(
self: CodexNodeRef,
self: StorageNodeRef,
cid: Cid,
iter: Iter[int],
batchSize = DefaultFetchBatch,
@ -233,7 +233,7 @@ proc fetchBatched*(
success()

proc fetchBatched*(
self: CodexNodeRef,
self: StorageNodeRef,
manifest: Manifest,
batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil,
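`DefaultFetchBatch` and `BatchRefillThreshold` suggest a windowed fetch that tops up once 75% of the current window has completed. A minimal sketch of that refill rule; the rule itself is all this shows, the loop around it in `fetchBatched` is not reproduced here:

```nim
const
  DefaultFetchBatch = 1024
  BatchRefillThreshold = 0.75 # Refill when 75% of window completes

# Illustrative refill rule only; the real fetch logic lives in
# fetchBatched above.
proc shouldRefill(completed, windowSize: int): bool =
  completed.float >= windowSize.float * BatchRefillThreshold

assert shouldRefill(768, DefaultFetchBatch)      # 75% of 1024 reached
assert not shouldRefill(512, DefaultFetchBatch)  # only 50% done
```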
@ -249,7 +249,7 @@ proc fetchBatched*(
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)

proc fetchDatasetAsync*(
self: CodexNodeRef, manifest: Manifest, fetchLocal = true
self: StorageNodeRef, manifest: Manifest, fetchLocal = true
): Future[void] {.async: (raises: []).} =
## Asynchronously fetch a dataset in the background.
## This task will be tracked and cleaned up on node shutdown.
@ -264,14 +264,14 @@ proc fetchDatasetAsync*(
except CancelledError as exc:
trace "Cancelled fetching blocks", exc = exc.msg

proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
proc fetchDatasetAsyncTask*(self: StorageNodeRef, manifest: Manifest) =
## Start fetching a dataset in the background.
## The task will be tracked and cleaned up on node shutdown.
##
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))

proc streamSingleBlock(
self: CodexNodeRef, cid: Cid
self: StorageNodeRef, cid: Cid
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of a single block.
##
@ -296,7 +296,7 @@ proc streamSingleBlock(
LPStream(stream).success

proc streamEntireDataset(
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid
self: StorageNodeRef, manifest: Manifest, manifestCid: Cid
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of the entire dataset described by the manifest.
##
@ -324,7 +324,7 @@ proc streamEntireDataset(
stream.success

proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true
self: StorageNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Retrieve by Cid a single block or an entire dataset described by manifest
##
@ -340,7 +340,7 @@ proc retrieve*(

await self.streamEntireDataset(manifest, cid)
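`retrieve` branches on whether the CID names a manifest (stream the whole dataset) or anything else (stream one block). A toy model of that dispatch, with a string codec field standing in for the real `Cid` and `isManifest` machinery:

```nim
type FakeCid = object
  codec: string # stand-in for the real multicodec field

proc isManifest(cid: FakeCid): bool =
  cid.codec == "storage-manifest"

# Mirrors the shape of retrieve: dataset stream for manifests,
# single-block stream otherwise.
proc retrieveKind(cid: FakeCid): string =
  if cid.isManifest: "entire-dataset" else: "single-block"

assert retrieveKind(FakeCid(codec: "storage-manifest")) == "entire-dataset"
assert retrieveKind(FakeCid(codec: "storage-block")) == "single-block"
```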
proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
proc deleteSingleBlock(self: StorageNodeRef, cid: Cid): Future[?!void] {.async.} =
if err =? (await self.networkStore.delBlock(cid)).errorOption:
error "Error deleting block", cid, err = err.msg
return failure(err)
@ -348,7 +348,7 @@ proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
trace "Deleted block", cid
return success()

proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
proc deleteEntireDataset(self: StorageNodeRef, cid: Cid): Future[?!void] {.async.} =
# Deletion is a strictly local operation
var store = self.networkStore.localStore

@ -382,7 +382,7 @@ proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.}
success()

proc delete*(
self: CodexNodeRef, cid: Cid
self: StorageNodeRef, cid: Cid
): Future[?!void] {.async: (raises: [CatchableError]).} =
## Deletes a whole dataset, if Cid is a Manifest Cid, or a single block, if Cid is a block Cid,
## from the underlying block store. This is a strictly local operation.
@ -400,7 +400,7 @@ proc delete*(
await self.deleteEntireDataset(cid)

proc store*(
self: CodexNodeRef,
self: StorageNodeRef,
stream: LPStream,
filename: ?string = string.none,
mimetype: ?string = string.none,
@ -484,7 +484,7 @@ proc store*(

return manifestBlk.cid.success

proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
proc iterateManifests*(self: StorageNodeRef, onManifest: OnManifest) {.async.} =
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"
return
@ -502,11 +502,11 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
onManifest(cid, manifest)

proc onExpiryUpdate(
self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970
self: StorageNodeRef, rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await self.updateExpiry(rootCid, expiry)

proc start*(self: CodexNodeRef) {.async.} =
proc start*(self: StorageNodeRef) {.async.} =
if not self.engine.isNil:
await self.engine.start()

@ -519,7 +519,7 @@ proc start*(self: CodexNodeRef) {.async.} =
self.networkId = self.switch.peerInfo.peerId
notice "Started Storage node", id = self.networkId, addrs = self.switch.peerInfo.addrs

proc stop*(self: CodexNodeRef) {.async.} =
proc stop*(self: StorageNodeRef) {.async.} =
trace "Stopping node"

await self.trackedFutures.cancelTracked()
@ -533,22 +533,22 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.clock.isNil:
await self.clock.stop()

proc close*(self: CodexNodeRef) {.async.} =
proc close*(self: StorageNodeRef) {.async.} =
if not self.networkStore.isNil:
await self.networkStore.close

proc new*(
T: type CodexNodeRef,
T: type StorageNodeRef,
switch: Switch,
networkStore: NetworkStore,
engine: BlockExcEngine,
discovery: Discovery,
taskpool: Taskpool,
): CodexNodeRef =
## Create new instance of a Codex self, call `start` to run it
): StorageNodeRef =
## Create new instance of a Storage self, call `start` to run it
##

CodexNodeRef(
StorageNodeRef(
switch: switch,
networkStore: networkStore,
engine: engine,
@ -558,7 +558,7 @@ proc new*(
)

proc hasLocalBlock*(
self: CodexNodeRef, cid: Cid
self: StorageNodeRef, cid: Cid
): Future[bool] {.async: (raises: [CancelledError]).} =
## Returns true if the given Cid is present in the local store

@ -39,10 +39,10 @@ import ./coders
import ./json

logScope:
topics = "codex restapi"
topics = "storage restapi"

declareCounter(codex_api_uploads, "codex API uploads")
declareCounter(codex_api_downloads, "codex API downloads")
declareCounter(storage_api_uploads, "storage API uploads")
declareCounter(storage_api_downloads, "storage API downloads")

proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
0
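The counters follow the usual nim-metrics pattern, which the diff already uses: declare once at module scope, then `inc()` at the call site, as `storage_api_uploads.inc()` does further down. A minimal standalone version, assuming the nim-metrics package and a hypothetical counter name:

```nim
import pkg/metrics

declareCounter(example_api_uploads, "example API uploads")

proc onUpload() =
  # bump the counter exactly where the upload succeeds
  example_api_uploads.inc()

onUpload()
```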
@ -50,7 +50,7 @@ proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].}
proc formatManifest(cid: Cid, manifest: Manifest): RestContent =
return RestContent.init(cid, manifest)

proc formatManifestBlocks(node: CodexNodeRef): Future[JsonNode] {.async.} =
proc formatManifestBlocks(node: StorageNodeRef): Future[JsonNode] {.async.} =
var content: seq[RestContent]

proc addManifest(cid: Cid, manifest: Manifest) =
@ -67,7 +67,7 @@ proc isPending(resp: HttpResponseRef): bool =
return resp.getResponseState() == HttpResponseState.Empty

proc retrieveCid(
node: CodexNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef
node: StorageNodeRef, cid: Cid, local: bool = true, resp: HttpResponseRef
): Future[void] {.async: (raises: [CancelledError, HttpWriteError]).} =
## Download a file from the node in a streaming
## manner
@ -131,7 +131,7 @@ proc retrieveCid(

await resp.send(addr buff[0], buff.len)
await resp.finish()
codex_api_downloads.inc()
storage_api_downloads.inc()
except CancelledError as exc:
raise exc
except LPStreamError as exc:
@ -173,7 +173,7 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
let filename = parts[1].strip()
return filename[0 ..^ 2].some

proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
proc initDataApi(node: StorageNodeRef, repoStore: RepoStore, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
router.api(MethodOptions, "/api/storage/v1/data") do(
@ -238,7 +238,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
error "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg)

codex_api_uploads.inc()
storage_api_uploads.inc()
trace "Uploaded file", cid
return RestApiResponse.response($cid)
except CancelledError:
@ -383,7 +383,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
)
return RestApiResponse.response($json, contentType = "application/json")

proc initNodeApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
proc initNodeApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin

## various node management api's
@ -465,7 +465,7 @@ proc initNodeApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
return
RestApiResponse.error(Http500, "Unknown error dialling peer", headers = headers)

proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
proc initDebugApi(node: StorageNodeRef, conf: StorageConf, router: var RestRouter) =
let allowedOrigin = router.allowedOrigin

router.api(MethodGet, "/api/storage/v1/debug/info") do() -> RestApiResponse:
@ -484,7 +484,7 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
if node.discovery.dhtRecord.isSome: node.discovery.dhtRecord.get.toURI else: "",
"announceAddresses": node.discovery.announceAddrs,
"table": table,
"storage": {"version": $codexVersion, "revision": $codexRevision},
"storage": {"version": $storageVersion, "revision": $storageRevision},
}

# return pretty json for human readability
@ -542,8 +542,8 @@ proc initDebugApi(node: CodexNodeRef, conf: CodexConf, router: var RestRouter) =
return RestApiResponse.error(Http500, headers = headers)

proc initRestApi*(
node: CodexNodeRef,
conf: CodexConf,
node: StorageNodeRef,
conf: StorageConf,
repoStore: RepoStore,
corsAllowedOrigin: ?string,
): RestRouter =
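All routes visible in these hunks hang off the `/api/storage/v1` prefix. A tiny sketch of composing endpoint paths from one base constant; `ApiBase` and `endpoint` are hypothetical helpers for illustration, not from the commit:

```nim
const ApiBase = "/api/storage/v1"

# Hypothetical helper for composing route paths under one prefix.
proc endpoint(suffix: string): string =
  ApiBase & suffix

assert endpoint("/data") == "/api/storage/v1/data"
assert endpoint("/debug/info") == "/api/storage/v1/debug/info"
```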
@ -35,35 +35,35 @@ import ./discovery
import ./systemclock
import ./utils/addrutils
import ./namespaces
import ./codextypes
import ./storagetypes
import ./logutils
import ./nat

logScope:
topics = "codex node"
topics = "storage node"

type
CodexServer* = ref object
config: CodexConf
StorageServer* = ref object
config: StorageConf
restServer: RestServerRef
codexNode: CodexNodeRef
storageNode: StorageNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
isStarted: bool

CodexPrivateKey* = libp2p.PrivateKey # alias
StoragePrivateKey* = libp2p.PrivateKey # alias

func config*(self: CodexServer): CodexConf =
func config*(self: StorageServer): StorageConf =
return self.config

func node*(self: CodexServer): CodexNodeRef =
return self.codexNode
func node*(self: StorageServer): StorageNodeRef =
return self.storageNode

func repoStore*(self: CodexServer): RepoStore =
func repoStore*(self: StorageServer): RepoStore =
return self.repoStore

proc start*(s: CodexServer) {.async.} =
proc start*(s: StorageServer) {.async.} =
if s.isStarted:
warn "Storage server already started, skipping"
return
@ -73,23 +73,23 @@ proc start*(s: CodexServer) {.async.} =

s.maintenance.start()

await s.codexNode.switch.start()
await s.storageNode.switch.start()

let (announceAddrs, discoveryAddrs) = nattedAddress(
s.config.nat, s.codexNode.switch.peerInfo.addrs, s.config.discoveryPort
s.config.nat, s.storageNode.switch.peerInfo.addrs, s.config.discoveryPort
)

s.codexNode.discovery.updateAnnounceRecord(announceAddrs)
s.codexNode.discovery.updateDhtRecord(discoveryAddrs)
s.storageNode.discovery.updateAnnounceRecord(announceAddrs)
s.storageNode.discovery.updateDhtRecord(discoveryAddrs)

await s.codexNode.start()
await s.storageNode.start()

if s.restServer != nil:
s.restServer.start()

s.isStarted = true

proc stop*(s: CodexServer) {.async.} =
proc stop*(s: StorageServer) {.async.} =
if not s.isStarted:
warn "Storage is not started"
return
@ -97,8 +97,8 @@ proc stop*(s: CodexServer) {.async.} =
notice "Stopping Storage node"

var futures = @[
s.codexNode.switch.stop(),
s.codexNode.stop(),
s.storageNode.switch.stop(),
s.storageNode.stop(),
s.repoStore.stop(),
s.maintenance.stop(),
]
@ -114,9 +114,9 @@ proc stop*(s: CodexServer) {.async.} =
error "Failed to stop Storage node", failures = res.failure.len
raiseAssert "Failed to stop Storage node"

proc close*(s: CodexServer) {.async.} =
proc close*(s: StorageServer) {.async.} =
var futures =
@[s.codexNode.close(), s.repoStore.close(), s.codexNode.discovery.close()]
@[s.storageNode.close(), s.repoStore.close(), s.storageNode.discovery.close()]

let res = await noCancel allFinishedFailed[void](futures)

@ -131,14 +131,14 @@ proc close*(s: CodexServer) {.async.} =
error "Failed to close Storage node", failures = res.failure.len
raiseAssert "Failed to close Storage node"

proc shutdown*(server: CodexServer) {.async.} =
proc shutdown*(server: StorageServer) {.async.} =
await server.stop()
await server.close()

proc new*(
T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey
): CodexServer =
## create CodexServer including setting up datastore, repostore, etc
T: type StorageServer, config: StorageConf, privateKey: StoragePrivateKey
): StorageServer =
## create StorageServer including setting up datastore, repostore, etc
let switch = SwitchBuilder
.new()
.withPrivateKey(privateKey)
@ -169,7 +169,7 @@ proc new*(
cache = CacheStore.new(cacheSize = config.cacheSize)
## Is unused?

let discoveryDir = config.dataDir / CodexDhtNamespace
let discoveryDir = config.dataDir / StorageDhtNamespace

if io2.createPath(discoveryDir).isErr:
trace "Unable to create discovery directory for block store",
@ -178,7 +178,7 @@ proc new*(
msg: "Unable to create discovery directory for block store: " & discoveryDir
)

let providersPath = config.dataDir / CodexDhtProvidersNamespace
let providersPath = config.dataDir / StorageDhtProvidersNamespace
let discoveryStoreRes = LevelDbDatastore.new(providersPath)
if discoveryStoreRes.isErr:
error "Failed to initialize discovery datastore",
@ -221,7 +221,7 @@ proc new*(

repoStore = RepoStore.new(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace).expect(
metaDs = LevelDbDatastore.new(config.dataDir / StorageMetaNamespace).expect(
"Should create metadata store!"
),
quotaMaxBytes = config.storageQuota,
@ -244,7 +244,7 @@ proc new*(
)
store = NetworkStore.new(engine, repoStore)

codexNode = CodexNodeRef.new(
storageNode = StorageNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
@ -257,7 +257,7 @@ proc new*(

if config.apiBindAddress.isSome:
restServer = RestServerRef
.new(
codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
storageNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
initTAddress(config.apiBindAddress.get(), config.apiPort),
bufferSize = (1024 * 64),
maxRequestBodySize = int.high,
@ -266,9 +266,9 @@ proc new*(
switch.mount(network)

CodexServer(
StorageServer(
config: config,
codexNode: codexNode,
storageNode: storageNode,
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance,
@ -30,11 +30,11 @@ const
# hashes
Sha256HashCodec* = multiCodec("sha2-256")

ManifestCodec* = multiCodec("codex-manifest")
DatasetRootCodec* = multiCodec("codex-root")
BlockCodec* = multiCodec("codex-block")
ManifestCodec* = multiCodec("storage-manifest")
DatasetRootCodec* = multiCodec("storage-root")
BlockCodec* = multiCodec("storage-block")

CodexPrimitivesCodecs* = [ManifestCodec, DatasetRootCodec, BlockCodec]
StoragePrimitivesCodecs* = [ManifestCodec, DatasetRootCodec, BlockCodec]

proc initEmptyCidTable(): ?!Table[(CidVersion, MultiCodec, MultiCodec), Cid] =
## Initialize padding blocks table
@ -22,7 +22,7 @@ import ../utils
export blocktype

type
BlockNotFoundError* = object of CodexError
BlockNotFoundError* = object of StorageError

BlockType* {.pure.} = enum
Manifest
@ -30,7 +30,7 @@ import ../clock
export blockstore

logScope:
topics = "codex cachestore"
topics = "storage cachestore"

type
CacheStore* = ref object of BlockStore
@ -39,7 +39,7 @@ type
cache: LruCache[Cid, Block]
cidAndProofCache: LruCache[(Cid, Natural), (Cid, StorageMerkleProof)]

InvalidBlockSize* = object of CodexError
InvalidBlockSize* = object of StorageError

const DefaultCacheSize*: NBytes = 5.MiBs

@ -17,14 +17,14 @@ import ../namespaces
import ../manifest

const
CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet
CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet
CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet
CodexTotalBlocksKey* = Key.init(CodexBlockTotalNamespace).tryGet
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
StorageMetaKey* = Key.init(StorageMetaNamespace).tryGet
StorageRepoKey* = Key.init(StorageRepoNamespace).tryGet
StorageBlocksKey* = Key.init(StorageBlocksNamespace).tryGet
StorageTotalBlocksKey* = Key.init(StorageBlockTotalNamespace).tryGet
StorageManifestKey* = Key.init(StorageManifestNamespace).tryGet
BlocksTtlKey* = Key.init(StorageBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(StorageBlockProofNamespace).tryGet
QuotaKey* = Key.init(StorageQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet

@ -32,9 +32,9 @@ func makePrefixKey*(postFixLen: int, cid: Cid): ?!Key =
let cidKey = ?Key.init(($cid)[^postFixLen ..^ 1] & "/" & $cid)

if ?cid.isManifest:
success CodexManifestKey / cidKey
success StorageManifestKey / cidKey
else:
success CodexBlocksKey / cidKey
success StorageBlocksKey / cidKey

proc createBlockExpirationMetadataKey*(cid: Cid): ?!Key =
BlocksTtlKey / $cid
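`makePrefixKey` buckets a CID under its last `postFixLen` characters before appending the full CID, under either the manifests or the blocks key. The same idea modelled with plain strings standing in for datastore `Key`s:

```nim
# Illustrative only: strings stand in for datastore Keys.
proc makePrefixPath(postFixLen: int, cid: string): string =
  let bucket = cid[^postFixLen .. ^1] # last postFixLen characters
  "repo/blocks/" & bucket & "/" & cid

assert makePrefixPath(2, "zb2rAbc") == "repo/blocks/bc/zb2rAbc"
```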
@ -24,7 +24,7 @@ import ../logutils
import ../systemclock

logScope:
topics = "codex maintenance"
topics = "storage maintenance"

const
DefaultBlockInterval* = 10.minutes
@ -25,7 +25,7 @@ import ./blockstore
export blockstore, blockexchange, asyncheapqueue

logScope:
topics = "codex networkstore"
topics = "storage networkstore"

type NetworkStore* = ref object of BlockStore
engine*: BlockExcEngine # blockexc decision engine
@ -26,11 +26,11 @@ import ../../logutils
import ../../merkletree

logScope:
topics = "codex repostore"
topics = "storage repostore"

declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
declareGauge(storage_repostore_blocks, "storage repostore blocks")
declareGauge(storage_repostore_bytes_used, "storage repostore bytes used")
declareGauge(storage_repostore_bytes_reserved, "storage repostore bytes reserved")

proc putLeafMetadata*(
self: RepoStore,
@ -90,7 +90,7 @@ proc updateTotalBlocksCount*(
self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.metaDs.modify(
CodexTotalBlocksKey,
StorageTotalBlocksKey,
proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
@ -99,7 +99,7 @@ proc updateTotalBlocksCount*(
plusCount - minusCount

self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
storage_repostore_blocks.set(count.int64)
count.some,
)
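The closure handed to `metaDs.modify` folds a plus/minus delta into the stored total, falling back to the bare delta when no count exists yet. The same rule in isolation, with `std/options` standing in for the `?Natural` optional:

```nim
import std/options

# The counting rule from updateTotalBlocksCount, in isolation.
proc nextCount(curr: Option[Natural], plus, minus: Natural): Natural =
  if curr.isSome:
    curr.get() + plus - minus
  else:
    plus - minus

assert nextCount(some(Natural(10)), 5, 2) == 13
assert nextCount(none(Natural), 5, 2) == 3
```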
@ -132,8 +132,8 @@ proc updateQuotaUsage*(
)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
storage_repostore_bytes_used.set(usage.used.int64)
storage_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some,
)

@ -32,7 +32,7 @@ import ../../utils
export blocktype, cid

logScope:
topics = "codex repostore"
topics = "storage repostore"

###########################################################
# BlockStore API
@ -323,9 +323,9 @@ method listBlocks*(

let key =
case blockType
of BlockType.Manifest: CodexManifestKey
of BlockType.Block: CodexBlocksKey
of BlockType.Both: CodexRepoKey
of BlockType.Manifest: StorageManifestKey
of BlockType.Block: StorageBlocksKey
of BlockType.Both: StorageRepoKey

let query = Query.init(key, value = false)
without queryIter =? (await self.repoDs.query(query)), err:
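`listBlocks` picks the query root from the requested block type; under the renamed keys that is manifests, blocks, or the whole repo namespace. A compact model with strings in place of the `Key` constants:

```nim
type BlockType {.pure.} = enum
  Manifest
  Block
  Both

# Strings stand in for StorageManifestKey / StorageBlocksKey / StorageRepoKey.
proc queryRoot(blockType: BlockType): string =
  case blockType
  of BlockType.Manifest: "repo/manifests"
  of BlockType.Block: "repo/blocks"
  of BlockType.Both: "repo"

assert queryRoot(BlockType.Manifest) == "repo/manifests"
assert queryRoot(BlockType.Both) == "repo"
```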
@ -444,7 +444,7 @@ proc release*(

proc start*(
self: RepoStore
): Future[void] {.async: (raises: [CancelledError, CodexError]).} =
): Future[void] {.async: (raises: [CancelledError, StorageError]).} =
## Start repo
##
if self.started:
@ -453,10 +453,10 @@ proc start*(

trace "Starting rep"
if err =? (await self.updateTotalBlocksCount()).errorOption:
raise newException(CodexError, err.msg)
raise newException(StorageError, err.msg)

if err =? (await self.updateQuotaUsage()).errorOption:
raise newException(CodexError, err.msg)
raise newException(StorageError, err.msg)

self.started = true

@ -25,7 +25,7 @@ const
DefaultQuotaBytes* = 20.GiBs

type
QuotaNotEnoughError* = object of CodexError
QuotaNotEnoughError* = object of StorageError

RepoStore* = ref object of BlockStore
postFixLen*: int
@ -15,7 +15,7 @@ import ../logutils
export lpstream, chronos, logutils

logScope:
topics = "codex seekablestream"
topics = "storage seekablestream"

type SeekableStream* = ref object of LPStream
offset*: int
@ -25,7 +25,7 @@ import ./seekablestream
export stores, blocktype, manifest, chronos

logScope:
topics = "codex storestream"
topics = "storage storestream"

const StoreStreamTrackerName* = "StoreStream"
Some files were not shown because too many files have changed in this diff