From ffa9b624f12250f280a217a4af36cdabaa035439 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 5 Apr 2022 18:34:29 -0600 Subject: [PATCH] Integrate erasure (#73) * wip: adding request for storage endpoint * wire in erasure coding * fix tests for erasure coding * put type definitions into separate file * integrate erasure coding * change run/shutdown to start/stop * temporary sleep, otherwise the fsstore blocks --- dagger.nim | 6 +- dagger/dagger.nim | 8 +- dagger/erasure/erasure.nim | 10 +++ dagger/manifest.nim | 3 +- dagger/manifest/coders.nim | 14 +--- dagger/manifest/manifest.nim | 24 ++---- dagger/manifest/types.nim | 42 +++++++++++ dagger/node.nim | 83 +++++++++++++++++++-- dagger/rest/api.nim | 141 ++++++++++++++++++++++++++++++----- tests/dagger/testnode.nim | 2 +- 10 files changed, 269 insertions(+), 64 deletions(-) create mode 100644 dagger/manifest/types.nim diff --git a/dagger.nim b/dagger.nim index 25c4e1fa..edbf2b6f 100644 --- a/dagger.nim +++ b/dagger.nim @@ -57,7 +57,7 @@ when isMainModule: setupForeignThreadGc() except Exception as exc: raiseAssert exc.msg # shouldn't happen notice "Shutting down after having received SIGINT" - waitFor server.shutdown() + waitFor server.stop() try: setControlCHook(controlCHandler) @@ -68,10 +68,10 @@ when isMainModule: when defined(posix): proc SIGTERMHandler(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM" - waitFor server.shutdown() + waitFor server.stop() c_signal(SIGTERM, SIGTERMHandler) - waitFor server.run() + waitFor server.start() of StartUpCommand.initNode: discard diff --git a/dagger/dagger.nim b/dagger/dagger.nim index b46ba7f2..83a96307 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -26,6 +26,7 @@ import ./rest/api import ./stores import ./blockexchange import ./utils/fileutils +import ./erasure type DaggerServer* = ref object @@ -34,14 +35,14 @@ type restServer: RestServerRef daggerNode: DaggerNodeRef -proc run*(s: DaggerServer) {.async.} = +proc start*(s: DaggerServer) {.async.} = s.restServer.start() await s.daggerNode.start() s.runHandle = newFuture[void]() await s.runHandle -proc shutdown*(s: DaggerServer) {.async.} = +proc stop*(s: DaggerServer) {.async.} = await allFuturesThrowing( s.restServer.stop(), s.daggerNode.stop()) @@ -73,7 +74,8 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = localStore = FSStore.new(config.dataDir / "repo", cache = cache) engine = BlockExcEngine.new(localStore, wallet, network) store = NetworkStore.new(engine, localStore) - daggerNode = DaggerNodeRef.new(switch, store, engine) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + daggerNode = DaggerNodeRef.new(switch, store, engine, erasure) restServer = RestServerRef.new( daggerNode.initRestApi(), initTAddress("127.0.0.1" , config.apiPort), diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim index a8535036..bb8970c4 100644 --- a/dagger/erasure/erasure.nim +++ b/dagger/erasure/erasure.nim @@ -106,6 +106,11 @@ proc encode*( dataBlocks = await allFinished( blockIdx.mapIt( self.store.getBlock(encoded[it]) )) + # TODO: this is a tight blocking loop so we sleep here to allow + # other events to be processed, this should be addressed + # by threading + await sleepAsync(10.millis) + for j in 0.. cstring: - case e - of CidError.Incorrect: "Incorrect Cid" - of CidError.Unsupported: "Unsupported Cid" - of CidError.Overrun: "Overrun Cid" - else: "Error parsing Cid" + Cid + .init(value) + .mapErr do(e: CidError) -> cstring: + case e + of CidError.Incorrect: "Incorrect Cid" + of CidError.Unsupported: "Unsupported Cid" + of CidError.Overrun: "Overrun Cid" + else: "Error parsing Cid" proc encodeString(peerId: PeerID): Result[string, cstring] = ok($peerId) @@ -57,6 +60,29 @@ proc decodeString(T: type MultiAddress, value: string): Result[MultiAddress, cst .init(value) .mapErr do(e: string) -> cstring: cstring(e) +proc decodeString(T: type SomeUnsignedInt, value: string): Result[T, cstring] = + Base10.decode(T, value) + +proc encodeString(value: SomeUnsignedInt): Result[string, cstring] = + ok(Base10.toString(value)) + +proc decodeString(T: type Duration, value: string): Result[T, cstring] = + let v = ? Base10.decode(uint32, value) + ok(v.minutes) + +proc encodeString(value: Duration): Result[string, cstring] = + ok($value) + +proc decodeString(T: type bool, value: string): Result[T, cstring] = + try: + ok(value.parseBool()) + except CatchableError as exc: + let s: cstring = exc.msg + err(s) # err(exc.msg) won't compile + +proc encodeString(value: bool): Result[string, cstring] = + ok($value) + proc initRestApi*(node: DaggerNodeRef): RestRouter = var router = RestRouter.init(validate) router.api( @@ -97,7 +123,6 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = except CatchableError as e: return RestApiResponse.error(Http400, "Unknown error dialling peer") - router.api( MethodGet, "/api/dagger/v1/download/{id}") do ( @@ -116,18 +141,15 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = var bytes = 0 try: - if ( - let retr = await node.retrieve(id.get()); - retr.isErr): - return RestApiResponse.error(Http404, retr.error.msg) + without stream =? (await node.retrieve(id.get())), error: + return RestApiResponse.error(Http404, error.msg) resp.addHeader("Content-Type", "application/octet-stream") await resp.prepareChunked() - stream = retr.get() while not stream.atEof: var - buff = newSeqUninitialized[byte](FileChunkSize) + buff = newSeqUninitialized[byte](BlockSize) len = await stream.readOnce(addr buff[0], buff.len) buff.setLen(len) @@ -146,6 +168,91 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = if not stream.isNil: await stream.close() + router.api( + MethodPost, + "/api/dagger/v1/storage/request/{cid}") do ( + cid: Cid, + ppb: Option[uint], + duration: Option[Duration], + nodes: Option[uint], + loss: Option[uint], + renew: Option[bool]) -> RestApiResponse: + ## Create a request for storage + ## + ## Cid - the cid of the previously uploaded dataset + ## ppb - the price per byte the client is willing to pay + ## duration - the duration of the contract + ## nodeCount - the total amount of the nodes storing the dataset, including `lossTolerance` + ## lossTolerance - the number of nodes losses the user is willing to tolerate + ## autoRenew - should the contract be autorenewed - + ## will fail unless the user has enough funds lockedup + ## + + var + cid = + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error()) + else: + cid.get() + + ppb = + if ppb.isNone: + return RestApiResponse.error(Http400, "Missing ppb") + else: + if ppb.get().isErr: + return RestApiResponse.error(Http500, $ppb.get().error) + else: + ppb.get().get() + + duration = + if duration.isNone: + return RestApiResponse.error(Http400, "Missing duration") + else: + if duration.get().isErr: + return RestApiResponse.error(Http500, $duration.get().error) + else: + duration.get().get() + + nodes = + if nodes.isNone: + return RestApiResponse.error(Http400, "Missing node count") + else: + if nodes.get().isErr: + return RestApiResponse.error(Http500, $nodes.get().error) + else: + nodes.get().get() + + loss = + if loss.isNone: + return RestApiResponse.error(Http400, "Missing loss tolerance") + else: + if loss.get().isErr: + return RestApiResponse.error(Http500, $loss.get().error) + else: + loss.get().get() + + renew = if renew.isNone: + false + else: + if renew.get().isErr: + return RestApiResponse.error(Http500, $renew.get().error) + else: + renew.get().get() + + try: + without storageCid =? (await node.requestStorage( + cid, + ppb, + duration, + nodes, + loss, + renew)), error: + return RestApiResponse.error(Http500, error.msg) + + return RestApiResponse.response($storageCid) + except CatchableError as exc: + return RestApiResponse.error(Http500, exc.msg) + router.rawApi( MethodPost, "/api/dagger/v1/upload") do ( @@ -173,7 +280,7 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = try: while not reader.atEof: var - buff = newSeqUninitialized[byte](FileChunkSize) + buff = newSeqUninitialized[byte](BlockSize) len = await reader.readOnce(addr buff[0], buff.len) buff.setLen(len) @@ -185,8 +292,8 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = bytes += len await stream.pushEof() - without cid =? (await storeFut): - return RestApiResponse.error(Http500) + without cid =? (await storeFut), error: + return RestApiResponse.error(Http500, error.msg) trace "Uploaded file", bytes, cid = $cid return RestApiResponse.response($cid) diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim index 22e8197c..72603965 100644 --- a/tests/dagger/testnode.nim +++ b/tests/dagger/testnode.nim @@ -42,7 +42,7 @@ suite "Test Node": localStore = CacheStore.new() engine = BlockExcEngine.new(localStore, wallet, network) store = NetworkStore.new(engine, localStore) - node = DaggerNodeRef.new(switch, store, engine) + node = DaggerNodeRef.new(switch, store, engine, nil) # TODO: pass `Erasure` await node.start()