Integrate erasure (#73)

* wip: adding request for storage endpoint

* wire in erasure coding

* fix tests for erasure coding

* put type definitions into separate file

* integrate erasure coding

* change run/shutdown to start/stop

* temporary sleep, otherwise the fsstore blocks
This commit is contained in:
Dmitriy Ryajov 2022-04-05 18:34:29 -06:00 committed by GitHub
parent 22c6705312
commit ffa9b624f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 269 additions and 64 deletions

View File

@ -57,7 +57,7 @@ when isMainModule:
setupForeignThreadGc() setupForeignThreadGc()
except Exception as exc: raiseAssert exc.msg # shouldn't happen except Exception as exc: raiseAssert exc.msg # shouldn't happen
notice "Shutting down after having received SIGINT" notice "Shutting down after having received SIGINT"
waitFor server.shutdown() waitFor server.stop()
try: try:
setControlCHook(controlCHandler) setControlCHook(controlCHandler)
@ -68,10 +68,10 @@ when isMainModule:
when defined(posix): when defined(posix):
proc SIGTERMHandler(signal: cint) {.noconv.} = proc SIGTERMHandler(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM" notice "Shutting down after having received SIGTERM"
waitFor server.shutdown() waitFor server.stop()
c_signal(SIGTERM, SIGTERMHandler) c_signal(SIGTERM, SIGTERMHandler)
waitFor server.run() waitFor server.start()
of StartUpCommand.initNode: of StartUpCommand.initNode:
discard discard

View File

@ -26,6 +26,7 @@ import ./rest/api
import ./stores import ./stores
import ./blockexchange import ./blockexchange
import ./utils/fileutils import ./utils/fileutils
import ./erasure
type type
DaggerServer* = ref object DaggerServer* = ref object
@ -34,14 +35,14 @@ type
restServer: RestServerRef restServer: RestServerRef
daggerNode: DaggerNodeRef daggerNode: DaggerNodeRef
proc run*(s: DaggerServer) {.async.} = proc start*(s: DaggerServer) {.async.} =
s.restServer.start() s.restServer.start()
await s.daggerNode.start() await s.daggerNode.start()
s.runHandle = newFuture[void]() s.runHandle = newFuture[void]()
await s.runHandle await s.runHandle
proc shutdown*(s: DaggerServer) {.async.} = proc stop*(s: DaggerServer) {.async.} =
await allFuturesThrowing( await allFuturesThrowing(
s.restServer.stop(), s.daggerNode.stop()) s.restServer.stop(), s.daggerNode.stop())
@ -73,7 +74,8 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
localStore = FSStore.new(config.dataDir / "repo", cache = cache) localStore = FSStore.new(config.dataDir / "repo", cache = cache)
engine = BlockExcEngine.new(localStore, wallet, network) engine = BlockExcEngine.new(localStore, wallet, network)
store = NetworkStore.new(engine, localStore) store = NetworkStore.new(engine, localStore)
daggerNode = DaggerNodeRef.new(switch, store, engine) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
daggerNode = DaggerNodeRef.new(switch, store, engine, erasure)
restServer = RestServerRef.new( restServer = RestServerRef.new(
daggerNode.initRestApi(), daggerNode.initRestApi(),
initTAddress("127.0.0.1" , config.apiPort), initTAddress("127.0.0.1" , config.apiPort),

View File

@ -106,6 +106,11 @@ proc encode*(
dataBlocks = await allFinished( dataBlocks = await allFinished(
blockIdx.mapIt( self.store.getBlock(encoded[it]) )) blockIdx.mapIt( self.store.getBlock(encoded[it]) ))
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
for j in 0..<blocks: for j in 0..<blocks:
let idx = blockIdx[j] let idx = blockIdx[j]
if idx < manifest.len: if idx < manifest.len:
@ -177,6 +182,11 @@ proc decode*(
self.store.getBlock(encoded[it]) # Get the data blocks (first K) self.store.getBlock(encoded[it]) # Get the data blocks (first K)
) )
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
var var
data = newSeq[seq[byte]](encoded.K) # number of blocks to encode data = newSeq[seq[byte]](encoded.K) # number of blocks to encode
parityData = newSeq[seq[byte]](encoded.M) parityData = newSeq[seq[byte]](encoded.M)

View File

@ -1,4 +1,5 @@
import ./manifest/coders import ./manifest/coders
import ./manifest/manifest import ./manifest/manifest
import ./manifest/types
export manifest, coders export types, manifest, coders

View File

@ -21,19 +21,7 @@ import pkg/chronos
import ./manifest import ./manifest
import ../errors import ../errors
import ./types
const
DagPBCodec* = multiCodec("dag-pb")
type
ManifestCoderType*[codec: static MultiCodec] = object
DagPBCoder* = ManifestCoderType[multiCodec("dag-pb")]
const
# TODO: move somewhere better?
ManifestContainers* = {
$DagPBCodec: DagPBCoder()
}.toTable
func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
## Encode the manifest into a ``ManifestCodec`` ## Encode the manifest into a ``ManifestCodec``

View File

@ -19,23 +19,8 @@ import pkg/chronicles
import ../errors import ../errors
import ../blocktype import ../blocktype
import ./types
type import ./coders
Manifest* = ref object of RootObj
rootHash*: ?Cid # root (tree) hash of the contained data set
blockSize*: int # size of each contained block (might not be needed if blocks are len-prefixed)
blocks*: seq[Cid] # block Cid
version*: CidVersion # Cid version
hcodec*: MultiCodec # Multihash codec
codec*: MultiCodec # Data set codec
case protected*: bool # Protected datasets have erasure coded info
of true:
K*: int # Number of blocks to encode
M*: int # Number of resulting parity blocks
originalCid*: Cid # The original Cid of the dataset being erasure coded
originalLen*: int # The length of the original manifest
else:
discard
func len*(self: Manifest): int = func len*(self: Manifest): int =
self.blocks.len self.blocks.len
@ -189,5 +174,6 @@ proc new*(
proc new*( proc new*(
T: type Manifest, T: type Manifest,
data: openArray[byte]): ?!T = data: openArray[byte],
Manifest.decode(data) decoder = ManifestContainers[$DagPBCodec]): ?!T =
Manifest.decode(data, decoder)

42
dagger/manifest/types.nim Normal file
View File

@ -0,0 +1,42 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/tables
import pkg/libp2p
import pkg/questionable
const
DagPBCodec* = multiCodec("dag-pb")
type
ManifestCoderType*[codec: static MultiCodec] = object
DagPBCoder* = ManifestCoderType[multiCodec("dag-pb")]
const
# TODO: move somewhere better?
ManifestContainers* = {
$DagPBCodec: DagPBCoder()
}.toTable
type
Manifest* = ref object of RootObj
rootHash*: ?Cid # root (tree) hash of the contained data set
blockSize*: int # size of each contained block (might not be needed if blocks are len-prefixed)
blocks*: seq[Cid] # block Cid
version*: CidVersion # Cid version
hcodec*: MultiCodec # Multihash codec
codec*: MultiCodec # Data set codec
case protected*: bool # Protected datasets have erasure coded info
of true:
K*: int # Number of blocks to encode
M*: int # Number of resulting parity blocks
originalCid*: Cid # The original Cid of the dataset being erasure coded
originalLen*: int # The length of the original manifest
else:
discard

View File

@ -26,6 +26,7 @@ import ./manifest
import ./stores/blockstore import ./stores/blockstore
import ./blockexchange import ./blockexchange
import ./streams import ./streams
import ./erasure
logScope: logScope:
topics = "dagger node" topics = "dagger node"
@ -38,21 +39,22 @@ type
networkId*: PeerID networkId*: PeerID
blockStore*: BlockStore blockStore*: BlockStore
engine*: BlockExcEngine engine*: BlockExcEngine
erasure*: Erasure
proc start*(node: DaggerNodeRef) {.async.} = proc start*(node: DaggerNodeRef) {.async.} =
await node.switch.start() await node.switch.start()
await node.engine.start() await node.engine.start()
await node.erasure.start()
node.networkId = node.switch.peerInfo.peerId node.networkId = node.switch.peerInfo.peerId
notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
proc stop*(node: DaggerNodeRef) {.async.} = proc stop*(node: DaggerNodeRef) {.async.} =
trace "Stopping node" trace "Stopping node"
if not node.engine.isNil:
await node.engine.stop() await node.engine.stop()
if not node.switch.isNil:
await node.switch.stop() await node.switch.stop()
await node.erasure.stop()
proc findPeer*( proc findPeer*(
node: DaggerNodeRef, node: DaggerNodeRef,
@ -85,6 +87,16 @@ proc retrieve*(
without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]):
return failure("Unable to construct manifest!") return failure("Unable to construct manifest!")
if manifest.protected:
proc erasureJob(): Future[void] {.async.} =
try:
without res =? (await node.erasure.decode(manifest)), error: # spawn an erasure decoding job
trace "Unable to erasure decode manigest", cid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", cid
asyncSpawn erasureJob()
return LPStream(StoreStream.new(node.blockStore, manifest)).success return LPStream(StoreStream.new(node.blockStore, manifest)).success
let let
@ -158,12 +170,69 @@ proc store*(
return manifest.cid.success return manifest.cid.success
proc requestStorage*(
self: DaggerNodeRef,
cid: Cid,
ppb: uint,
duration: Duration,
nodes: uint,
tolerance: uint,
autoRenew: bool = false): Future[?!Cid] {.async.} =
## Initiate a request for storage sequence, this might
## be a multistep procedure.
##
## Roughly the flow is as follows:
## - Get the original cid from the store (should have already been uploaded)
## - Erasure code it according to the nodes and tolerance parameters
## - Run the PoR setup on the erasure dataset
## - Call into the marketplace and purchasing contracts
##
trace "Received a request for storage!", cid, ppb, duration, nodes, tolerance, autoRenew
without blk =? (await self.blockStore.getBlock(cid)), error:
trace "Unable to retrieve manifest block", cid
return failure(error)
without mc =? blk.cid.contentType():
trace "Couldn't identify Cid!", cid
return failure("Couldn't identify Cid! " & $cid)
# if we got a manifest, stream the blocks
if $mc notin ManifestContainers:
trace "Not a manifest type!", cid, mc
return failure("Not a manifest type!")
without var manifest =? Manifest.decode(blk.data), error:
trace "Unable to decode manifest from block", cid
return failure(error)
# Erasure code the dataset according to provided parameters
without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error:
trace "Unable to erasure code dataset", cid
return failure(error)
without encodedData =? encoded.encode(), error:
trace "Unable to encode protected manifest"
return failure(error)
without encodedBlk =? bt.Block.new(data = encodedData, codec = DagPBCodec), error:
trace "Unable to create block from encoded manifest"
return failure(error)
if not (await self.blockStore.putBlock(encodedBlk)):
trace "Unable to store encoded manifest block", cid = encodedBlk.cid
return failure("Unable to store encoded manifest block")
return encodedBlk.cid.success
proc new*( proc new*(
T: type DaggerNodeRef, T: type DaggerNodeRef,
switch: Switch, switch: Switch,
store: BlockStore, store: BlockStore,
engine: BlockExcEngine): T = engine: BlockExcEngine,
erasure: Erasure): T =
T( T(
switch: switch, switch: switch,
blockStore: store, blockStore: store,
engine: engine) engine: engine,
erasure: erasure)

View File

@ -20,10 +20,12 @@ import pkg/chronicles
import pkg/chronos import pkg/chronos
import pkg/presto import pkg/presto
import pkg/libp2p import pkg/libp2p
import pkg/stew/base10
import pkg/libp2p/routing_record import pkg/libp2p/routing_record
import ../node import ../node
import ../blocktype
proc validate( proc validate(
pattern: string, pattern: string,
@ -35,7 +37,8 @@ proc encodeString(cid: type Cid): Result[string, cstring] =
ok($cid) ok($cid)
proc decodeString(T: type Cid, value: string): Result[Cid, cstring] = proc decodeString(T: type Cid, value: string): Result[Cid, cstring] =
Cid.init(value) Cid
.init(value)
.mapErr do(e: CidError) -> cstring: .mapErr do(e: CidError) -> cstring:
case e case e
of CidError.Incorrect: "Incorrect Cid" of CidError.Incorrect: "Incorrect Cid"
@ -57,6 +60,29 @@ proc decodeString(T: type MultiAddress, value: string): Result[MultiAddress, cst
.init(value) .init(value)
.mapErr do(e: string) -> cstring: cstring(e) .mapErr do(e: string) -> cstring: cstring(e)
proc decodeString(T: type SomeUnsignedInt, value: string): Result[T, cstring] =
Base10.decode(T, value)
proc encodeString(value: SomeUnsignedInt): Result[string, cstring] =
ok(Base10.toString(value))
proc decodeString(T: type Duration, value: string): Result[T, cstring] =
let v = ? Base10.decode(uint32, value)
ok(v.minutes)
proc encodeString(value: Duration): Result[string, cstring] =
ok($value)
proc decodeString(T: type bool, value: string): Result[T, cstring] =
try:
ok(value.parseBool())
except CatchableError as exc:
let s: cstring = exc.msg
err(s) # err(exc.msg) won't compile
proc encodeString(value: bool): Result[string, cstring] =
ok($value)
proc initRestApi*(node: DaggerNodeRef): RestRouter = proc initRestApi*(node: DaggerNodeRef): RestRouter =
var router = RestRouter.init(validate) var router = RestRouter.init(validate)
router.api( router.api(
@ -97,7 +123,6 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
except CatchableError as e: except CatchableError as e:
return RestApiResponse.error(Http400, "Unknown error dialling peer") return RestApiResponse.error(Http400, "Unknown error dialling peer")
router.api( router.api(
MethodGet, MethodGet,
"/api/dagger/v1/download/{id}") do ( "/api/dagger/v1/download/{id}") do (
@ -116,18 +141,15 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
var bytes = 0 var bytes = 0
try: try:
if ( without stream =? (await node.retrieve(id.get())), error:
let retr = await node.retrieve(id.get()); return RestApiResponse.error(Http404, error.msg)
retr.isErr):
return RestApiResponse.error(Http404, retr.error.msg)
resp.addHeader("Content-Type", "application/octet-stream") resp.addHeader("Content-Type", "application/octet-stream")
await resp.prepareChunked() await resp.prepareChunked()
stream = retr.get()
while not stream.atEof: while not stream.atEof:
var var
buff = newSeqUninitialized[byte](FileChunkSize) buff = newSeqUninitialized[byte](BlockSize)
len = await stream.readOnce(addr buff[0], buff.len) len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len) buff.setLen(len)
@ -146,6 +168,91 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
if not stream.isNil: if not stream.isNil:
await stream.close() await stream.close()
router.api(
MethodPost,
"/api/dagger/v1/storage/request/{cid}") do (
cid: Cid,
ppb: Option[uint],
duration: Option[Duration],
nodes: Option[uint],
loss: Option[uint],
renew: Option[bool]) -> RestApiResponse:
## Create a request for storage
##
## Cid - the cid of the previously uploaded dataset
## ppb - the price per byte the client is willing to pay
## duration - the duration of the contract
## nodeCount - the total amount of the nodes storing the dataset, including `lossTolerance`
## lossTolerance - the number of nodes losses the user is willing to tolerate
## autoRenew - should the contract be autorenewed -
## will fail unless the user has enough funds lockedup
##
var
cid =
if cid.isErr:
return RestApiResponse.error(Http400, $cid.error())
else:
cid.get()
ppb =
if ppb.isNone:
return RestApiResponse.error(Http400, "Missing ppb")
else:
if ppb.get().isErr:
return RestApiResponse.error(Http500, $ppb.get().error)
else:
ppb.get().get()
duration =
if duration.isNone:
return RestApiResponse.error(Http400, "Missing duration")
else:
if duration.get().isErr:
return RestApiResponse.error(Http500, $duration.get().error)
else:
duration.get().get()
nodes =
if nodes.isNone:
return RestApiResponse.error(Http400, "Missing node count")
else:
if nodes.get().isErr:
return RestApiResponse.error(Http500, $nodes.get().error)
else:
nodes.get().get()
loss =
if loss.isNone:
return RestApiResponse.error(Http400, "Missing loss tolerance")
else:
if loss.get().isErr:
return RestApiResponse.error(Http500, $loss.get().error)
else:
loss.get().get()
renew = if renew.isNone:
false
else:
if renew.get().isErr:
return RestApiResponse.error(Http500, $renew.get().error)
else:
renew.get().get()
try:
without storageCid =? (await node.requestStorage(
cid,
ppb,
duration,
nodes,
loss,
renew)), error:
return RestApiResponse.error(Http500, error.msg)
return RestApiResponse.response($storageCid)
except CatchableError as exc:
return RestApiResponse.error(Http500, exc.msg)
router.rawApi( router.rawApi(
MethodPost, MethodPost,
"/api/dagger/v1/upload") do ( "/api/dagger/v1/upload") do (
@ -173,7 +280,7 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
try: try:
while not reader.atEof: while not reader.atEof:
var var
buff = newSeqUninitialized[byte](FileChunkSize) buff = newSeqUninitialized[byte](BlockSize)
len = await reader.readOnce(addr buff[0], buff.len) len = await reader.readOnce(addr buff[0], buff.len)
buff.setLen(len) buff.setLen(len)
@ -185,8 +292,8 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
bytes += len bytes += len
await stream.pushEof() await stream.pushEof()
without cid =? (await storeFut): without cid =? (await storeFut), error:
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500, error.msg)
trace "Uploaded file", bytes, cid = $cid trace "Uploaded file", bytes, cid = $cid
return RestApiResponse.response($cid) return RestApiResponse.response($cid)

View File

@ -42,7 +42,7 @@ suite "Test Node":
localStore = CacheStore.new() localStore = CacheStore.new()
engine = BlockExcEngine.new(localStore, wallet, network) engine = BlockExcEngine.new(localStore, wallet, network)
store = NetworkStore.new(engine, localStore) store = NetworkStore.new(engine, localStore)
node = DaggerNodeRef.new(switch, store, engine) node = DaggerNodeRef.new(switch, store, engine, nil) # TODO: pass `Erasure`
await node.start() await node.start()