Block.init should return Result
This commit is contained in:
parent
fbe161a073
commit
ce15948b70
|
@ -131,7 +131,7 @@ proc broadcastWantList*(
|
|||
proc handleBlocks(
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
blocks: seq[auto]): Future[void] =
|
||||
blocks: seq[pb.Block]): Future[void] =
|
||||
## Handle incoming blocks
|
||||
##
|
||||
|
||||
|
@ -141,13 +141,14 @@ proc handleBlocks(
|
|||
trace "Handling blocks for peer", peer = peer.id
|
||||
|
||||
var blks: seq[bt.Block]
|
||||
for blk in blocks:
|
||||
when blk is pb.Block:
|
||||
blks.add(bt.Block.init(Cid.init(blk.prefix).get(), blk.data))
|
||||
elif blk is seq[byte]:
|
||||
blks.add(bt.Block.init(Cid.init(blk).get(), blk))
|
||||
else:
|
||||
error("Invalid block type")
|
||||
for blob in blocks:
|
||||
without cid =? Cid.init(blob.prefix):
|
||||
trace "Unable to initialize Cid from protobuf message"
|
||||
|
||||
without blk =? bt.Block.init(cid, blob.data, verify = true):
|
||||
trace "Unable to initialize Block from data"
|
||||
|
||||
blks.add(blk)
|
||||
|
||||
b.handlers.onBlocks(peer.id, blks)
|
||||
|
||||
|
|
|
@ -11,6 +11,10 @@
|
|||
|
||||
import pkg/libp2p
|
||||
import pkg/stew/byteutils
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./errors
|
||||
|
||||
type
|
||||
Block* = object of RootObj
|
||||
|
@ -25,18 +29,32 @@ func init*(
|
|||
T: type Block,
|
||||
data: openArray[byte] = [],
|
||||
version = CIDv1,
|
||||
hcodec = multiCodec("sha2-256"),
|
||||
codec = multiCodec("raw")): T =
|
||||
let hash = MultiHash.digest($hcodec, data).get()
|
||||
Block(
|
||||
cid: Cid.init(version, codec, hash).get(),
|
||||
mcodec = multiCodec("sha2-256"),
|
||||
codec = multiCodec("raw")): ?!T =
|
||||
|
||||
let
|
||||
hash = ? MultiHash.digest($mcodec, data).mapFailure
|
||||
cid = ? Cid.init(version, codec, hash).mapFailure
|
||||
|
||||
success Block(
|
||||
cid: cid,
|
||||
data: @data)
|
||||
|
||||
func init*(
|
||||
T: type Block,
|
||||
cid: Cid,
|
||||
data: openArray[byte] = [],
|
||||
verify: bool = false): T =
|
||||
Block(
|
||||
cid: cid,
|
||||
data: @data)
|
||||
data: openArray[byte],
|
||||
verify: bool = false): ?!T =
|
||||
|
||||
let
|
||||
mhash = ? cid.mhash.mapFailure
|
||||
b = ? Block.init(
|
||||
data = @data,
|
||||
version = cid.cidver,
|
||||
codec = cid.mcodec,
|
||||
mcodec = mhash.mcodec)
|
||||
|
||||
if verify and cid != b.cid:
|
||||
return failure "Cid's don't match!"
|
||||
|
||||
success b
|
||||
|
|
|
@ -133,8 +133,8 @@ proc store*(
|
|||
chunk.len > 0):
|
||||
|
||||
trace "Got data from stream", len = chunk.len
|
||||
let
|
||||
blk = bt.Block.init(chunk)
|
||||
without blk =? bt.Block.init(chunk):
|
||||
return failure("Unable to init block from chunk!")
|
||||
|
||||
blockManifest.put(blk.cid)
|
||||
if not (await node.blockStore.putBlock(blk)):
|
||||
|
@ -154,7 +154,10 @@ proc store*(
|
|||
newException(DaggerError, "Could not generate dataset manifest!"))
|
||||
|
||||
# Store as a dag-pb block
|
||||
let manifest = bt.Block.init(data = data, codec = ManifestCodec)
|
||||
without manifest =? bt.Block.init(data = data, codec = ManifestCodec):
|
||||
trace "Unable to init block from manifest data!"
|
||||
return failure("Unable to init block from manifest data!")
|
||||
|
||||
if not (await node.blockStore.putBlock(manifest)):
|
||||
trace "Unable to store manifest", cid = manifest.cid
|
||||
return failure("Unable to store manifest " & $manifest.cid)
|
||||
|
@ -165,8 +168,8 @@ proc store*(
|
|||
return failure(cid.error.msg)
|
||||
|
||||
trace "Stored data", manifestCid = manifest.cid,
|
||||
contentCid = !cid,
|
||||
blocks = blockManifest.len
|
||||
contentCid = !cid,
|
||||
blocks = blockManifest.len
|
||||
|
||||
return manifest.cid.success
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ method getBlock*(
|
|||
trace "Cannot read file from fs store", path , error
|
||||
return Block.failure("Cannot read file from fs store")
|
||||
|
||||
return Block.init(cid, data).success
|
||||
return Block.init(cid, data)
|
||||
|
||||
method putBlock*(
|
||||
self: FSStore,
|
||||
|
|
|
@ -41,14 +41,14 @@ suite "NetworkStore engine - 2 nodes":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks1.add(bt.Block.init(chunk))
|
||||
blocks1.add(bt.Block.init(chunk).get())
|
||||
|
||||
while true:
|
||||
let chunk = await chunker2.getBytes()
|
||||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks2.add(bt.Block.init(chunk))
|
||||
blocks2.add(bt.Block.init(chunk).get())
|
||||
|
||||
switch1 = newStandardSwitch()
|
||||
switch2 = newStandardSwitch()
|
||||
|
@ -119,7 +119,7 @@ suite "NetworkStore engine - 2 nodes":
|
|||
check peerCtx2.account.?address == pricing2.address.some
|
||||
|
||||
test "should send want-have for block":
|
||||
let blk = bt.Block.init("Block 1".toBytes)
|
||||
let blk = bt.Block.init("Block 1".toBytes).get()
|
||||
check await blockexc2.engine.localStore.putBlock(blk)
|
||||
|
||||
let entry = Entry(
|
||||
|
@ -144,7 +144,7 @@ suite "NetworkStore engine - 2 nodes":
|
|||
check blocks.mapIt( !it.read ) == blocks2
|
||||
|
||||
test "remote should send blocks when available":
|
||||
let blk = bt.Block.init("Block 1".toBytes)
|
||||
let blk = bt.Block.init("Block 1".toBytes).get()
|
||||
|
||||
# should fail retrieving block from remote
|
||||
check not await blockexc1.getBlock(blk.cid)
|
||||
|
@ -183,7 +183,7 @@ suite "NetworkStore - multiple nodes":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
for e in generateNodes(5):
|
||||
switch.add(e.switch)
|
||||
|
|
|
@ -20,8 +20,8 @@ import ../examples
|
|||
suite "NetworkStore engine basic":
|
||||
let
|
||||
rng = Rng.instance()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
seckey = PrivateKey.random(rng[]).get()
|
||||
peerId = PeerID.init(seckey.getPublicKey().get()).get()
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
|
||||
wallet = WalletRef.example
|
||||
|
||||
|
@ -35,7 +35,7 @@ suite "NetworkStore engine basic":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
done = newFuture[void]()
|
||||
|
||||
|
@ -87,8 +87,8 @@ suite "NetworkStore engine basic":
|
|||
suite "NetworkStore engine handlers":
|
||||
let
|
||||
rng = Rng.instance()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
seckey = PrivateKey.random(rng[]).get()
|
||||
peerId = PeerID.init(seckey.getPublicKey().get()).get()
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
|
||||
wallet = WalletRef.example
|
||||
|
||||
|
@ -104,7 +104,7 @@ suite "NetworkStore engine handlers":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
done = newFuture[void]()
|
||||
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
|
||||
|
@ -228,15 +228,15 @@ suite "Task Handler":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
done = newFuture[void]()
|
||||
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
|
||||
peersCtx = @[]
|
||||
|
||||
for i in 0..3:
|
||||
let seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peers.add(PeerID.init(seckey.getPublicKey().tryGet()).tryGet())
|
||||
let seckey = PrivateKey.random(rng[]).get()
|
||||
peers.add(PeerID.init(seckey.getPublicKey().get()).get())
|
||||
|
||||
peersCtx.add(BlockExcPeerCtx(
|
||||
id: peers[i]
|
||||
|
@ -282,7 +282,7 @@ suite "Task Handler":
|
|||
|
||||
test "Should send presence":
|
||||
let present = blocks
|
||||
let missing = @[bt.Block.init("missing".toBytes)]
|
||||
let missing = @[bt.Block.init("missing".toBytes).get()]
|
||||
let price = (!engine.pricing).price
|
||||
|
||||
proc sendPresence(id: PeerID, presence: seq[BlockPresence]) =
|
||||
|
|
|
@ -20,8 +20,8 @@ import ../examples
|
|||
suite "NetworkStore network":
|
||||
let
|
||||
rng = Rng.instance()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
seckey = PrivateKey.random(rng[]).get()
|
||||
peerId = PeerID.init(seckey.getPublicKey().get()).get()
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256)
|
||||
|
||||
var
|
||||
|
@ -40,7 +40,7 @@ suite "NetworkStore network":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
done = newFuture[void]()
|
||||
buffer = BufferStream.new()
|
||||
|
@ -156,7 +156,7 @@ suite "NetworkStore Network - e2e":
|
|||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.init(chunk))
|
||||
blocks.add(bt.Block.init(chunk).get())
|
||||
|
||||
done = newFuture[void]()
|
||||
switch1 = newStandardSwitch()
|
||||
|
|
|
@ -42,7 +42,7 @@ proc example*(_: type Pricing): Pricing =
|
|||
proc example*(_: type Block): Block =
|
||||
let length = rand(4096)
|
||||
let bytes = newSeqWith(length, rand(uint8))
|
||||
Block.init(bytes)
|
||||
Block.init(bytes).tryGet
|
||||
|
||||
proc example*(_: type PeerId): PeerID =
|
||||
let key = PrivateKey.random(Rng.instance[]).get
|
||||
|
|
|
@ -23,7 +23,7 @@ suite "FS Store":
|
|||
var
|
||||
store: FSStore
|
||||
repoDir: string
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
|
||||
setup:
|
||||
repoDir = path.parentDir / "repo"
|
||||
|
|
|
@ -15,7 +15,7 @@ import ../helpers
|
|||
suite "Memory Store tests":
|
||||
test "putBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new()
|
||||
|
||||
check await store.putBlock(newBlock)
|
||||
|
@ -23,7 +23,7 @@ suite "Memory Store tests":
|
|||
|
||||
test "getBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
|
@ -32,7 +32,7 @@ suite "Memory Store tests":
|
|||
|
||||
test "fail getBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new(@[])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
|
@ -40,21 +40,21 @@ suite "Memory Store tests":
|
|||
|
||||
test "hasBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
check store.hasBlock(newBlock.cid)
|
||||
|
||||
test "fail hasBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new(@[])
|
||||
|
||||
check not store.hasBlock(newBlock.cid)
|
||||
|
||||
test "delBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes())
|
||||
newBlock = Block.init("New Block".toBytes()).get()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
check await store.delBlock(newBlock.cid)
|
||||
|
|
|
@ -17,13 +17,13 @@ suite "Manifest":
|
|||
test "Should produce valid tree hash checksum":
|
||||
without var manifest =? BlocksManifest.init(
|
||||
blocks = @[
|
||||
Block.init("Block 1".toBytes).cid,
|
||||
Block.init("Block 2".toBytes).cid,
|
||||
Block.init("Block 3".toBytes).cid,
|
||||
Block.init("Block 4".toBytes).cid,
|
||||
Block.init("Block 5".toBytes).cid,
|
||||
Block.init("Block 6".toBytes).cid,
|
||||
Block.init("Block 7".toBytes).cid,
|
||||
Block.init("Block 1".toBytes).get().cid,
|
||||
Block.init("Block 2".toBytes).get().cid,
|
||||
Block.init("Block 3".toBytes).get().cid,
|
||||
Block.init("Block 4".toBytes).get().cid,
|
||||
Block.init("Block 5".toBytes).get().cid,
|
||||
Block.init("Block 6".toBytes).get().cid,
|
||||
Block.init("Block 7".toBytes).get().cid,
|
||||
]):
|
||||
fail()
|
||||
|
||||
|
@ -42,7 +42,9 @@ suite "Manifest":
|
|||
|
||||
test "Should encode/decode to/from manifest":
|
||||
let
|
||||
blocks = (0..<1000).mapIt( Block.init(("Block " & $it).toBytes).cid )
|
||||
blocks = (0..<1000).mapIt(
|
||||
Block.init(("Block " & $it).toBytes).get().cid
|
||||
)
|
||||
|
||||
var
|
||||
manifest = BlocksManifest.init(blocks).get()
|
||||
|
|
|
@ -56,27 +56,27 @@ suite "Test Node":
|
|||
storeFut = node.store(stream)
|
||||
|
||||
var
|
||||
manifest = BlocksManifest.init().tryGet()
|
||||
manifest = BlocksManifest.init().get()
|
||||
|
||||
try:
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
await stream.pushData(chunk)
|
||||
manifest.put(bt.Block.init(chunk).cid)
|
||||
manifest.put(bt.Block.init(chunk).get().cid)
|
||||
finally:
|
||||
await stream.pushEof()
|
||||
await stream.close()
|
||||
|
||||
let
|
||||
manifestCid = (await storeFut).tryGet()
|
||||
manifestCid = (await storeFut).get()
|
||||
|
||||
check:
|
||||
manifestCid in localStore
|
||||
|
||||
var
|
||||
manifestBlock = (await localStore.getBlock(manifestCid)).get()
|
||||
localManifest = BlocksManifest.init(manifestBlock).tryGet()
|
||||
localManifest = BlocksManifest.init(manifestBlock).get()
|
||||
|
||||
check:
|
||||
manifest.len == localManifest.len
|
||||
|
@ -84,7 +84,7 @@ suite "Test Node":
|
|||
|
||||
test "Retrieve Data Stream":
|
||||
var
|
||||
manifest = BlocksManifest.init().tryGet()
|
||||
manifest = BlocksManifest.init().get()
|
||||
original: seq[byte]
|
||||
|
||||
while (
|
||||
|
@ -92,7 +92,7 @@ suite "Test Node":
|
|||
chunk.len > 0):
|
||||
|
||||
let
|
||||
blk = bt.Block.init(chunk)
|
||||
blk = bt.Block.init(chunk).get()
|
||||
|
||||
original &= chunk
|
||||
check await localStore.putBlock(blk)
|
||||
|
@ -100,8 +100,8 @@ suite "Test Node":
|
|||
|
||||
let
|
||||
manifestBlock = bt.Block.init(
|
||||
manifest.encode().tryGet(),
|
||||
codec = ManifestCodec)
|
||||
manifest.encode().get(),
|
||||
codec = ManifestCodec).get()
|
||||
|
||||
check await localStore.putBlock(manifestBlock)
|
||||
|
||||
|
@ -125,7 +125,7 @@ suite "Test Node":
|
|||
test "Retrieve One Block":
|
||||
let
|
||||
testString = "Block 1"
|
||||
blk = bt.Block.init(testString.toBytes)
|
||||
blk = bt.Block.init(testString.toBytes).get()
|
||||
|
||||
var
|
||||
stream = BufferStream.new()
|
||||
|
|
Loading…
Reference in New Issue