Cleanup-manifest (#52)

* adding nim-blscurve

* adding taskpools

* cleanup manifest
This commit is contained in:
Dmitriy Ryajov 2022-03-14 10:06:36 -06:00 committed by GitHub
parent d8162b1e16
commit f414c695be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 290 additions and 239 deletions

13
.gitmodules vendored
View File

@ -162,3 +162,16 @@
[submodule "vendor/lrucache.nim"] [submodule "vendor/lrucache.nim"]
path = vendor/lrucache.nim path = vendor/lrucache.nim
url = https://github.com/status-im/lrucache.nim url = https://github.com/status-im/lrucache.nim
[submodule "vendor/nim-blscurve"]
path = vendor/nim-blscurve
url = https://github.com/status-im/nim-blscurve.git
ignore = untracked
branch = master
[submodule "vendor/nim-taskpools.git"]
ignore = untracked
branch = master
[submodule "vendor/nim-taskpools"]
path = vendor/nim-taskpools
url = https://github.com/status-im/nim-taskpools.git
ignore = untracked
branch = master

View File

@ -1,158 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/tables
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import ./manifest
import ./errors
export manifest
const
ManifestCodec* = multiCodec("dag-pb")
var
emptyDigests {.threadvar.}: array[CIDv0..CIDv1, Table[MultiCodec, MultiHash]]
once {.threadvar.}: bool
template EmptyDigests: untyped =
if not once:
emptyDigests = [
CIDv0: {
multiCodec("sha2-256"): Cid
.init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
.get()
.mhash
.get()
}.toTable,
CIDv1: {
multiCodec("sha2-256"): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
.get()
.mhash
.get()
}.toTable,
]
once = true
emptyDigests
type
BlocksManifest* = object
manifest: Manifest
version*: CidVersion
hcodec*: MultiCodec
codec*: MultiCodec
proc len*(b: BlocksManifest): int = b.manifest.blocks.len
iterator items*(b: BlocksManifest): Cid =
for b in b.manifest.blocks:
yield b
template hashBytes(mh: MultiHash): seq[byte] =
## get the hash bytes of a multihash object
##
mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)]
proc cid*(b: var BlocksManifest): ?!Cid =
## Generate a root hash using the treehash algorithm
##
if htree =? b.manifest.cid:
return htree.success
var
stack: seq[MultiHash]
for cid in b.manifest.blocks:
stack.add(? cid.mhash.mapFailure)
while stack.len > 1:
let
(b1, b2) = (stack.pop(), stack.pop())
mh = ? MultiHash.digest(
$b.hcodec,
(b1.hashBytes() & b2.hashBytes()))
.mapFailure
stack.add(mh)
if stack.len == 1:
let cid = ? Cid.init(
b.version,
b.codec,
(? EmptyDigests[b.version][b.hcodec].catch))
.mapFailure
b.manifest.cid = cid.some
return cid.success
proc put*(b: var BlocksManifest, cid: Cid) =
b.manifest.cid = Cid.none
trace "Adding cid to manifest", cid
b.manifest.blocks.add(cid)
proc contains*(b: BlocksManifest, cid: Cid): bool =
cid in b.manifest.blocks
proc encode*(b: var BlocksManifest): ?!seq[byte] =
if b.manifest.cid.isNone:
b.manifest.cid = (? b.cid).some
b.manifest.encode()
proc init*(
T: type BlocksManifest,
blocks: openArray[Cid] = [],
version = CIDv1,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("raw")): ?!T =
## Create a manifest using array of `Cid`s
##
if hcodec notin EmptyDigests[version]:
return failure("Unsupported manifest hash codec!")
T(
manifest: Manifest(blocks: @blocks),
version: version,
codec: codec,
hcodec: hcodec,
).success
proc init*(
T: type BlocksManifest,
data: openArray[byte]): ?!T =
## Create manifest from a raw data blob
## (in dag-pb for for now)
##
let
manifest = ? Manifest.decode(data)
cid = !manifest.cid
mhash = ? cid.mhash.mapFailure
var blockManifest = ? BlocksManifest.init(
manifest.blocks,
cid.version,
mhash.mcodec,
cid.mcodec)
if cid != ? blockManifest.cid:
return failure("Decoded content hash doesn't match!")
blockManifest.success

View File

