mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-02-16 14:57:54 +00:00
Revert "[build] track nim-libp2p's unstable branch"
This reverts commit 8899b68d1a4e438e7125c37d001cb9673b243e19.
This commit is contained in:
parent
61b18bbe93
commit
499890d44c
1
.gitignore
vendored
1
.gitignore
vendored
@ -33,4 +33,3 @@ nimble.paths
|
|||||||
|
|
||||||
.update.timestamp
|
.update.timestamp
|
||||||
codex.nims
|
codex.nims
|
||||||
nimbus-build-system.paths
|
|
||||||
|
@ -91,7 +91,7 @@ when isMainModule:
|
|||||||
waitFor server.stop()
|
waitFor server.stop()
|
||||||
notice "Stopped Codex"
|
notice "Stopped Codex"
|
||||||
|
|
||||||
c_signal(ansi_c.SIGTERM, SIGTERMHandler)
|
c_signal(SIGTERM, SIGTERMHandler)
|
||||||
|
|
||||||
waitFor server.start()
|
waitFor server.start()
|
||||||
notice "Exited codex"
|
notice "Exited codex"
|
||||||
|
@ -302,7 +302,7 @@ proc blocksHandler*(
|
|||||||
trace "Got blocks from peer", peer, len = blocks.len
|
trace "Got blocks from peer", peer, len = blocks.len
|
||||||
for blk in blocks:
|
for blk in blocks:
|
||||||
if isErr (await b.localStore.putBlock(blk)):
|
if isErr (await b.localStore.putBlock(blk)):
|
||||||
trace "Unable to store block", cid = $blk.cid
|
trace "Unable to store block", cid = blk.cid
|
||||||
|
|
||||||
await b.resolveBlocks(blocks)
|
await b.resolveBlocks(blocks)
|
||||||
let
|
let
|
||||||
|
@ -52,7 +52,7 @@ proc getWantHandle*(
|
|||||||
|
|
||||||
return await p.blocks[cid].handle.wait(timeout)
|
return await p.blocks[cid].handle.wait(timeout)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Blocks cancelled", exc = exc.msg, cid = $cid
|
trace "Blocks cancelled", exc = exc.msg, cid
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Pending WANT failed or expired", exc = exc.msg
|
trace "Pending WANT failed or expired", exc = exc.msg
|
||||||
|
@ -7,9 +7,6 @@
|
|||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import pkg/upraises
|
|
||||||
push: {.upraises: [].}
|
|
||||||
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import pkg/libp2p
|
import pkg/libp2p
|
||||||
@ -24,8 +21,6 @@ const
|
|||||||
MaxMessageSize = 100 * 1 shl 20 # manifest files can be big
|
MaxMessageSize = 100 * 1 shl 20 # manifest files can be big
|
||||||
|
|
||||||
type
|
type
|
||||||
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
|
|
||||||
|
|
||||||
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
NetworkPeer* = ref object of RootObj
|
NetworkPeer* = ref object of RootObj
|
||||||
|
@ -76,7 +76,7 @@ proc encode*(
|
|||||||
##
|
##
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
original_cid = $manifest.cid.get()
|
original_cid = manifest.cid.get()
|
||||||
original_len = manifest.len
|
original_len = manifest.len
|
||||||
blocks = blocks
|
blocks = blocks
|
||||||
parity = parity
|
parity = parity
|
||||||
@ -118,7 +118,7 @@ proc encode*(
|
|||||||
trace "Unable to retrieve block", error = error.msg
|
trace "Unable to retrieve block", error = error.msg
|
||||||
return failure error
|
return failure error
|
||||||
|
|
||||||
trace "Encoding block", cid = $blk.cid, pos = idx
|
trace "Encoding block", cid = blk.cid, pos = idx
|
||||||
shallowCopy(data[j], blk.data)
|
shallowCopy(data[j], blk.data)
|
||||||
else:
|
else:
|
||||||
trace "Padding with empty block", pos = idx
|
trace "Padding with empty block", pos = idx
|
||||||
@ -137,10 +137,10 @@ proc encode*(
|
|||||||
trace "Unable to create parity block", err = error.msg
|
trace "Unable to create parity block", err = error.msg
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
trace "Adding parity block", cid = $blk.cid, pos = idx
|
trace "Adding parity block", cid = blk.cid, pos = idx
|
||||||
encoded[idx] = blk.cid
|
encoded[idx] = blk.cid
|
||||||
if isErr (await self.store.putBlock(blk)):
|
if isErr (await self.store.putBlock(blk)):
|
||||||
trace "Unable to store block!", cid = $blk.cid
|
trace "Unable to store block!", cid = blk.cid
|
||||||
return failure("Unable to store block!")
|
return failure("Unable to store block!")
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Erasure coding encoding cancelled"
|
trace "Erasure coding encoding cancelled"
|
||||||
@ -212,10 +212,10 @@ proc decode*(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if idx >= encoded.K:
|
if idx >= encoded.K:
|
||||||
trace "Retrieved parity block", cid = $blk.cid, idx
|
trace "Retrieved parity block", cid = blk.cid, idx
|
||||||
shallowCopy(parityData[idx - encoded.K], if blk.isEmpty: emptyBlock else: blk.data)
|
shallowCopy(parityData[idx - encoded.K], if blk.isEmpty: emptyBlock else: blk.data)
|
||||||
else:
|
else:
|
||||||
trace "Retrieved data block", cid = $blk.cid, idx
|
trace "Retrieved data block", cid = blk.cid, idx
|
||||||
shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
|
shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
|
||||||
|
|
||||||
resolved.inc
|
resolved.inc
|
||||||
@ -241,9 +241,9 @@ proc decode*(
|
|||||||
trace "Unable to create block!", exc = error.msg
|
trace "Unable to create block!", exc = error.msg
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
trace "Recovered block", cid = $blk.cid
|
trace "Recovered block", cid = blk.cid
|
||||||
if isErr (await self.store.putBlock(blk)):
|
if isErr (await self.store.putBlock(blk)):
|
||||||
trace "Unable to store block!", cid = $blk.cid
|
trace "Unable to store block!", cid = blk.cid
|
||||||
return failure("Unable to store block!")
|
return failure("Unable to store block!")
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Erasure coding decoding cancelled"
|
trace "Erasure coding decoding cancelled"
|
||||||
|
@ -49,7 +49,7 @@ func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) =
|
|||||||
proc add*(self: Manifest, cid: Cid) =
|
proc add*(self: Manifest, cid: Cid) =
|
||||||
assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks
|
assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks
|
||||||
self.rootHash = Cid.none
|
self.rootHash = Cid.none
|
||||||
trace "Adding cid to manifest", cid = $cid
|
trace "Adding cid to manifest", cid
|
||||||
self.blocks.add(cid)
|
self.blocks.add(cid)
|
||||||
self.originalBytes = self.blocks.len * self.blockSize
|
self.originalBytes = self.blocks.len * self.blockSize
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ proc fetchManifest*(
|
|||||||
containerType =? ManifestContainers.?[$contentType]:
|
containerType =? ManifestContainers.?[$contentType]:
|
||||||
return failure "CID has invalid content type for manifest"
|
return failure "CID has invalid content type for manifest"
|
||||||
|
|
||||||
trace "Received retrieval request", cid = $cid
|
trace "Received retrieval request", cid
|
||||||
|
|
||||||
without blk =? await node.blockStore.getBlock(cid), error:
|
without blk =? await node.blockStore.getBlock(cid), error:
|
||||||
return failure error
|
return failure error
|
||||||
@ -126,9 +126,9 @@ proc retrieve*(
|
|||||||
try:
|
try:
|
||||||
# Spawn an erasure decoding job
|
# Spawn an erasure decoding job
|
||||||
without res =? (await node.erasure.decode(manifest)), error:
|
without res =? (await node.erasure.decode(manifest)), error:
|
||||||
trace "Unable to erasure decode manifest", cid = $cid, exc = error.msg
|
trace "Unable to erasure decode manifest", cid, exc = error.msg
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception decoding manifest", cid = $cid
|
trace "Exception decoding manifest", cid
|
||||||
#
|
#
|
||||||
asyncSpawn erasureJob()
|
asyncSpawn erasureJob()
|
||||||
else:
|
else:
|
||||||
@ -219,8 +219,8 @@ proc store*(
|
|||||||
trace "Unable to generate manifest Cid!", exc = error.msg
|
trace "Unable to generate manifest Cid!", exc = error.msg
|
||||||
return failure(error.msg)
|
return failure(error.msg)
|
||||||
|
|
||||||
trace "Stored data", manifestCid = $manifest.cid,
|
trace "Stored data", manifestCid = manifest.cid,
|
||||||
contentCid = $cid,
|
contentCid = cid,
|
||||||
blocks = blockManifest.len
|
blocks = blockManifest.len
|
||||||
|
|
||||||
# Announce manifest
|
# Announce manifest
|
||||||
@ -244,19 +244,19 @@ proc requestStorage*(self: CodexNodeRef,
|
|||||||
## - Run the PoR setup on the erasure dataset
|
## - Run the PoR setup on the erasure dataset
|
||||||
## - Call into the marketplace and purchasing contracts
|
## - Call into the marketplace and purchasing contracts
|
||||||
##
|
##
|
||||||
trace "Received a request for storage!", cid = $cid, duration, nodes, tolerance, reward
|
trace "Received a request for storage!", cid, duration, nodes, tolerance, reward
|
||||||
|
|
||||||
without contracts =? self.contracts:
|
without contracts =? self.contracts:
|
||||||
trace "Purchasing not available"
|
trace "Purchasing not available"
|
||||||
return failure "Purchasing not available"
|
return failure "Purchasing not available"
|
||||||
|
|
||||||
without manifest =? await self.fetchManifest(cid), error:
|
without manifest =? await self.fetchManifest(cid), error:
|
||||||
trace "Unable to fetch manifest for cid", cid = $cid
|
trace "Unable to fetch manifest for cid", cid
|
||||||
raise error
|
raise error
|
||||||
|
|
||||||
# Erasure code the dataset according to provided parameters
|
# Erasure code the dataset according to provided parameters
|
||||||
without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error:
|
without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error:
|
||||||
trace "Unable to erasure code dataset", cid = $cid
|
trace "Unable to erasure code dataset", cid
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
without encodedData =? encoded.encode(), error:
|
without encodedData =? encoded.encode(), error:
|
||||||
@ -268,7 +268,7 @@ proc requestStorage*(self: CodexNodeRef,
|
|||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
if isErr (await self.blockStore.putBlock(encodedBlk)):
|
if isErr (await self.blockStore.putBlock(encodedBlk)):
|
||||||
trace "Unable to store encoded manifest block", cid = $encodedBlk.cid
|
trace "Unable to store encoded manifest block", cid = encodedBlk.cid
|
||||||
return failure("Unable to store encoded manifest block")
|
return failure("Unable to store encoded manifest block")
|
||||||
|
|
||||||
let request = StorageRequest(
|
let request = StorageRequest(
|
||||||
@ -334,14 +334,14 @@ proc start*(node: CodexNodeRef) {.async.} =
|
|||||||
##
|
##
|
||||||
|
|
||||||
without cid =? Cid.init(request.content.cid):
|
without cid =? Cid.init(request.content.cid):
|
||||||
trace "Unable to parse Cid", cid = $cid
|
trace "Unable to parse Cid", cid
|
||||||
raise newException(CodexError, "Unable to parse Cid")
|
raise newException(CodexError, "Unable to parse Cid")
|
||||||
|
|
||||||
without manifest =? await node.fetchManifest(cid), error:
|
without manifest =? await node.fetchManifest(cid), error:
|
||||||
trace "Unable to fetch manifest for cid", cid = $cid
|
trace "Unable to fetch manifest for cid", cid
|
||||||
raise error
|
raise error
|
||||||
|
|
||||||
trace "Fetching block for manifest", cid = $cid
|
trace "Fetching block for manifest", cid
|
||||||
# TODO: This will probably require a call to `getBlock` either way,
|
# TODO: This will probably require a call to `getBlock` either way,
|
||||||
# since fetching of blocks will have to be selective according
|
# since fetching of blocks will have to be selective according
|
||||||
# to a combination of parameters, such as node slot position
|
# to a combination of parameters, such as node slot position
|
||||||
|
@ -123,7 +123,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
|
|||||||
trace "Excepting streaming blocks", exc = exc.msg
|
trace "Excepting streaming blocks", exc = exc.msg
|
||||||
return RestApiResponse.error(Http500)
|
return RestApiResponse.error(Http500)
|
||||||
finally:
|
finally:
|
||||||
trace "Sent bytes", cid = $id.get(), bytes
|
trace "Sent bytes", cid = id.get(), bytes
|
||||||
if not stream.isNil:
|
if not stream.isNil:
|
||||||
await stream.close()
|
await stream.close()
|
||||||
|
|
||||||
|
@ -12,11 +12,11 @@ import pkg/upraises
|
|||||||
push: {.upraises: [].}
|
push: {.upraises: [].}
|
||||||
|
|
||||||
import pkg/libp2p/crypto/crypto
|
import pkg/libp2p/crypto/crypto
|
||||||
import pkg/bearssl/rand
|
import pkg/bearssl
|
||||||
|
|
||||||
type
|
type
|
||||||
RngSampleError = object of CatchableError
|
RngSampleError = object of CatchableError
|
||||||
Rng* = ref HmacDrbgContext
|
Rng* = ref BrHmacDrbgContext
|
||||||
|
|
||||||
var rng {.threadvar.}: Rng
|
var rng {.threadvar.}: Rng
|
||||||
|
|
||||||
@ -25,15 +25,16 @@ proc instance*(t: type Rng): Rng =
|
|||||||
rng = newRng()
|
rng = newRng()
|
||||||
rng
|
rng
|
||||||
|
|
||||||
# Random helpers: similar as in stdlib, but with HmacDrbgContext rng
|
# Random helpers: similar as in stdlib, but with BrHmacDrbgContext rng
|
||||||
# TODO: Move these somewhere else?
|
# TODO: Move these somewhere else?
|
||||||
const randMax = 18_446_744_073_709_551_615'u64
|
const randMax = 18_446_744_073_709_551_615'u64
|
||||||
|
|
||||||
proc rand*(rng: Rng, max: Natural): int =
|
proc rand*(rng: Rng, max: Natural): int =
|
||||||
if max == 0: return 0
|
if max == 0: return 0
|
||||||
|
|
||||||
|
var x: uint64
|
||||||
while true:
|
while true:
|
||||||
let x = rng[].generate(uint64)
|
brHmacDrbgGenerate(addr rng[], addr x, csize_t(sizeof(x)))
|
||||||
if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias
|
if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias
|
||||||
return int(x mod (uint64(max) + 1'u64))
|
return int(x mod (uint64(max) + 1'u64))
|
||||||
|
|
||||||
|
@ -63,7 +63,7 @@ proc uploadTags*(
|
|||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception submitting tags", cid = $cid, exc = exc.msg
|
trace "Exception submitting tags", cid, exc = exc.msg
|
||||||
return failure(exc.msg)
|
return failure(exc.msg)
|
||||||
finally:
|
finally:
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
@ -47,7 +47,7 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
## Get a block from the stores
|
## Get a block from the stores
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Getting block from cache", cid = $cid
|
trace "Getting block from cache", cid
|
||||||
|
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
@ -59,14 +59,14 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
try:
|
try:
|
||||||
return success self.cache[cid]
|
return success self.cache[cid]
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Error requesting block from cache", cid = $cid, error = exc.msg
|
trace "Error requesting block from cache", cid, error = exc.msg
|
||||||
return failure exc
|
return failure exc
|
||||||
|
|
||||||
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
|
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
|
||||||
## Check if the block exists in the blockstore
|
## Check if the block exists in the blockstore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Checking CacheStore for block presence", cid = $cid
|
trace "Checking CacheStore for block presence", cid
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
return true.success
|
return true.success
|
||||||
@ -108,7 +108,7 @@ method putBlock*(self: CacheStore, blk: Block): Future[?!void] {.async.} =
|
|||||||
## Put a block to the blockstore
|
## Put a block to the blockstore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Storing block in cache", cid = $blk.cid
|
trace "Storing block in cache", cid = blk.cid
|
||||||
if blk.isEmpty:
|
if blk.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
return success()
|
return success()
|
||||||
@ -120,7 +120,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
|||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Deleting block from cache", cid = $cid
|
trace "Deleting block from cache", cid
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
return success()
|
return success()
|
||||||
|
@ -43,9 +43,9 @@ method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
##
|
##
|
||||||
|
|
||||||
if not self.cache.isNil:
|
if not self.cache.isNil:
|
||||||
trace "Getting block from cache or filestore", cid = $cid
|
trace "Getting block from cache or filestore", cid
|
||||||
else:
|
else:
|
||||||
trace "Getting block from filestore", cid = $cid
|
trace "Getting block from filestore", cid
|
||||||
|
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
@ -58,7 +58,7 @@ method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
if not cachedBlockRes.isErr:
|
if not cachedBlockRes.isErr:
|
||||||
return success cachedBlockRes.get
|
return success cachedBlockRes.get
|
||||||
else:
|
else:
|
||||||
trace "Unable to read block from cache", cid = $cid, error = cachedBlockRes.error.msg
|
trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg
|
||||||
|
|
||||||
# Read file contents
|
# Read file contents
|
||||||
var
|
var
|
||||||
@ -79,7 +79,7 @@ method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
return failure "Error requesting block from filestore: " & error
|
return failure "Error requesting block from filestore: " & error
|
||||||
|
|
||||||
without blk =? Block.new(cid, data), error:
|
without blk =? Block.new(cid, data), error:
|
||||||
trace "Unable to construct block from data", cid = $cid, error = error.msg
|
trace "Unable to construct block from data", cid, error = error.msg
|
||||||
return failure error
|
return failure error
|
||||||
|
|
||||||
if not self.cache.isNil:
|
if not self.cache.isNil:
|
||||||
@ -87,7 +87,7 @@ method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} =
|
|||||||
putCachedRes = await self.cache.putBlock(blk)
|
putCachedRes = await self.cache.putBlock(blk)
|
||||||
|
|
||||||
if putCachedRes.isErr:
|
if putCachedRes.isErr:
|
||||||
trace "Unable to store block in cache", cid = $cid, error = putCachedRes.error.msg
|
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
|
||||||
|
|
||||||
return success blk
|
return success blk
|
||||||
|
|
||||||
@ -97,9 +97,9 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
|
|||||||
##
|
##
|
||||||
|
|
||||||
if not self.cache.isNil:
|
if not self.cache.isNil:
|
||||||
trace "Putting block into filestore and cache", cid = $blk.cid
|
trace "Putting block into filestore and cache", cid = blk.cid
|
||||||
else:
|
else:
|
||||||
trace "Putting block into filestore", cid = $blk.cid
|
trace "Putting block into filestore", cid = blk.cid
|
||||||
|
|
||||||
if blk.isEmpty:
|
if blk.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
@ -118,7 +118,7 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
|
|||||||
let res = io2.writeFile(path, blk.data)
|
let res = io2.writeFile(path, blk.data)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
let error = io2.ioErrorMsg(res.error)
|
let error = io2.ioErrorMsg(res.error)
|
||||||
trace "Unable to store block", path, cid = $blk.cid, error
|
trace "Unable to store block", path, cid = blk.cid, error
|
||||||
return failure("Unable to store block")
|
return failure("Unable to store block")
|
||||||
|
|
||||||
if not self.cache.isNil:
|
if not self.cache.isNil:
|
||||||
@ -126,7 +126,7 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
|
|||||||
putCachedRes = await self.cache.putBlock(blk)
|
putCachedRes = await self.cache.putBlock(blk)
|
||||||
|
|
||||||
if putCachedRes.isErr:
|
if putCachedRes.isErr:
|
||||||
trace "Unable to store block in cache", cid = $blk.cid, error = putCachedRes.error.msg
|
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
@ -135,9 +135,9 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
|
|||||||
##
|
##
|
||||||
|
|
||||||
if not self.cache.isNil:
|
if not self.cache.isNil:
|
||||||
trace "Deleting block from cache and filestore", cid = $cid
|
trace "Deleting block from cache and filestore", cid
|
||||||
else:
|
else:
|
||||||
trace "Deleting block from filestore", cid = $cid
|
trace "Deleting block from filestore", cid
|
||||||
|
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
@ -148,7 +148,7 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
|
|||||||
delCachedRes = await self.cache.delBlock(cid)
|
delCachedRes = await self.cache.delBlock(cid)
|
||||||
|
|
||||||
if delCachedRes.isErr:
|
if delCachedRes.isErr:
|
||||||
trace "Unable to delete block from cache", cid = $cid, error = delCachedRes.error.msg
|
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
|
||||||
|
|
||||||
let
|
let
|
||||||
path = self.blockPath(cid)
|
path = self.blockPath(cid)
|
||||||
@ -156,7 +156,7 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
|
|||||||
|
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
let error = io2.ioErrorMsg(res.error)
|
let error = io2.ioErrorMsg(res.error)
|
||||||
trace "Unable to delete block", path, cid = $cid, error
|
trace "Unable to delete block", path, cid, error
|
||||||
return error.failure
|
return error.failure
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
@ -165,7 +165,7 @@ method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} =
|
|||||||
## Check if a block exists in the filestore
|
## Check if a block exists in the filestore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Checking filestore for block existence", cid = $cid
|
trace "Checking filestore for block existence", cid
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
return true.success
|
return true.success
|
||||||
|
@ -35,11 +35,11 @@ method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} =
|
|||||||
## Get a block from a remote peer
|
## Get a block from a remote peer
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Getting block from local store or network", cid = $cid
|
trace "Getting block from local store or network", cid
|
||||||
|
|
||||||
without blk =? await self.localStore.getBlock(cid), error:
|
without blk =? await self.localStore.getBlock(cid), error:
|
||||||
if not (error of BlockNotFoundError): return failure error
|
if not (error of BlockNotFoundError): return failure error
|
||||||
trace "Block not in local store", cid = $cid
|
trace "Block not in local store", cid
|
||||||
# TODO: What if block isn't available in the engine too?
|
# TODO: What if block isn't available in the engine too?
|
||||||
# TODO: add retrieved block to the local store
|
# TODO: add retrieved block to the local store
|
||||||
return (await self.engine.requestBlock(cid)).catch
|
return (await self.engine.requestBlock(cid)).catch
|
||||||
@ -50,7 +50,7 @@ method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} =
|
|||||||
## Store block locally and notify the network
|
## Store block locally and notify the network
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Puting block into network store", cid = $blk.cid
|
trace "Puting block into network store", cid = blk.cid
|
||||||
|
|
||||||
let res = await self.localStore.putBlock(blk)
|
let res = await self.localStore.putBlock(blk)
|
||||||
if res.isErr:
|
if res.isErr:
|
||||||
@ -63,7 +63,7 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
|||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Deleting block from network store", cid = $cid
|
trace "Deleting block from network store", cid
|
||||||
return self.localStore.delBlock(cid)
|
return self.localStore.delBlock(cid)
|
||||||
|
|
||||||
{.pop.}
|
{.pop.}
|
||||||
@ -72,7 +72,7 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
|
|||||||
## Check if the block exists in the blockstore
|
## Check if the block exists in the blockstore
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "Checking network store for block existence", cid = $cid
|
trace "Checking network store for block existence", cid
|
||||||
return await self.localStore.hasBlock(cid)
|
return await self.localStore.hasBlock(cid)
|
||||||
|
|
||||||
method close*(self: NetworkStore): Future[void] {.async.} =
|
method close*(self: NetworkStore): Future[void] {.async.} =
|
||||||
|
@ -7,8 +7,7 @@
|
|||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import pkg/upraises
|
{.push raises: [Defect].}
|
||||||
push: {.upraises: [].}
|
|
||||||
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
|
@ -69,7 +69,7 @@ method readOnce*(
|
|||||||
## Return how many bytes were actually read before EOF was encountered.
|
## Return how many bytes were actually read before EOF was encountered.
|
||||||
## Raise exception if we are already at EOF.
|
## Raise exception if we are already at EOF.
|
||||||
|
|
||||||
trace "Reading from manifest", cid = $self.manifest.cid.get(), blocks = self.manifest.len
|
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len
|
||||||
if self.atEof:
|
if self.atEof:
|
||||||
raise newLPStreamEOFError()
|
raise newLPStreamEOFError()
|
||||||
|
|
||||||
@ -88,7 +88,7 @@ method readOnce*(
|
|||||||
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
|
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
|
||||||
raise newLPStreamReadError(error)
|
raise newLPStreamReadError(error)
|
||||||
|
|
||||||
trace "Reading bytes from store stream", blockNum, cid = $blk.cid, bytes = readBytes, blockOffset
|
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
||||||
|
|
||||||
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
||||||
if blk.isEmpty:
|
if blk.isEmpty:
|
||||||
|
@ -158,6 +158,9 @@ suite "Network - Senders":
|
|||||||
done = newFuture[void]()
|
done = newFuture[void]()
|
||||||
switch1 = newStandardSwitch()
|
switch1 = newStandardSwitch()
|
||||||
switch2 = newStandardSwitch()
|
switch2 = newStandardSwitch()
|
||||||
|
await switch1.start()
|
||||||
|
await switch2.start()
|
||||||
|
|
||||||
network1 = BlockExcNetwork.new(
|
network1 = BlockExcNetwork.new(
|
||||||
switch = switch1)
|
switch = switch1)
|
||||||
switch1.mount(network1)
|
switch1.mount(network1)
|
||||||
@ -166,9 +169,6 @@ suite "Network - Senders":
|
|||||||
switch = switch2)
|
switch = switch2)
|
||||||
switch2.mount(network2)
|
switch2.mount(network2)
|
||||||
|
|
||||||
await switch1.start()
|
|
||||||
await switch2.start()
|
|
||||||
|
|
||||||
await switch1.connect(
|
await switch1.connect(
|
||||||
switch2.peerInfo.peerId,
|
switch2.peerInfo.peerId,
|
||||||
switch2.peerInfo.addrs)
|
switch2.peerInfo.addrs)
|
||||||
@ -271,6 +271,8 @@ suite "Network - Test Limits":
|
|||||||
done = newFuture[void]()
|
done = newFuture[void]()
|
||||||
switch1 = newStandardSwitch()
|
switch1 = newStandardSwitch()
|
||||||
switch2 = newStandardSwitch()
|
switch2 = newStandardSwitch()
|
||||||
|
await switch1.start()
|
||||||
|
await switch2.start()
|
||||||
|
|
||||||
network1 = BlockExcNetwork.new(
|
network1 = BlockExcNetwork.new(
|
||||||
switch = switch1,
|
switch = switch1,
|
||||||
@ -281,9 +283,6 @@ suite "Network - Test Limits":
|
|||||||
switch = switch2)
|
switch = switch2)
|
||||||
switch2.mount(network2)
|
switch2.mount(network2)
|
||||||
|
|
||||||
await switch1.start()
|
|
||||||
await switch2.start()
|
|
||||||
|
|
||||||
await switch1.connect(
|
await switch1.connect(
|
||||||
switch2.peerInfo.peerId,
|
switch2.peerInfo.peerId,
|
||||||
switch2.peerInfo.addrs)
|
switch2.peerInfo.addrs)
|
||||||
|
@ -37,11 +37,12 @@ proc corruptBlocks*(
|
|||||||
manifest: Manifest,
|
manifest: Manifest,
|
||||||
blks, bytes: int): Future[seq[int]] {.async.} =
|
blks, bytes: int): Future[seq[int]] {.async.} =
|
||||||
var pos: seq[int]
|
var pos: seq[int]
|
||||||
|
while true:
|
||||||
|
if pos.len >= blks:
|
||||||
|
break
|
||||||
|
|
||||||
doAssert blks < manifest.len
|
var i = -1
|
||||||
while pos.len < blks:
|
if (i = Rng.instance.rand(manifest.len - 1); pos.find(i) >= 0):
|
||||||
let i = Rng.instance.rand(manifest.len - 1)
|
|
||||||
if pos.find(i) >= 0:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
pos.add(i)
|
pos.add(i)
|
||||||
@ -49,12 +50,15 @@ proc corruptBlocks*(
|
|||||||
blk = (await store.getBlock(manifest[i])).tryGet()
|
blk = (await store.getBlock(manifest[i])).tryGet()
|
||||||
bytePos: seq[int]
|
bytePos: seq[int]
|
||||||
|
|
||||||
doAssert bytes < blk.data.len
|
while true:
|
||||||
while bytePos.len <= bytes:
|
if bytePos.len > bytes:
|
||||||
let ii = Rng.instance.rand(blk.data.len - 1)
|
break
|
||||||
if bytePos.find(ii) >= 0:
|
|
||||||
|
var ii = -1
|
||||||
|
if (ii = Rng.instance.rand(blk.data.len - 1); bytePos.find(ii) >= 0):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
bytePos.add(ii)
|
bytePos.add(ii)
|
||||||
blk.data[ii] = byte 0
|
blk.data[ii] = byte 0
|
||||||
|
|
||||||
return pos
|
return pos
|
||||||
|
2
vendor/nim-bearssl
vendored
2
vendor/nim-bearssl
vendored
@ -1 +1 @@
|
|||||||
Subproject commit f4c4233de453cb7eac0ce3f3ffad6496295f83ab
|
Subproject commit dc62f4fccd2d40c884009ae8f2b14bb6a86a55cf
|
2
vendor/nim-chronicles
vendored
2
vendor/nim-chronicles
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 7631f7b2ee03398cb1512a79923264e8f9410af6
|
Subproject commit 1682096306ddba8185dcfac360a8c3f952d721e4
|
2
vendor/nim-http-utils
vendored
2
vendor/nim-http-utils
vendored
@ -1 +1 @@
|
|||||||
Subproject commit e88e231dfcef4585fe3b2fbd9b664dbd28a88040
|
Subproject commit 689da19e9e9cfff4ced85e2b25c6b2b5598ed079
|
2
vendor/nim-json-serialization
vendored
2
vendor/nim-json-serialization
vendored
@ -1 +1 @@
|
|||||||
Subproject commit e5b18fb710c3d0167ec79f3b892f5a7a1bc6d1a4
|
Subproject commit c5f0e2465e8375dfc7aa0f56ccef67cb680bc6b0
|
2
vendor/nim-presto
vendored
2
vendor/nim-presto
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 3984431dc0fc829eb668e12e57e90542b041d298
|
Subproject commit 962bb588d19c7180e39f0d9f18131e75861bab20
|
2
vendor/nim-serialization
vendored
2
vendor/nim-serialization
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 493d18b8292fc03aa4f835fd825dea1183f97466
|
Subproject commit fcd0eadadde0ee000a63df8ab21dc4e9f015a790
|
2
vendor/nim-stew
vendored
2
vendor/nim-stew
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 0c379cf1d8d3d9db07af108cc78ff542b2105914
|
Subproject commit 6ad35b876fb6ebe0dfee0f697af173acc47906ee
|
2
vendor/nim-websock
vendored
2
vendor/nim-websock
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 7b2ed397d6e4c37ea4df08ae82aeac7ff04cd180
|
Subproject commit a697e3585d583ab6b91a159ea7d023461002c927
|
Loading…
x
Reference in New Issue
Block a user