mirror of
https://github.com/status-im/nim-dagger.git
synced 2025-01-22 20:50:09 +00:00
Plumbing in conf types (#472)
Goal is to provide documentation and to enable conf utils byte params. * Create byte units NByte and plumb through in size locations. * add json serde * plumb parseDuration to conf
This commit is contained in:
parent
f053135f68
commit
711e5e09d1
@ -21,6 +21,7 @@ import pkg/libp2p
|
||||
|
||||
import ./codex/conf
|
||||
import ./codex/codex
|
||||
import ./codex/units
|
||||
import ./codex/utils/keyutils
|
||||
|
||||
export codex, conf, libp2p, chronos, chronicles
|
||||
|
@ -20,15 +20,17 @@ import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/chronicles
|
||||
|
||||
import ./units
|
||||
import ./utils
|
||||
import ./formats
|
||||
import ./errors
|
||||
|
||||
export errors, formats
|
||||
export errors, formats, units
|
||||
|
||||
const
|
||||
# Size of blocks for storage / network exchange,
|
||||
# should be divisible by 31 for PoR and by 64 for Leopard ECC
|
||||
BlockSize* = 31 * 64 * 33
|
||||
DefaultBlockSize* = NBytes 31 * 64 * 33
|
||||
|
||||
type
|
||||
Block* = ref object of RootObj
|
||||
|
@ -24,7 +24,7 @@ import ./blocktype
|
||||
export blocktype
|
||||
|
||||
const
|
||||
DefaultChunkSize* = BlockSize
|
||||
DefaultChunkSize* = DefaultBlockSize
|
||||
|
||||
type
|
||||
# default reader type
|
||||
@ -35,7 +35,7 @@ type
|
||||
Chunker* = ref object
|
||||
reader*: Reader # Procedure called to actually read the data
|
||||
offset*: int # Bytes read so far (position in the stream)
|
||||
chunkSize*: Natural # Size of each chunk
|
||||
chunkSize*: NBytes # Size of each chunk
|
||||
pad*: bool # Pad last chunk to chunkSize?
|
||||
|
||||
FileChunker* = Chunker
|
||||
@ -46,7 +46,7 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
|
||||
## the instantiated chunker
|
||||
##
|
||||
|
||||
var buff = newSeq[byte](c.chunkSize)
|
||||
var buff = newSeq[byte](c.chunkSize.int)
|
||||
let read = await c.reader(cast[ChunkBuffer](addr buff[0]), buff.len)
|
||||
|
||||
if read <= 0:
|
||||
@ -59,7 +59,7 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
|
||||
|
||||
return move buff
|
||||
|
||||
func new*(
|
||||
proc new*(
|
||||
T: type Chunker,
|
||||
reader: Reader,
|
||||
chunkSize = DefaultChunkSize,
|
||||
|
@ -176,8 +176,8 @@ proc new*(
|
||||
var
|
||||
cache: CacheStore = nil
|
||||
|
||||
if config.cacheSize > 0:
|
||||
cache = CacheStore.new(cacheSize = config.cacheSize * MiB)
|
||||
if config.cacheSize > 0'nb:
|
||||
cache = CacheStore.new(cacheSize = config.cacheSize)
|
||||
## Is unused?
|
||||
|
||||
let
|
||||
@ -215,11 +215,11 @@ proc new*(
|
||||
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
|
||||
.expect("Should create meta data store!"),
|
||||
quotaMaxBytes = config.storageQuota.uint,
|
||||
blockTtl = config.blockTtlSeconds.seconds)
|
||||
blockTtl = config.blockTtl)
|
||||
|
||||
maintenance = BlockMaintainer.new(
|
||||
repoStore,
|
||||
interval = config.blockMaintenanceIntervalSeconds.seconds,
|
||||
interval = config.blockMaintenanceInterval,
|
||||
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks)
|
||||
|
||||
peerStore = PeerCtxStore.new()
|
||||
|
@ -27,13 +27,17 @@ import pkg/toml_serialization
|
||||
import pkg/metrics
|
||||
import pkg/metrics/chronos_httpserver
|
||||
import pkg/stew/shims/net as stewnet
|
||||
import pkg/stew/shims/parseutils
|
||||
import pkg/libp2p
|
||||
import pkg/ethers
|
||||
|
||||
import ./discovery
|
||||
import ./stores
|
||||
import ./units
|
||||
import ./utils
|
||||
|
||||
export DefaultCacheSizeMiB, net, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultNumberOfBlocksToMaintainPerInterval
|
||||
export units
|
||||
export net, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultNumberOfBlocksToMaintainPerInterval
|
||||
|
||||
const
|
||||
codex_enable_api_debug_peers* {.booldefine.} = false
|
||||
@ -161,10 +165,10 @@ type
|
||||
|
||||
apiPort* {.
|
||||
desc: "The REST Api port",
|
||||
defaultValue: 8080
|
||||
defaultValue: 8080.Port
|
||||
defaultValueDesc: "8080"
|
||||
name: "api-port"
|
||||
abbr: "p" }: int
|
||||
abbr: "p" }: Port
|
||||
|
||||
repoKind* {.
|
||||
desc: "backend for main repo store (fs, sqlite)"
|
||||
@ -177,20 +181,20 @@ type
|
||||
defaultValue: DefaultQuotaBytes
|
||||
defaultValueDesc: $DefaultQuotaBytes
|
||||
name: "storage-quota"
|
||||
abbr: "q" }: Natural
|
||||
abbr: "q" }: NBytes
|
||||
|
||||
blockTtlSeconds* {.
|
||||
blockTtl* {.
|
||||
desc: "Default block timeout in seconds - 0 disables the ttl"
|
||||
defaultValue: DefaultBlockTtl.seconds
|
||||
defaultValue: DefaultBlockTtl
|
||||
defaultValueDesc: $DefaultBlockTtl
|
||||
name: "block-ttl"
|
||||
abbr: "t" }: int
|
||||
abbr: "t" }: Duration
|
||||
|
||||
blockMaintenanceIntervalSeconds* {.
|
||||
blockMaintenanceInterval* {.
|
||||
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup."
|
||||
defaultValue: DefaultBlockMaintenanceInterval.seconds
|
||||
defaultValue: DefaultBlockMaintenanceInterval
|
||||
defaultValueDesc: $DefaultBlockMaintenanceInterval
|
||||
name: "block-mi" }: int
|
||||
name: "block-mi" }: Duration
|
||||
|
||||
blockMaintenanceNumberOfBlocks* {.
|
||||
desc: "Number of blocks to check every maintenance cycle."
|
||||
@ -199,11 +203,11 @@ type
|
||||
name: "block-mn" }: int
|
||||
|
||||
cacheSize* {.
|
||||
desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives"
|
||||
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives"
|
||||
defaultValue: 0
|
||||
defaultValueDesc: "0"
|
||||
name: "cache-size"
|
||||
abbr: "c" }: Natural
|
||||
abbr: "c" }: NBytes
|
||||
|
||||
persistence* {.
|
||||
desc: "Enables persistence mechanism, requires an Ethereum node"
|
||||
@ -288,7 +292,7 @@ proc defaultDataDir*(): string =
|
||||
|
||||
getHomeDir() / dataDir
|
||||
|
||||
func parseCmdArg*(T: type MultiAddress, input: string): T
|
||||
proc parseCmdArg*(T: type MultiAddress, input: string): T
|
||||
{.upraises: [ValueError, LPError].} =
|
||||
MultiAddress.init($input).tryGet()
|
||||
|
||||
@ -303,9 +307,25 @@ proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
|
||||
quit QuitFailure
|
||||
res
|
||||
|
||||
func parseCmdArg*(T: type EthAddress, address: string): T =
|
||||
proc parseCmdArg*(T: type EthAddress, address: string): T =
|
||||
EthAddress.init($address).get()
|
||||
|
||||
proc parseCmdArg*(T: type NBytes, val: string): T =
|
||||
var num = 0'i64
|
||||
let count = parseSize(val, num, alwaysBin = true)
|
||||
if count == 0:
|
||||
warn "Invalid number of bytes", nbytes=val
|
||||
quit QuitFailure
|
||||
NBytes(num)
|
||||
|
||||
proc parseCmdArg*(T: type Duration, val: string): T =
|
||||
var dur: Duration
|
||||
let count = parseDuration(val, dur)
|
||||
if count == 0:
|
||||
warn "Invalid duration parse", dur=dur
|
||||
quit QuitFailure
|
||||
dur
|
||||
|
||||
proc readValue*(r: var TomlReader, val: var EthAddress)
|
||||
{.upraises: [SerializationError, IOError].} =
|
||||
val = EthAddress.init(r.readValue(string)).get()
|
||||
@ -317,10 +337,36 @@ proc readValue*(r: var TomlReader, val: var SignedPeerRecord) =
|
||||
|
||||
val = SignedPeerRecord.parseCmdArg(uri)
|
||||
|
||||
proc readValue*(r: var TomlReader, val: var NBytes)
|
||||
{.upraises: [SerializationError, IOError].} =
|
||||
var value = 0'i64
|
||||
var str = r.readValue(string)
|
||||
let count = parseSize(str, value, alwaysBin = true)
|
||||
if count == 0:
|
||||
error "invalid number of bytes for configuration value", value = str
|
||||
quit QuitFailure
|
||||
val = NBytes(value)
|
||||
|
||||
proc readValue*(r: var TomlReader, val: var Duration)
|
||||
{.upraises: [SerializationError, IOError].} =
|
||||
var str = r.readValue(string)
|
||||
var dur: Duration
|
||||
let count = parseDuration(str, dur)
|
||||
if count == 0:
|
||||
error "Invalid duration parse", value = str
|
||||
quit QuitFailure
|
||||
val = dur
|
||||
|
||||
# no idea why confutils needs this:
|
||||
proc completeCmdArg*(T: type EthAddress; val: string): seq[string] =
|
||||
discard
|
||||
|
||||
proc completeCmdArg*(T: type NBytes; val: string): seq[string] =
|
||||
discard
|
||||
|
||||
proc completeCmdArg*(T: type Duration; val: string): seq[string] =
|
||||
discard
|
||||
|
||||
# silly chronicles, colors is a compile-time property
|
||||
proc stripAnsi(v: string): string =
|
||||
var
|
||||
|
@ -92,14 +92,14 @@ proc encode*(
|
||||
new_manifest = encoded.len
|
||||
|
||||
var
|
||||
encoder = self.encoderProvider(manifest.blockSize, blocks, parity)
|
||||
encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity)
|
||||
|
||||
try:
|
||||
for i in 0..<encoded.steps:
|
||||
# TODO: Don't allocate a new seq every time, allocate once and zero out
|
||||
var
|
||||
data = newSeq[seq[byte]](blocks) # number of blocks to encode
|
||||
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize))
|
||||
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize.int))
|
||||
# calculate block indexes to retrieve
|
||||
blockIdx = toSeq(countup(i, encoded.rounded - 1, encoded.steps))
|
||||
# request all blocks from the store
|
||||
@ -122,7 +122,7 @@ proc encode*(
|
||||
shallowCopy(data[j], blk.data)
|
||||
else:
|
||||
trace "Padding with empty block", pos = idx
|
||||
data[j] = newSeq[byte](manifest.blockSize)
|
||||
data[j] = newSeq[byte](manifest.blockSize.int)
|
||||
|
||||
trace "Erasure coding data", data = data.len, parity = parityData.len
|
||||
|
||||
@ -170,7 +170,7 @@ proc decode*(
|
||||
new_manifest = encoded.len
|
||||
|
||||
var
|
||||
decoder = self.decoderProvider(encoded.blockSize, encoded.ecK, encoded.ecM)
|
||||
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
|
||||
|
||||
try:
|
||||
for i in 0..<encoded.steps:
|
||||
@ -191,9 +191,9 @@ proc decode*(
|
||||
var
|
||||
data = newSeq[seq[byte]](encoded.ecK) # number of blocks to encode
|
||||
parityData = newSeq[seq[byte]](encoded.ecM)
|
||||
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize))
|
||||
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
|
||||
idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
|
||||
emptyBlock = newSeq[byte](encoded.blockSize)
|
||||
emptyBlock = newSeq[byte](encoded.blockSize.int)
|
||||
resolved = 0
|
||||
|
||||
while true:
|
||||
|
@ -148,8 +148,8 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
|
||||
var
|
||||
self = Manifest(
|
||||
rootHash: rootHashCid.some,
|
||||
originalBytes: originalBytes.int,
|
||||
blockSize: blockSize.int,
|
||||
originalBytes: originalBytes.NBytes,
|
||||
blockSize: blockSize.NBytes,
|
||||
blocks: blocks,
|
||||
hcodec: (? rootHashCid.mhash.mapFailure).mcodec,
|
||||
codec: rootHashCid.mcodec,
|
||||
|
@ -21,9 +21,12 @@ import pkg/chronicles
|
||||
|
||||
import ../errors
|
||||
import ../utils
|
||||
import ../units
|
||||
import ../blocktype
|
||||
import ./types
|
||||
|
||||
export types
|
||||
|
||||
############################################################
|
||||
# Operations on block list
|
||||
############################################################
|
||||
@ -56,7 +59,7 @@ proc add*(self: Manifest, cid: Cid) =
|
||||
self.rootHash = Cid.none
|
||||
trace "Adding cid to manifest", cid
|
||||
self.blocks.add(cid)
|
||||
self.originalBytes = self.blocks.len * self.blockSize
|
||||
self.originalBytes = self.blocks.len.NBytes * self.blockSize
|
||||
|
||||
iterator items*(self: Manifest): Cid =
|
||||
for b in self.blocks:
|
||||
@ -74,10 +77,10 @@ func contains*(self: Manifest, cid: Cid): bool =
|
||||
# Various sizes and verification
|
||||
############################################################
|
||||
|
||||
func bytes*(self: Manifest, pad = true): int =
|
||||
func bytes*(self: Manifest, pad = true): NBytes =
|
||||
## Compute how many bytes corresponding StoreStream(Manifest, pad) will return
|
||||
if pad or self.protected:
|
||||
self.len * self.blockSize
|
||||
self.len.NBytes * self.blockSize
|
||||
else:
|
||||
self.originalBytes
|
||||
|
||||
@ -165,7 +168,7 @@ proc new*(
|
||||
version = CIDv1,
|
||||
hcodec = multiCodec("sha2-256"),
|
||||
codec = multiCodec("raw"),
|
||||
blockSize = BlockSize
|
||||
blockSize = DefaultBlockSize
|
||||
): ?!Manifest =
|
||||
## Create a manifest using an array of `Cid`s
|
||||
##
|
||||
@ -179,7 +182,7 @@ proc new*(
|
||||
codec: codec,
|
||||
hcodec: hcodec,
|
||||
blockSize: blockSize,
|
||||
originalBytes: blocks.len * blockSize,
|
||||
originalBytes: blocks.len.NBytes * blockSize,
|
||||
protected: protected).success
|
||||
|
||||
proc new*(
|
||||
|
@ -13,6 +13,9 @@ import std/tables
|
||||
import pkg/libp2p
|
||||
import pkg/questionable
|
||||
|
||||
import ../units
|
||||
export units
|
||||
|
||||
const
|
||||
BlockCodec* = multiCodec("raw")
|
||||
DagPBCodec* = multiCodec("dag-pb")
|
||||
@ -29,8 +32,8 @@ const
|
||||
type
|
||||
Manifest* = ref object of RootObj
|
||||
rootHash*: ?Cid # Root (tree) hash of the contained data set
|
||||
originalBytes*: int # Exact size of the original (uploaded) file
|
||||
blockSize*: int # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||
originalBytes*: NBytes # Exact size of the original (uploaded) file
|
||||
blockSize*: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
|
||||
blocks*: seq[Cid] # Block Cid
|
||||
version*: CidVersion # Cid version
|
||||
hcodec*: MultiCodec # Multihash codec
|
||||
|
@ -184,7 +184,7 @@ proc retrieve*(
|
||||
proc store*(
|
||||
self: CodexNodeRef,
|
||||
stream: LPStream,
|
||||
blockSize = BlockSize
|
||||
blockSize = DefaultBlockSize
|
||||
): Future[?!Cid] {.async.} =
|
||||
## Save stream contents as dataset with given blockSize
|
||||
## to nodes's BlockStore, and return Cid of its manifest
|
||||
@ -219,7 +219,7 @@ proc store*(
|
||||
await stream.close()
|
||||
|
||||
# Generate manifest
|
||||
blockManifest.originalBytes = chunker.offset # store the exact file size
|
||||
blockManifest.originalBytes = NBytes chunker.offset # store the exact file size
|
||||
without data =? blockManifest.encode():
|
||||
return failure(
|
||||
newException(CodexError, "Could not generate dataset manifest!"))
|
||||
@ -296,7 +296,7 @@ proc requestStorage*(
|
||||
let request = StorageRequest(
|
||||
ask: StorageAsk(
|
||||
slots: nodes + tolerance,
|
||||
slotSize: (encoded.blockSize * encoded.steps).u256,
|
||||
slotSize: (encoded.blockSize.int * encoded.steps).u256,
|
||||
duration: duration,
|
||||
proofProbability: proofProbability,
|
||||
reward: reward,
|
||||
|
@ -153,7 +153,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
||||
|
||||
while not stream.atEof:
|
||||
var
|
||||
buff = newSeqUninitialized[byte](BlockSize)
|
||||
buff = newSeqUninitialized[byte](DefaultBlockSize.int)
|
||||
len = await stream.readOnce(addr buff[0], buff.len)
|
||||
|
||||
buff.setLen(len)
|
||||
|
@ -21,6 +21,7 @@ import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./blockstore
|
||||
import ../units
|
||||
import ../chunker
|
||||
import ../errors
|
||||
import ../manifest
|
||||
@ -32,16 +33,14 @@ logScope:
|
||||
|
||||
type
|
||||
CacheStore* = ref object of BlockStore
|
||||
currentSize*: Natural # in bytes
|
||||
size*: Positive # in bytes
|
||||
currentSize*: NBytes
|
||||
size*: NBytes
|
||||
cache: LruCache[Cid, Block]
|
||||
|
||||
InvalidBlockSize* = object of CodexError
|
||||
|
||||
const
|
||||
MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
|
||||
DefaultCacheSizeMiB* = 5
|
||||
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
|
||||
DefaultCacheSize*: NBytes = 5.MiBs
|
||||
|
||||
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the stores
|
||||
@ -133,7 +132,7 @@ method listBlocks*(
|
||||
|
||||
func putBlockSync(self: CacheStore, blk: Block): bool =
|
||||
|
||||
let blkSize = blk.data.len # in bytes
|
||||
let blkSize = blk.data.len.NBytes # in bytes
|
||||
|
||||
if blkSize > self.size:
|
||||
trace "Block size is larger than cache size", blk = blkSize, cache = self.size
|
||||
@ -142,7 +141,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
|
||||
while self.currentSize + blkSize > self.size:
|
||||
try:
|
||||
let removed = self.cache.removeLru()
|
||||
self.currentSize -= removed.data.len
|
||||
self.currentSize -= removed.data.len.NBytes
|
||||
except EmptyLruCacheError as exc:
|
||||
# if the cache is empty, can't remove anything, so break and add item
|
||||
# to the cache
|
||||
@ -179,7 +178,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
||||
|
||||
let removed = self.cache.del(cid)
|
||||
if removed.isSome:
|
||||
self.currentSize -= removed.get.data.len
|
||||
self.currentSize -= removed.get.data.len.NBytes
|
||||
|
||||
return success()
|
||||
|
||||
@ -189,11 +188,11 @@ method close*(self: CacheStore): Future[void] {.async.} =
|
||||
|
||||
discard
|
||||
|
||||
func new*(
|
||||
proc new*(
|
||||
_: type CacheStore,
|
||||
blocks: openArray[Block] = [],
|
||||
cacheSize: Positive = DefaultCacheSize, # in bytes
|
||||
chunkSize: Positive = DefaultChunkSize # in bytes
|
||||
cacheSize: NBytes = DefaultCacheSize,
|
||||
chunkSize: NBytes = DefaultChunkSize
|
||||
): CacheStore {.raises: [Defect, ValueError].} =
|
||||
## Create a new CacheStore instance
|
||||
##
|
||||
@ -203,9 +202,9 @@ func new*(
|
||||
if cacheSize < chunkSize:
|
||||
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
|
||||
|
||||
var currentSize = 0
|
||||
let
|
||||
size = cacheSize div chunkSize
|
||||
currentSize = 0'nb
|
||||
size = int(cacheSize div chunkSize)
|
||||
cache = newLruCache[Cid, Block](size)
|
||||
store = CacheStore(
|
||||
cache: cache,
|
||||
@ -216,3 +215,11 @@ func new*(
|
||||
discard store.putBlockSync(blk)
|
||||
|
||||
return store
|
||||
|
||||
proc new*(
|
||||
_: type CacheStore,
|
||||
blocks: openArray[Block] = [],
|
||||
cacheSize: int,
|
||||
chunkSize: int
|
||||
): CacheStore {.raises: [Defect, ValueError].} =
|
||||
CacheStore.new(blocks, NBytes cacheSize, NBytes chunkSize)
|
||||
|
@ -63,7 +63,7 @@ proc new*(
|
||||
result.initStream()
|
||||
|
||||
method `size`*(self: StoreStream): int =
|
||||
bytes(self.manifest, self.pad)
|
||||
bytes(self.manifest, self.pad).int
|
||||
|
||||
proc `size=`*(self: StoreStream, size: int)
|
||||
{.error: "Setting the size is forbidden".} =
|
||||
@ -93,9 +93,11 @@ method readOnce*(
|
||||
# Compute from the current stream position `self.offset` the block num/offset to read
|
||||
# Compute how many bytes to read from this block
|
||||
let
|
||||
blockNum = self.offset div self.manifest.blockSize
|
||||
blockOffset = self.offset mod self.manifest.blockSize
|
||||
readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize - blockOffset])
|
||||
blockNum = self.offset div self.manifest.blockSize.int
|
||||
blockOffset = self.offset mod self.manifest.blockSize.int
|
||||
readBytes = min([self.size - self.offset,
|
||||
nbytes - read,
|
||||
self.manifest.blockSize.int - blockOffset])
|
||||
|
||||
# Read contents of block `blockNum`
|
||||
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
|
||||
|
83
codex/units.nim
Normal file
83
codex/units.nim
Normal file
@ -0,0 +1,83 @@
|
||||
## Nim-Codex
|
||||
## Copyright (c) 2023 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
##
|
||||
|
||||
import std/hashes
|
||||
import std/strutils
|
||||
|
||||
import pkg/upraises
|
||||
import pkg/json_serialization
|
||||
import pkg/json_serialization/std/options
|
||||
|
||||
type
|
||||
NBytes* = distinct Natural
|
||||
|
||||
template basicMaths(T: untyped) =
|
||||
proc `+` *(x: T, y: static[int]): T = T(`+`(x.Natural, y.Natural))
|
||||
proc `-` *(x: T, y: static[int]): T = T(`-`(x.Natural, y.Natural))
|
||||
proc `*` *(x: T, y: static[int]): T = T(`*`(x.Natural, y.Natural))
|
||||
proc `+` *(x, y: T): T = T(`+`(x.Natural, y.Natural))
|
||||
proc `-` *(x, y: T): T = T(`-`(x.Natural, y.Natural))
|
||||
proc `*` *(x, y: T): T = T(`*`(x.Natural, y.Natural))
|
||||
proc `<` *(x, y: T): bool {.borrow.}
|
||||
proc `<=` *(x, y: T): bool {.borrow.}
|
||||
proc `==` *(x, y: T): bool {.borrow.}
|
||||
proc `+=` *(x: var T, y: T) {.borrow.}
|
||||
proc `-=` *(x: var T, y: T) {.borrow.}
|
||||
proc `hash` *(x: T): Hash {.borrow.}
|
||||
template divMaths(T: untyped) =
|
||||
proc `mod` *(x, y: T): T = T(`mod`(x.Natural, y.Natural))
|
||||
proc `div` *(x, y: T): Natural = `div`(x.Natural, y.Natural)
|
||||
# proc `/` *(x, y: T): Natural = `/`(x.Natural, y.Natural)
|
||||
|
||||
basicMaths(NBytes)
|
||||
divMaths(NBytes)
|
||||
|
||||
proc `$`*(ts: NBytes): string = $(int(ts)) & "'NByte"
|
||||
proc `'nb`*(n: string): NBytes = parseInt(n).NBytes
|
||||
|
||||
|
||||
const
|
||||
MiB = 1024.NBytes * 1024.NBytes # ByteSz, 1 mebibyte = 1,048,576 ByteSz
|
||||
|
||||
proc MiBs*(v: Natural): NBytes = v.NBytes * MiB
|
||||
|
||||
func divUp*[T: NBytes](a, b : T): int =
|
||||
## Division with result rounded up (rather than truncated as in 'div')
|
||||
assert(b != T(0))
|
||||
if a==T(0): int(0) else: int( ((a - T(1)) div b) + 1 )
|
||||
|
||||
proc writeValue*(
|
||||
writer: var JsonWriter,
|
||||
value: NBytes
|
||||
) {.upraises:[IOError].} =
|
||||
writer.writeValue value.int
|
||||
|
||||
proc readValue*(
|
||||
reader: var JsonReader,
|
||||
value: var NBytes
|
||||
) {.upraises: [SerializationError, IOError].} =
|
||||
value = NBytes reader.readValue(int)
|
||||
|
||||
when isMainModule:
|
||||
|
||||
import unittest2
|
||||
|
||||
suite "maths":
|
||||
test "basics":
|
||||
let x = 5.NBytes
|
||||
let y = 10.NBytes
|
||||
check x + y == 15.NBytes
|
||||
expect RangeDefect:
|
||||
check x - y == 10.NBytes
|
||||
check y - x == 5.NBytes
|
||||
check x * y == 50.NBytes
|
||||
check y div x == 2
|
||||
check y > x == true
|
||||
check y <= x == false
|
@ -1,15 +1,84 @@
|
||||
## Nim-Codex
|
||||
## Copyright (c) 2023 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
##
|
||||
|
||||
import std/parseutils
|
||||
|
||||
import pkg/chronos
|
||||
|
||||
import ./utils/asyncheapqueue
|
||||
import ./utils/fileutils
|
||||
|
||||
export asyncheapqueue, fileutils
|
||||
|
||||
|
||||
func divUp*[T](a, b : T): T =
|
||||
func divUp*[T: SomeInteger](a, b : T): T =
|
||||
## Division with result rounded up (rather than truncated as in 'div')
|
||||
assert(b != 0)
|
||||
if a==0: 0 else: ((a - 1) div b) + 1
|
||||
assert(b != T(0))
|
||||
if a==T(0): T(0) else: ((a - T(1)) div b) + T(1)
|
||||
|
||||
func roundUp*[T](a, b : T): T =
|
||||
## Round up 'a' to the next value divisible by 'b'
|
||||
divUp(a,b) * b
|
||||
|
||||
when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
|
||||
const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}
|
||||
|
||||
func toLowerAscii(c: char): char =
|
||||
if c in {'A'..'Z'}: char(uint8(c) xor 0b0010_0000'u8) else: c
|
||||
|
||||
func parseDuration*(s: string, size: var Duration): int =
|
||||
## Parse a size qualified by simple time into `Duration`.
|
||||
##
|
||||
runnableExamples:
|
||||
var res: Duration # caller must still know if 'b' refers to bytes|bits
|
||||
doAssert parseDuration("10H", res) == 3
|
||||
doAssert res == initDuration(hours=10)
|
||||
doAssert parseDuration("64m", res) == 6
|
||||
doAssert res == initDuration(minutes=64)
|
||||
doAssert parseDuration("7m/block", res) == 2 # '/' stops parse
|
||||
doAssert res == initDuration(minutes=7) # 1 shl 30, forced binary metric
|
||||
doAssert parseDuration("3d", res) == 2 # '/' stops parse
|
||||
doAssert res == initDuration(days=3) # 1 shl 30, forced binary metric
|
||||
|
||||
const prefix = "s" & "mhdw" # byte|bit & lowCase metric-ish prefixes
|
||||
const timeScale = [1.0, 60.0, 3600.0, 86_400.0, 604_800.0]
|
||||
|
||||
var number: float
|
||||
var scale = 1.0
|
||||
result = parseFloat(s, number)
|
||||
if number < 0: # While parseFloat accepts negatives ..
|
||||
result = 0 #.. we do not since sizes cannot be < 0
|
||||
else:
|
||||
let start = result # Save spot to maybe unwind white to EOS
|
||||
while result < s.len and s[result] in Whitespace:
|
||||
inc result
|
||||
if result < s.len: # Illegal starting char => unity
|
||||
if (let si = prefix.find(s[result].toLowerAscii); si >= 0):
|
||||
inc result # Now parse the scale
|
||||
scale = timeScale[si]
|
||||
else: # Unwind result advancement when there..
|
||||
result = start #..is no unit to the end of `s`.
|
||||
var sizeF = number * scale + 0.5 # Saturate to int64.high when too big
|
||||
size = seconds(int(sizeF))
|
||||
|
||||
when isMainModule:
|
||||
import unittest2
|
||||
|
||||
suite "time parse":
|
||||
test "parseDuration":
|
||||
var res: Duration # caller must still know if 'b' refers to bytes|bits
|
||||
check parseDuration("10Hr", res) == 3
|
||||
check res == hours(10)
|
||||
check parseDuration("64min", res) == 3
|
||||
check res == minutes(64)
|
||||
check parseDuration("7m/block", res) == 2 # '/' stops parse
|
||||
check res == minutes(7) # 1 shl 30, forced binary metric
|
||||
check parseDuration("3d", res) == 2 # '/' stops parse
|
||||
check res == days(3) # 1 shl 30, forced binary metric
|
||||
|
@ -20,8 +20,8 @@ import ../../helpers
|
||||
|
||||
asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
let
|
||||
chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
|
||||
chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256)
|
||||
chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
|
||||
chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
|
||||
|
||||
var
|
||||
nodeCmps1, nodeCmps2: NodesComponents
|
||||
@ -182,7 +182,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
|
||||
asyncchecksuite "NetworkStore - multiple nodes":
|
||||
let
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256'nb)
|
||||
|
||||
var
|
||||
switch: seq[Switch]
|
||||
|
@ -37,7 +37,7 @@ asyncchecksuite "NetworkStore engine basic":
|
||||
rng = Rng.instance()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
|
||||
wallet = WalletRef.example
|
||||
blockDiscovery = Discovery.new()
|
||||
peerStore = PeerCtxStore.new()
|
||||
@ -144,7 +144,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256)
|
||||
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
@ -373,7 +373,7 @@ asyncchecksuite "Task Handler":
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256)
|
||||
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256'nb)
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
|
@ -12,7 +12,7 @@ import ./helpers/mockdiscovery
|
||||
import ./helpers/eventually
|
||||
import ../checktest
|
||||
|
||||
export randomchunker, nodeutils, mockdiscovery, eventually, checktest
|
||||
export randomchunker, nodeutils, mockdiscovery, eventually, checktest, manifest
|
||||
|
||||
# NOTE: The meaning of equality for blocks
|
||||
# is changed here, because blocks are now `ref`
|
||||
|
@ -13,13 +13,17 @@ type
|
||||
proc new*(
|
||||
T: type RandomChunker,
|
||||
rng: Rng,
|
||||
chunkSize = DefaultChunkSize,
|
||||
size: int,
|
||||
chunkSize: int | NBytes,
|
||||
size: int | NBytes,
|
||||
pad = false
|
||||
): RandomChunker =
|
||||
## Create a chunker that produces random data
|
||||
##
|
||||
|
||||
let
|
||||
size = size.int
|
||||
chunkSize = chunkSize.NBytes
|
||||
|
||||
var consumed = 0
|
||||
proc reader(data: ChunkBuffer, len: int): Future[int]
|
||||
{.async, gcsafe, raises: [Defect].} =
|
||||
|
@ -20,7 +20,7 @@ import ../examples
|
||||
import ../helpers
|
||||
|
||||
const
|
||||
BlockSize = 31 * 64
|
||||
BlockSize = 31'nb * 64
|
||||
DataSetSize = BlockSize * 100
|
||||
|
||||
asyncchecksuite "Storage Proofs Network":
|
||||
@ -48,7 +48,7 @@ asyncchecksuite "Storage Proofs Network":
|
||||
tags: seq[Tag]
|
||||
|
||||
setup:
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
|
||||
store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
(spk, ssk) = st.keyGen()
|
||||
@ -66,7 +66,7 @@ asyncchecksuite "Storage Proofs Network":
|
||||
por = await PoR.init(
|
||||
porStream,
|
||||
ssk, spk,
|
||||
BlockSize)
|
||||
BlockSize.int)
|
||||
|
||||
porMsg = por.toMessage()
|
||||
tags = blocks.mapIt(
|
||||
|
@ -14,8 +14,8 @@ import pkg/codex/blocktype as bt
|
||||
import ../helpers
|
||||
|
||||
const
|
||||
BlockSize = 31 * 4
|
||||
SectorSize = 31
|
||||
BlockSize = 31'nb * 4
|
||||
SectorSize = 31'nb
|
||||
SectorsPerBlock = BlockSize div SectorSize
|
||||
DataSetSize = BlockSize * 100
|
||||
|
||||
@ -30,7 +30,7 @@ asyncchecksuite "BLS PoR":
|
||||
proofStream: StoreStream
|
||||
|
||||
setup:
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
|
||||
store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
(spk, ssk) = st.keyGen()
|
||||
@ -55,7 +55,7 @@ asyncchecksuite "BLS PoR":
|
||||
porStream,
|
||||
ssk,
|
||||
spk,
|
||||
BlockSize)
|
||||
BlockSize.int)
|
||||
|
||||
proc createProof(por: PoR, q: seq[QElement]): Future[Proof] =
|
||||
return generateProof(
|
||||
@ -96,7 +96,7 @@ asyncchecksuite "Test Serialization":
|
||||
proofStream: StoreStream
|
||||
|
||||
setup:
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
|
||||
store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
|
||||
@ -114,7 +114,7 @@ asyncchecksuite "Test Serialization":
|
||||
porStream,
|
||||
ssk,
|
||||
spk,
|
||||
BlockSize)
|
||||
BlockSize.int)
|
||||
q = generateQuery(por.tau, 22)
|
||||
proofStream = StoreStream.new(store, manifest)
|
||||
proof = await generateProof(
|
||||
|
@ -12,7 +12,7 @@ import pkg/codex/blocktype as bt
|
||||
import ../helpers
|
||||
|
||||
const
|
||||
BlockSize = 31 * 64
|
||||
BlockSize = 31'nb * 64'nb
|
||||
DataSetSize = BlockSize * 100
|
||||
|
||||
asyncchecksuite "Test PoR store":
|
||||
@ -34,7 +34,7 @@ asyncchecksuite "Test PoR store":
|
||||
tags: seq[Tag]
|
||||
|
||||
setup:
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
|
||||
store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
(spk, ssk) = st.keyGen()
|
||||
@ -52,7 +52,7 @@ asyncchecksuite "Test PoR store":
|
||||
por = await PoR.init(
|
||||
porStream,
|
||||
ssk, spk,
|
||||
BlockSize)
|
||||
BlockSize.int)
|
||||
|
||||
porMsg = por.toMessage()
|
||||
tags = blocks.mapIt(
|
||||
|
@ -30,10 +30,10 @@ checksuite "Cache Store":
|
||||
discard CacheStore.new(cacheSize = 1, chunkSize = 2)
|
||||
|
||||
store = CacheStore.new(cacheSize = 100, chunkSize = 1)
|
||||
check store.currentSize == 0
|
||||
check store.currentSize == 0'nb
|
||||
|
||||
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
|
||||
check store.currentSize == 300
|
||||
check store.currentSize == 300'nb
|
||||
|
||||
# initial cache blocks total more than cache size, currentSize should
|
||||
# never exceed max cache size
|
||||
@ -41,7 +41,7 @@ checksuite "Cache Store":
|
||||
blocks = @[newBlock1, newBlock2, newBlock3],
|
||||
cacheSize = 200,
|
||||
chunkSize = 1)
|
||||
check store.currentSize == 200
|
||||
check store.currentSize == 200'nb
|
||||
|
||||
# cache size cannot be less than chunks size
|
||||
expect ValueError:
|
||||
@ -67,7 +67,7 @@ checksuite "Cache Store":
|
||||
not (await store.hasBlock(newBlock1.cid)).tryGet()
|
||||
(await store.hasBlock(newBlock2.cid)).tryGet()
|
||||
(await store.hasBlock(newBlock2.cid)).tryGet()
|
||||
store.currentSize == newBlock2.data.len + newBlock3.data.len # 200
|
||||
store.currentSize.int == newBlock2.data.len + newBlock3.data.len # 200
|
||||
|
||||
commonBlockStoreTests(
|
||||
"Cache", proc: BlockStore =
|
||||
|
@ -24,7 +24,7 @@ asyncchecksuite "Chunking":
|
||||
|
||||
let chunker = Chunker.new(
|
||||
reader = reader,
|
||||
chunkSize = 2)
|
||||
chunkSize = 2'nb)
|
||||
|
||||
check:
|
||||
(await chunker.getBytes()) == [1.byte, 2]
|
||||
@ -39,7 +39,7 @@ asyncchecksuite "Chunking":
|
||||
let stream = BufferStream.new()
|
||||
let chunker = LPStreamChunker.new(
|
||||
stream = stream,
|
||||
chunkSize = 2)
|
||||
chunkSize = 2'nb)
|
||||
|
||||
proc writer() {.async.} =
|
||||
for d in [@[1.byte, 2, 3, 4], @[5.byte, 6, 7, 8], @[9.byte, 0]]:
|
||||
@ -63,7 +63,7 @@ asyncchecksuite "Chunking":
|
||||
let
|
||||
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
|
||||
file = open(path)
|
||||
fileChunker = FileChunker.new(file = file, chunkSize = 256, pad = false)
|
||||
fileChunker = FileChunker.new(file = file, chunkSize = 256'nb, pad = false)
|
||||
|
||||
var data: seq[byte]
|
||||
while true:
|
||||
@ -71,7 +71,7 @@ asyncchecksuite "Chunking":
|
||||
if buff.len <= 0:
|
||||
break
|
||||
|
||||
check buff.len <= fileChunker.chunkSize
|
||||
check buff.len <= fileChunker.chunkSize.int
|
||||
data.add(buff)
|
||||
|
||||
check:
|
||||
|
@ -14,7 +14,7 @@ import pkg/codex/rng
|
||||
import ./helpers
|
||||
|
||||
asyncchecksuite "Erasure encode/decode":
|
||||
const BlockSize = 1024
|
||||
const BlockSize = 1024'nb
|
||||
const dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var rng: Rng
|
||||
|
@ -57,7 +57,7 @@ asyncchecksuite "Test Node":
|
||||
proc retrieve(cid: Cid): Future[seq[byte]] {.async.} =
|
||||
# Retrieve an entire file contents by file Cid
|
||||
let
|
||||
oddChunkSize = math.trunc(BlockSize/1.359).int # Let's check that node.retrieve can correctly rechunk data
|
||||
oddChunkSize = math.trunc(DefaultBlockSize.float/1.359).int # Let's check that node.retrieve can correctly rechunk data
|
||||
stream = (await node.retrieve(cid)).tryGet()
|
||||
var
|
||||
data: seq[byte]
|
||||
@ -76,7 +76,7 @@ asyncchecksuite "Test Node":
|
||||
|
||||
setup:
|
||||
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
|
||||
chunker = FileChunker.new(file = file, chunkSize = BlockSize)
|
||||
chunker = FileChunker.new(file = file, chunkSize = DefaultBlockSize)
|
||||
switch = newStandardSwitch()
|
||||
wallet = WalletRef.new(EthPrivateKey.random())
|
||||
network = BlockExcNetwork.new(switch)
|
||||
@ -132,7 +132,7 @@ asyncchecksuite "Test Node":
|
||||
let
|
||||
stream = BufferStream.new()
|
||||
storeFut = node.store(stream)
|
||||
oddChunkSize = math.trunc(BlockSize/3.14).int # Let's check that node.store can correctly rechunk these odd chunks
|
||||
oddChunkSize = math.trunc(DefaultBlockSize.float/3.14).NBytes # Let's check that node.store can correctly rechunk these odd chunks
|
||||
oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue
|
||||
var
|
||||
original: seq[byte]
|
||||
@ -159,7 +159,7 @@ asyncchecksuite "Test Node":
|
||||
let data = await retrieve(manifestCid)
|
||||
|
||||
check:
|
||||
data.len == localManifest.originalBytes
|
||||
data.len == localManifest.originalBytes.int
|
||||
data.len == original.len
|
||||
sha256.digest(data) == sha256.digest(original)
|
||||
|
||||
|
@ -42,7 +42,7 @@ asyncchecksuite "StoreStream":
|
||||
|
||||
setup:
|
||||
store = CacheStore.new()
|
||||
manifest = Manifest.new(blockSize = 10).tryGet()
|
||||
manifest = Manifest.new(blockSize = 10'nb).tryGet()
|
||||
stream = StoreStream.new(store, manifest)
|
||||
|
||||
for d in data:
|
||||
|
Loading…
x
Reference in New Issue
Block a user