@ -1,65 +1,3 @@
## Nim-Dagger import ./manifest/manifest
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/libp2p/protobuf/minprotobuf export manifest
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ./errors
type
Manifest* = object
cid*: ?Cid
blocks*: seq[Cid]
proc encode*(b: var Manifest): ?!seq[byte] =
## Encode the manifest into a ``ManifestCodec``
## multicodec container (Dag-pb) for now
##
var pbNode = initProtoBuffer()
for c in b.blocks:
var pbLink = initProtoBuffer()
pbLink.write(1, c.data.buffer) # write Cid links
pbLink.finish()
pbNode.write(2, pbLink)
let cid = !b.cid
pbNode.write(1, cid.data.buffer) # set the treeHash Cid as the data field
pbNode.finish()
return pbNode.buffer.success
proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest =
## Decode a manifest from a data blob
##
var
pbNode = initProtoBuffer(data)
cidBuf: seq[byte]
blocks: seq[Cid]
if pbNode.getField(1, cidBuf).isErr:
return failure("Unable to decode Cid from manifest!")
let cid = ? Cid.init(cidBuf).mapFailure
var linksBuf: seq[seq[byte]]
if pbNode.getRepeatedField(2, linksBuf).isOk:
for pbLinkBuf in linksBuf:
var
blocksBuf: seq[seq[byte]]
blockBuf: seq[byte]
pbLink = initProtoBuffer(pbLinkBuf)
if pbLink.getField(1, blockBuf).isOk:
blocks.add(? Cid.init(blockBuf).mapFailure)
Manifest(cid: cid.some, blocks: blocks).success

View File

@ -0,0 +1,73 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/tables
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import ./types
import ../errors
const
DagPBCodec* = multiCodec("dag-pb")
type
ManifestCoderType*[codec: static MultiCodec] = object
DagPBCoder* = ManifestCoderType[multiCodec("dag-pb")]
func encode*(_: DagPBCoder, b: Manifest): ?!seq[byte] =
## Encode the manifest into a ``ManifestCodec``
## multicodec container (Dag-pb) for now
##
var pbNode = initProtoBuffer()
for c in b.blocks:
var pbLink = initProtoBuffer()
pbLink.write(1, c.data.buffer) # write Cid links
pbLink.finish()
pbNode.write(2, pbLink)
let cid = !b.rootHash
pbNode.write(1, cid.data.buffer) # set the rootHash Cid as the data field
pbNode.finish()
return pbNode.buffer.success
func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
## Decode a manifest from a data blob
##
var
pbNode = initProtoBuffer(data)
cidBuf: seq[byte]
blocks: seq[Cid]
if pbNode.getField(1, cidBuf).isErr:
return failure("Unable to decode Cid from manifest!")
let cid = ? Cid.init(cidBuf).mapFailure
var linksBuf: seq[seq[byte]]
if pbNode.getRepeatedField(2, linksBuf).isOk:
for pbLinkBuf in linksBuf:
var
blocksBuf: seq[seq[byte]]
blockBuf: seq[byte]
pbLink = initProtoBuffer(pbLinkBuf)
if pbLink.getField(1, blockBuf).isOk:
blocks.add(? Cid.init(blockBuf).mapFailure)
Manifest(rootHash: cid.some, blocks: blocks).success

View File

@ -0,0 +1,134 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/tables
import pkg/libp2p/protobuf/minprotobuf
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import ./types
import ./coders
import ../errors
export coders, Manifest
const
ManifestContainers* = {
$DagPBCodec: DagPBCoder()
}.toTable
func len*(self: Manifest): int =
self.blocks.len
func `[]`*(self: Manifest, i: Natural): Cid =
self.blocks[i]
func `[]=`*(self: var Manifest, i: Natural, item: Cid) =
self.blocks[i] = item
func `[]`*(self: Manifest, i: BackwardsIndex): Cid =
self.blocks[self.len - i.int]
func `[]=`*(self: var Manifest, i: BackwardsIndex, item: Cid) =
self.blocks[self.len - i.int] = item
proc add*(self: var Manifest, cid: Cid) =
self.rootHash = Cid.none
trace "Adding cid to manifest", cid
self.blocks.add(cid)
iterator items*(self: Manifest): Cid =
for b in self.blocks:
yield b
func contains*(self: Manifest, cid: Cid): bool =
cid in self.blocks
template hashBytes(mh: MultiHash): seq[byte] =
## get the hash bytes of a multihash object
##
mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)]
proc makeRoot(self: var Manifest): ?!void =
var
stack: seq[MultiHash]
for cid in self:
stack.add(? cid.mhash.mapFailure)
while stack.len > 1:
let
(b1, b2) = (stack.pop(), stack.pop())
mh = ? MultiHash.digest(
$self.hcodec,
(b1.hashBytes() & b2.hashBytes()))
.mapFailure
stack.add(mh)
if stack.len == 1:
let cid = ? Cid.init(
self.version,
self.codec,
(? EmptyDigests[self.version][self.hcodec].catch))
.mapFailure
self.rootHash = cid.some
ok()
proc cid*(self: var Manifest): ?!Cid =
## Generate a root hash using the treehash algorithm
##
if self.rootHash.isNone:
? self.makeRoot()
(!self.rootHash).success
proc encode*(self: var Manifest, encoder = ManifestContainers[$DagPBCodec]): ?!seq[byte] =
## Encode a manifest using `encoder`
##
if self.rootHash.isNone:
? self.makeRoot()
encoder.encode(self)
func decode*(
_: type Manifest,
data: openArray[byte],
decoder = ManifestContainers[$DagPBCodec]): ?!Manifest =
## Decode a manifest using `decoder`
##
decoder.decode(data)
proc init*(
T: type Manifest,
blocks: openArray[Cid] = [],
version = CIDv1,
hcodec = multiCodec("sha2-256"),
codec = multiCodec("raw")): ?!T =
## Create a manifest using array of `Cid`s
##
if hcodec notin EmptyDigests[version]:
return failure("Unsupported manifest hash codec!")
T(
blocks: @blocks,
version: version,
codec: codec,
hcodec: hcodec,
).success

