Dmitriy Ryajov 2023-10-27 17:22:15 -06:00
parent 81a6eb13bc
commit 970fa74fb0
7 changed files with 364 additions and 83 deletions


@@ -1,28 +0,0 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import ./merklestore
import pkg/datastore
import ../../namespaces
type
DataStoreBackend* = ref object of MerkleStore
store*: Datastore
method put*(
self: DataStoreBackend,
index, level: Natural,
hash: seq[byte]): Future[?!void] {.async.} =
success await self.store.put(index, hash)
method get*(self: DataStoreBackend, index, level: Natural): Future[?!seq[byte]] =
raiseAssert("Not implemented!")
func new*(_: type DataStoreBackend, store: Datastore): DataStoreBackend =
DataStoreBackend(store: store)


@@ -0,0 +1,257 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
#[
message IndexEntry {
int64 index = 1;
int64 offset = 2;
}
message Header {
repeated IndexEntry index = 1;
}
]#
################################################################
##
## This implementation stores the nodes of a merkle tree
## in a file, serialized in protobuf format
##
## There are several methods of constructing Merkle trees
##
## - Locally, where the root of the tree is unknown until
## all the leaves are processed
## - By retrieving it from a remote node, in which case each
## leaf will be accompanied by a path all the way to the root,
## but they might arrive out of order
##
## The requirements are thus:
##
## - Graceful and efficient handling of partial trees during
## construction, before the root is known
## - Efficient random and sequential access
## - Easily streamable for both construction and reads
##
## Constructing a tree from a stream of leaves:
##
## To construct a tree from a stream of leaves, we need to
## store the leaves in a file and keep track of their
## offsets. We address everything by its hash, but the root
## of the tree is unknown until all the leaves are processed,
## thus the tree is initially constructed in a temporary location.
## Once the root is known, the tree is "sealed".
##
## Sealing consists of:
##
## - Creating a new file in the final destination
## - Writing the header with the table of indices and offsets
## - Copying the contents of the temporary file to the new file
##
## Constructing a tree from a stream of merkle paths:
##
## Constructing the tree from a stream of merkle paths is similar
## to constructing it from a stream of leaves, except that the
## root of the tree is immediately known, so we can skip the temporary
## file and write directly to the final destination.
##
## No special sealing is required; the file is stored under its
## tree root.
##
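## On-disk layout, roughly (a sketch of what the code below produces, not
## a formal specification): the header and every node record are prefixed
## with their length encoded as a varint
##
## [varint: header size][Header { repeated IndexEntry(index, offset) }]
## [varint: node size][node record]
## [varint: node size][node record]
## ...
##
## Offsets recorded in the header point into the node area that follows
## the header, so a lookup seeks to `headerOffset + offsets[index]`
##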
import std/os
import std/tables
import pkg/chronos
import pkg/chronos/sendfile
import pkg/questionable
import pkg/questionable/results
import pkg/libp2p/varint
import pkg/libp2p/protobuf/minprotobuf
import ./merklestore
import ../../errors
type
FileStore* = ref object of MerkleStore
file: File
path*: string
offset: uint64
bytes: uint64
headerOffset: uint64
offsets: Table[uint64, uint64]
proc readVarint(file: File): ?!uint64 =
var
buffer: array[10, byte]
for i in 0..<buffer.len:
if file.readBytes(buffer, i, 1) != 1:
return failure "Cannot read varint"
var
varint: uint64
length: int
let res = PB.getUVarint(buffer.toOpenArray(0, i), length, varint)
if res.isOk():
return success varint
if res.error() != VarintError.Incomplete:
break
return failure "Cannot parse varint"
proc writeVarint(data: openArray[byte]): seq[byte] =
  ## Prefix `data` with its length encoded as a varint
  let vbytes = PB.toBytes(data.len().uint64)
  var buf = newSeqUninitialized[byte](data.len() + vbytes.len)
  buf[0..<vbytes.len] = vbytes.toOpenArray()
  buf[vbytes.len..<buf.len] = data
  buf
proc readHeader(file: File): ?!(uint64, Table[uint64, uint64]) =
let
len = ? file.readVarint()
var
header = newSeqUninitialized[byte](len.Natural)
if file.readBytes(header, 0, len.Natural) != len.int:
return failure "Unable to read header"
var
offsets: Table[uint64, uint64]
pb = initProtoBuffer(header)
offsetsList: seq[seq[byte]]
if ? pb.getRepeatedField(1, offsetsList).mapFailure:
for item in offsetsList:
var
offsetsPb = initProtoBuffer(item)
index: uint64
offset: uint64
discard ? offsetsPb.getField(1, index).mapFailure
discard ? offsetsPb.getField(2, offset).mapFailure
offsets[index] = offset
success (len, offsets)
proc writeHeader(file: File, offsets: Table[uint64, uint64]): ?!void =
var
pb = initProtoBuffer()
for (index, offset) in offsets.pairs:
var
offsetsPb = initProtoBuffer()
index = index
offset = offset
offsetsPb.write(1, index)
offsetsPb.write(2, offset)
offsetsPb.finish()
pb.write(1, offsetsPb.buffer)
pb.finish()
var buf = pb.buffer.writeVarint()
if file.writeBytes(buf, 0, buf.len) != buf.len:
return failure "Cannot write header to store!"
success()
method put*(
self: FileStore,
index: Natural,
hash: seq[byte]): Future[?!void] {.async.} =
## Write a node to the on disk file
##
var
offset = self.offset
pb = initProtoBuffer()
pb.write(1, hash)
pb.finish()
self.offsets[index.uint64] = offset.uint64
var buf = pb.buffer.writeVarint()
self.offset += buf.len.uint64
# TODO: add async file io
if self.file.writeBytes(buf, 0, buf.len) != buf.len:
return failure "Cannot write node to store!"
self.bytes += buf.len.uint64
success()
method get*(self: FileStore, index: Natural): Future[?!seq[byte]] {.async.} =
## Read a node from the on disk file
##
let index = index.uint64
if index notin self.offsets:
return failure "Node doesn't exist in store!"
let
offset = self.offsets[index] + self.headerOffset
self.file.setFilePos(offset.int64)
without size =? self.file.readVarint(), err:
return failure err
var
buf = newSeqUninitialized[byte](size.int)
if self.file.readBytes(buf, 0, size.int) != size.int:
return failure "Cannot read node from store!"
success buf
method seal*(self: FileStore, id: string = ""): Future[?!void] {.async.} =
if id.len <= 0:
return success()
let path = self.path / id
if fileExists(path):
return failure "File already exists!"
let file = open(path, fmWrite)
if (let res = file.writeHeader(self.offsets); res.isErr):
return failure "Cannot copy file!"
var bytes = self.bytes.int
if sendfile(file.getFileHandle, self.file.getFileHandle, 0, bytes) != 0 or
bytes != self.bytes.int:
return failure "Cannot copy file!"
success()
proc new*(_: type FileStore, file: File, path: string): ?!FileStore =
## Construct a filestore merkle tree backing store
##
let path = ? (
block:
if path.isAbsolute: path
else: getCurrentDir() / path).catch
if not dirExists(path):
return failure "directory does not exist: " & path
file.setFilePos(0)
let
(len, offsets) = if file.getFileSize > 0:
? file.readHeader()
else:
(0'u64, initTable[uint64, uint64]())
success FileStore(file: file, path: path, headerOffset: len, offsets: offsets)
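
# A minimal usage sketch for the store above; the scratch file name and the
# "example-root" id are arbitrary placeholders rather than part of the API.
when isMainModule:
  proc example() {.async.} =
    let
      scratch = open("example.merkle.tmp", fmReadWrite)
      store = FileStore.new(scratch, getCurrentDir()).tryGet()

    # write a couple of node hashes at indices 0 and 1
    (await store.put(0, @[1'u8, 2, 3])).tryGet()
    (await store.put(1, @[4'u8, 5, 6])).tryGet()

    # nodes are read back by index
    echo (await store.get(0)).tryGet()

    # once the root is known, persist the tree under its id
    (await store.seal("example-root")).tryGet()
    scratch.close()

  waitFor example()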


@@ -7,11 +7,24 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/questionable/results
type
MerkleStore* = ref object of RootObj
method put*(self: MerkleStore, index, level: Natural, hash: seq[byte]): Future[?!void] =
method put*(self: MerkleStore, index: Natural, hash: seq[byte]): Future[?!void] {.base.} =
## Put the node hash at the given index.
##
raiseAssert("Not implemented!")
method get*(self: MerkleStore, index, level: Natural): Future[?!seq[byte]] =
method get*(self: MerkleStore, index: Natural): Future[?!seq[byte]] {.base.} =
## Get the node hash at the given index.
raiseAssert("Not implemented!")
method seal*(self: MerkleStore, id: string = ""): Future[?!void] {.base.} =
## Perform misc tasks required to finish persisting the merkle tree.
##
raiseAssert("Not implemented!")
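
# An illustrative in-memory backend, sketched only to show the shapes an
# implementation of this interface takes; MemoryStore is a made-up name,
# not a type provided by this module.
when isMainModule:
  import std/tables

  type
    MemoryStore = ref object of MerkleStore
      nodes: Table[int, seq[byte]]

  method put(self: MemoryStore, index: Natural, hash: seq[byte]): Future[?!void] {.async.} =
    self.nodes[index.int] = hash
    return success()

  method get(self: MemoryStore, index: Natural): Future[?!seq[byte]] {.async.} =
    if index.int notin self.nodes:
      return failure("no node at index " & $index)
    return success(self.nodes[index.int])

  method seal(self: MemoryStore, id: string = ""): Future[?!void] {.async.} =
    # nothing extra to persist for an in-memory store
    return success()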


@@ -0,0 +1,12 @@
syntax = "proto3";
package merklestore.message.pb;
message IndexEntry {
int64 index = 1;
int64 offset = 2;
}
message Header {
repeated IndexEntry index = 1;
}


@@ -0,0 +1,4 @@
import ./backends/merklestore
import ./backends/filestore
export merklestore, filestore


@@ -13,7 +13,10 @@ import std/sequtils
import std/sugar
import std/algorithm
import pkg/upraises
import pkg/chronos
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/nimcrypto/sha2
import pkg/libp2p/[cid, multicodec, multihash, vbuffer]
@@ -22,17 +25,21 @@ import pkg/stew/byteutils
import ../errors
import ../utils
import ./merklestore
logScope:
topics = "codex merkletree"
type
MerkleTree* = ref object of RootObj
root*: ?MultiHash # the root hash of the tree
mcodec: MultiCodec # multicodec of the hash function
height: Natural # current height of the tree (levels - 1)
levels: Natural # number of levels in the tree (height + 1)
leafs: Natural # total number of leafs, if odd the last leaf will be hashed twice
length: Natural # corrected to even number of leafs in the tree
size: Natural # total number of nodes in the tree (corrected for odd leafs)
store: MerkleStore # store for the tree
leafsIter: AsyncIter[seq[byte]] # leafs iterator of the tree
MerkleProof* = object
@@ -44,54 +51,43 @@ type
# MerkleTree
###########################################################
proc root*(self: MerkleTree): ?!MultiHash =
if self.nodes.len == 0 or self.nodes[^1].len == 0:
return failure("Tree hasn't been build")
MultiHash.init(self.mcodec, self.nodes[^1]).mapFailure
proc build*(self: var MerkleTree): Future[?!void] {.async.} =
## Builds a tree from previously added data blocks
##
## Tree built from data blocks A, B and C is
## H5=H(H3 & H4)
## / \
## H3=H(H0 & H1) H4=H(H2 & 0x00)
## / \ /
## H0=H(A) H1=H(B) H2=H(C)
## | | |
## A B C
##
## Memory layout is [H0, H1, H2, H3, H4, H5]
proc build*(self: MerkleTree): Future[?!void] {.async.} =
## Builds a tree from leafs
##
var
length = if bool(self.leafs and 1):
self.nodes[self.leafs] = self.nodes[self.leafs - 1] # even out the tree
self.leafs + 1
else:
self.leafs
var length = self.length
while length > 1:
for i in 0..<length:
let
left = self.nodes[i * 2]
right = self.nodes[i * 2 + 1]
hash = ? MultiHash.digest($self.mcodec, left & right).mapFailure
for i in 0..<length div 2:
if self.leafsIter.finished:
return failure("Not enough leafs")
self.nodes[length + i] = hash.bytes
let
left = await self.leafsIter.next()
right = await self.leafsIter.next()
without hash =?
MultiHash.digest($self.mcodec, left & right).mapFailure, err:
return failure(err)
let index = self.length + length + i
(await self.store.put(index, hash.bytes)).tryGet
length = length shr 1
without root =? (await self.store.get(self.size)) and
rootHash =? MultiHash.digest($self.mcodec, root).mapFailure, err:
return failure "Unable to get tree root"
self.root = rootHash.some
return success()
func getProofs(self: MerkleTree, indexes: openArray[Natural]): ?!seq[MerkleProof] =
proc getProofs(
self: MerkleTree,
indexes: openArray[Natural]): Future[?!seq[MerkleProof]] {.async.} =
## Returns proofs for the given indexes
##
if self.nodes.len == 0 or self.nodes[^1].len == 0:
return failure("Tree hasn't been build")
var
proofs = newSeq[MerkleProof]()
@@ -100,15 +96,23 @@ func getProofs(self: MerkleTree, indexes: openArray[Natural]): ?!seq[MerkleProof
index = idx
nodes: seq[seq[byte]]
nodes.add(self.nodes[idx])
without node =? (await self.store.get(index)):
return failure "Unable to get node"
nodes.add(node)
for level in 1..<self.levels:
nodes.add(
if bool(index and 1):
self.nodes[level + 1]
let
idx = if bool(index and 1):
level + 1
else:
self.nodes[level - 1])
level - 1
without node =? (await self.store.get(idx)), err:
return failure "Unable to get node"
nodes.add(node)
index = index shr 1
proofs.add(
@@ -119,8 +123,9 @@ func getProofs(self: MerkleTree, indexes: openArray[Natural]): ?!seq[MerkleProof
return success proofs
func init*(
func new*(
T: type MerkleTree,
store: MerkleStore,
leafs: Natural,
leafsIter: AsyncIter[seq[byte]],
mcodec: MultiCodec = multiCodec("sha2-256")): ?!MerkleTree =
@@ -133,32 +138,50 @@ func init*(
size = 2 * length
height = log2(maxWidth.float).Natural
self = MerkleTree(
store: store,
mcodec: mcodec,
leafs: leafs,
length: length,
size: size,
height: height,
levels: height + 1,
nodesIter: leafsIter)
leafsIter: leafsIter)
success self
when isMainModule:
import std/os
import std/sequtils
import pkg/chronos
import pkg/stew/byteutils
import pkg/questionable
import pkg/questionable/results
var
leafs = [
"A", "B", "C", "D", "E", "F",
"G", "H", "I", "J", "K", "L",
"M", "N", "O", "P", "Q"]
.mapIt( MultiHash.digest("sha2-256", it.toBytes).tryGet().data.buffer )
tree = MerkleTree.init(leafs).tryGet()
proc main() {.async.} =
var
leafs = [
"A", "B", "C", "D", "E", "F",
"G", "H", "I", "J", "K", "L",
"M", "N", "O", "P"]
.mapIt(
MultiHash.digest("sha2-256", it.toBytes).tryGet().data.buffer
)
tree.buildSync().tryGet
echo tree.root().tryGet()
let
file = open("tmp.merkle" , fmReadWrite)
store = FileStore.new(file, os.getCurrentDir()).tryGet()
tree = MerkleTree.new(
store = store,
leafs.len,
Iter.fromSlice(0..<leafs.len)
.map(
proc(i: int): Future[seq[byte]] {.async.} =
leafs[i])).tryGet()
echo tree.getProofs(@[0.Natural, 1, 2, 3, 4, 5]).tryGet
(await tree.build()).tryGet
# echo tree.root.get()
# echo (await tree.getProofs(@[0.Natural, 1, 2, 3, 4, 5])).tryGet
waitFor main()