46
dagger/manifest/types.nim Normal file
View File

@ -0,0 +1,46 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/questionable
template EmptyDigests*: untyped =
var
emptyDigests {.global, threadvar.}:
array[CIDv0..CIDv1, Table[MultiCodec, MultiHash]]
once:
emptyDigests = [
CIDv0: {
multiCodec("sha2-256"): Cid
.init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
.get()
.mhash
.get()
}.toTable,
CIDv1: {
multiCodec("sha2-256"): Cid
.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
.get()
.mhash
.get()
}.toTable,
]
emptyDigests
type
Manifest* = object of RootObj
rootHash*: ?Cid
blocks*: seq[Cid]
version*: CidVersion
hcodec*: MultiCodec
codec*: MultiCodec

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import std/options import std/options
import std/tables
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -21,7 +22,7 @@ import pkg/libp2p/signed_envelope
import ./chunker import ./chunker
import ./blocktype as bt import ./blocktype as bt
import ./blocksmanifest import ./manifest
import ./stores/blockstore import ./stores/blockstore
import ./blockexchange import ./blockexchange
@ -64,7 +65,7 @@ proc connect*(
proc streamBlocks*( proc streamBlocks*(
node: DaggerNodeRef, node: DaggerNodeRef,
stream: BufferStream, stream: BufferStream,
blockManifest: BlocksManifest) {.async.} = blockManifest: Manifest) {.async.} =
try: try:
# TODO: Read sequentially for now # TODO: Read sequentially for now
@ -96,10 +97,10 @@ proc retrieve*(
return failure( return failure(
newException(DaggerError, "Couldn't identify Cid!")) newException(DaggerError, "Couldn't identify Cid!"))
if mc == ManifestCodec: if $mc in ManifestContainers:
trace "Retrieving data set", cid, mc trace "Retrieving data set", cid, mc
without blockManifest =? BlocksManifest.init(blk.data): without blockManifest =? Manifest.decode(blk.data, ManifestContainers[$mc]):
return failure("Unable to construct manifest!") return failure("Unable to construct manifest!")
asyncSpawn node.streamBlocks(stream, blockManifest) asyncSpawn node.streamBlocks(stream, blockManifest)
@ -120,7 +121,7 @@ proc store*(
stream: LPStream): Future[?!Cid] {.async.} = stream: LPStream): Future[?!Cid] {.async.} =
trace "Storing data" trace "Storing data"
without var blockManifest =? BlocksManifest.init(): without var blockManifest =? Manifest.init():
return failure("Unable to create Block Set") return failure("Unable to create Block Set")
let let
@ -135,7 +136,7 @@ proc store*(
without blk =? bt.Block.init(chunk): without blk =? bt.Block.init(chunk):
return failure("Unable to init block from chunk!") return failure("Unable to init block from chunk!")
blockManifest.put(blk.cid) blockManifest.add(blk.cid)
if not (await node.blockStore.putBlock(blk)): if not (await node.blockStore.putBlock(blk)):
# trace "Unable to store block", cid = blk.cid # trace "Unable to store block", cid = blk.cid
return failure("Unable to store block " & $blk.cid) return failure("Unable to store block " & $blk.cid)
@ -153,7 +154,7 @@ proc store*(
newException(DaggerError, "Could not generate dataset manifest!")) newException(DaggerError, "Could not generate dataset manifest!"))
# Store as a dag-pb block # Store as a dag-pb block
without manifest =? bt.Block.init(data = data, codec = ManifestCodec): without manifest =? bt.Block.init(data = data, codec = DagPBCodec):
trace "Unable to init block from manifest data!" trace "Unable to init block from manifest data!"
return failure("Unable to init block from manifest data!") return failure("Unable to init block from manifest data!")

View File

@ -9,13 +9,13 @@ import pkg/stew/byteutils
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import pkg/dagger/blocksmanifest import pkg/dagger/manifest
import ./helpers import ./helpers
suite "Manifest": suite "Manifest":
test "Should produce valid tree hash checksum": test "Should produce valid tree hash checksum":
without var manifest =? BlocksManifest.init( without var manifest =? Manifest.init(
blocks = @[ blocks = @[
Block.init("Block 1".toBytes).tryGet().cid, Block.init("Block 1".toBytes).tryGet().cid,
Block.init("Block 2".toBytes).tryGet().cid, Block.init("Block 2".toBytes).tryGet().cid,
@ -47,7 +47,7 @@ suite "Manifest":
) )
var var
blocksManifest = BlocksManifest.init(blocks).tryGet() blocksManifest = Manifest.init(blocks).tryGet()
let let
e = blocksManifest.encode().tryGet() e = blocksManifest.encode().tryGet()

View File

@ -3,6 +3,7 @@ import std/options
import pkg/asynctest import pkg/asynctest
import pkg/chronos import pkg/chronos
import pkg/chronicles
import pkg/stew/byteutils import pkg/stew/byteutils
import pkg/nitro import pkg/nitro
@ -12,7 +13,7 @@ import pkg/dagger/stores
import pkg/dagger/blockexchange import pkg/dagger/blockexchange
import pkg/dagger/chunker import pkg/dagger/chunker
import pkg/dagger/node import pkg/dagger/node
import pkg/dagger/blocksmanifest import pkg/dagger/manifest
import pkg/dagger/blocktype as bt import pkg/dagger/blocktype as bt
import ./helpers import ./helpers
@ -55,14 +56,14 @@ suite "Test Node":
storeFut = node.store(stream) storeFut = node.store(stream)
var var
manifest = BlocksManifest.init().tryGet() manifest = Manifest.init().tryGet()
try: try:
while ( while (
let chunk = await chunker.getBytes(); let chunk = await chunker.getBytes();
chunk.len > 0): chunk.len > 0):
await stream.pushData(chunk) await stream.pushData(chunk)
manifest.put(bt.Block.init(chunk).tryGet().cid) manifest.add(bt.Block.init(chunk).tryGet().cid)
finally: finally:
await stream.pushEof() await stream.pushEof()
await stream.close() await stream.close()
@ -75,7 +76,7 @@ suite "Test Node":
var var
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
localManifest = BlocksManifest.init(manifestBlock.data).tryGet() localManifest = Manifest.decode(manifestBlock.data).tryGet()
check: check:
manifest.len == localManifest.len manifest.len == localManifest.len
@ -83,7 +84,7 @@ suite "Test Node":
test "Retrieve Data Stream": test "Retrieve Data Stream":
var var
manifest = BlocksManifest.init().tryGet() manifest = Manifest.init().tryGet()
original: seq[byte] original: seq[byte]
while ( while (
@ -95,12 +96,13 @@ suite "Test Node":
original &= chunk original &= chunk
check await localStore.putBlock(blk) check await localStore.putBlock(blk)
manifest.put(blk.cid) manifest.add(blk.cid)
let let
manifestBlock = bt.Block.init( manifestBlock = bt.Block.init(
manifest.encode().tryGet(), manifest.encode().tryGet(),
codec = ManifestCodec).tryGet() codec = DagPBCodec)
.tryGet()
check await localStore.putBlock(manifestBlock) check await localStore.putBlock(manifestBlock)

1
vendor/nim-blscurve vendored Submodule

@ -0,0 +1 @@
Subproject commit 0237e4e0e914fc19359c18a66406d33bc942775c

1
vendor/nim-taskpools vendored Submodule

@ -0,0 +1 @@
Subproject commit 8d408ac6cfc9c24ec8b7b65d5993e85050dcbaa9