From 6ce7e237677ff6dedbbdc708702889195c572c16 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 25 May 2022 20:29:31 -0600 Subject: [PATCH] Upload authenticators (#108) * initial implementation of storage proofs upload * make sure proof verifies with after deserializing * add por store * rename por store to stp store * rename porstore to stpstore * add support for host discovery to discovery mock * add tags upload network tests --- codex/blockexchange/engine/discovery.nim | 4 +- codex/blockexchange/network/networkpeer.nim | 2 +- codex/discovery.nim | 70 ++++++++-- codex/storageproofs.nim | 4 +- .../storageproofs/por/serialization/por.proto | 10 +- codex/storageproofs/storageproofs.nim | 98 ++++++++++++++ codex/storageproofs/stp.proto | 15 ++ codex/storageproofs/stpnetwork.nim | 104 ++++++++++++++ codex/storageproofs/stpproto.nim | 7 + codex/storageproofs/stpstore.nim | 65 +++++++-- codex/utils.nim | 2 +- .../blockexchange/discovery/testdiscovery.nim | 20 +-- .../discovery/testdiscoveryengine.nim | 6 +- tests/codex/helpers.nim | 3 +- tests/codex/helpers/mockdiscovery.nim | 39 +++--- tests/codex/storageproofs/testnetwork.nim | 128 ++++++++++++++++++ tests/codex/storageproofs/testpor.nim | 3 +- tests/codex/storageproofs/teststpstore.nim | 22 ++- tests/codex/teststorageproofs.nim | 1 + 19 files changed, 541 insertions(+), 62 deletions(-) create mode 100644 codex/storageproofs/storageproofs.nim create mode 100644 codex/storageproofs/stp.proto create mode 100644 codex/storageproofs/stpnetwork.nim create mode 100644 codex/storageproofs/stpproto.nim create mode 100644 tests/codex/storageproofs/testnetwork.nim diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 107084d8..dacfb213 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -105,7 +105,7 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = try: trace "Advertising block", cid = $cid - let request = b.discovery.provideBlock(cid) + let request = b.discovery.provide(cid) b.inFlightAdvReqs[cid] = request await request finally: @@ -137,7 +137,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = try: let request = b.discovery - .findBlockProviders(cid) + .find(cid) .wait(DefaultDiscoveryTimeout) b.inFlightDiscReqs[cid] = request diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 1a213383..9f648e37 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -18,7 +18,7 @@ logScope: topics = "codex blockexc networkpeer" const - MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big + MaxMessageSize = 100 * 1 shl 20 # manifest files can be big type RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} diff --git a/codex/discovery.nim b/codex/discovery.nim index 5ead151a..98089383 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -7,12 +7,17 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/algorithm + import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/libp2p/routing_record +import pkg/libp2p/signed_envelope import pkg/questionable import pkg/questionable/results import pkg/stew/shims/net +import pkg/contractabi/address as ca import pkg/libp2pdht/discv5/protocol as discv5 import ./rng @@ -20,6 +25,10 @@ import ./errors export discv5 +# TODO: If generics in methods had not been +# deprecated, this could have been implemented +# much more elegantly. + type Discovery* = ref object of RootObj protocol: discv5.Protocol @@ -42,21 +51,31 @@ proc new*( ), localInfo: localInfo) +proc toNodeId*(cid: Cid): NodeId = + ## Cid to discovery id + ## + + readUintBE[256](keccak256.digest(cid.data.buffer).data) + +proc toNodeId*(host: ca.Address): NodeId = + ## Eth address to discovery id + ## + + readUintBE[256](keccak256.digest(host.toArray).data) + proc findPeer*( d: Discovery, peerId: PeerID): Future[?PeerRecord] {.async.} = - let node = await d.protocol.resolve(toNodeId(peerId)) + let + node = await d.protocol.resolve(toNodeId(peerId)) + return if node.isSome(): some(node.get().record.data) else: none(PeerRecord) -proc toDiscoveryId*(cid: Cid): NodeId = - ## To discovery id - readUintBE[256](keccak256.digest(cid.data.buffer).data) - -method findBlockProviders*( +method find*( d: Discovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = ## Find block providers @@ -64,19 +83,19 @@ method findBlockProviders*( trace "Finding providers for block", cid = $cid without providers =? - (await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error: + (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error: trace "Error finding providers for block", cid = $cid, error = error.msg return providers -method provideBlock*(d: Discovery, cid: Cid) {.async, base.} = +method provide*(d: Discovery, cid: Cid) {.async, base.} = ## Provide a bock Cid ## trace "Providing block", cid = $cid let nodes = await d.protocol.addProvider( - cid.toDiscoveryId(), + cid.toNodeId(), d.localInfo.signedPeerRecord) if nodes.len <= 0: @@ -84,6 +103,39 @@ method provideBlock*(d: Discovery, cid: Cid) {.async, base.} = trace "Provided to nodes", nodes = nodes.len +method find*( + d: Discovery, + host: ca.Address): Future[seq[SignedPeerRecord]] {.async, base.} = + ## Find host providers + ## + + trace "Finding providers for host", host = $host + without var providers =? + (await d.protocol.getProviders(host.toNodeId())).mapFailure, error: + trace "Error finding providers for host", host = $host, exc = error.msg + return + + if providers.len <= 0: + trace "No providers found", host = $host + return + + providers.sort do(a, b: SignedPeerRecord) -> int: + system.cmp[uint64](a.data.seqNo, b.data.seqNo) + + return providers + +method provide*(d: Discovery, host: ca.Address) {.async, base.} = + ## Provide hosts + ## + + trace "Providing host", host = $host + let + nodes = await d.protocol.addProvider( + host.toNodeId(), + d.localInfo.signedPeerRecord) + if nodes.len > 0: + trace "Provided to nodes", nodes = nodes.len + proc start*(d: Discovery) {.async.} = d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR") d.protocol.open() diff --git a/codex/storageproofs.nim b/codex/storageproofs.nim index 7ad106ea..ef0a8f00 100644 --- a/codex/storageproofs.nim +++ b/codex/storageproofs.nim @@ -1,5 +1,7 @@ import ./storageproofs/por import ./storageproofs/timing import ./storageproofs/stpstore +import ./storageproofs/stpnetwork +import ./storageproofs/stpproto -export por, timing, stpstore +export por, timing, stpstore, stpnetwork, stpproto diff --git a/codex/storageproofs/por/serialization/por.proto b/codex/storageproofs/por/serialization/por.proto index 76ee1f0c..82670e18 100644 --- a/codex/storageproofs/por/serialization/por.proto +++ b/codex/storageproofs/por/serialization/por.proto @@ -12,11 +12,6 @@ message PoREnvelope { bytes signature = 2; } - message ProofMessage { - repeated bytes mu = 1; - bytes sigma = 2; - } - message PubKeyMessage { bytes signkey = 1; bytes key = 2; @@ -28,6 +23,11 @@ message PoREnvelope { repeated bytes authenticators = 3; } + message ProofMessage { + repeated bytes mu = 1; + bytes sigma = 2; + } + PorMessage por = 1; ProofMessage proof = 2; } diff --git a/codex/storageproofs/storageproofs.nim b/codex/storageproofs/storageproofs.nim new file mode 100644 index 00000000..eb9ae6cf --- /dev/null +++ b/codex/storageproofs/storageproofs.nim @@ -0,0 +1,98 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results +import pkg/contractabi/address as ca + +import ../stores +import ../manifest +import ../streams +import ../utils + +import ./por +import ./stpnetwork +import ./stpproto +import ./stpstore +import ./timing + +export stpnetwork, stpstore, por, timing, stpproto + +type + StorageProofs* = object + store*: BlockStore + network*: StpNetwork + stpStore*: StpStore + +proc upload*( + self: StorageProofs, + cid: Cid, + indexes: seq[int], + host: ca.Address): Future[?!void] {.async.} = + ## Upload authenticators + ## + + without por =? (await self.stpStore.retrieve(cid)): + trace "Unable to retrieve por data from store", cid + return failure("Unable to retrieve por data from store") + + return await self.network.uploadTags( + cid, + indexes, + por.authenticators, + host) + +# proc proof*() = +# discard + +# proc verify*() = +# discard + +proc setupProofs*( + self: StorageProofs, + manifest: Manifest): Future[?!void] {.async.} = + ## Setup storage authentication + ## + + without cid =? manifest.cid: + return failure("Unable to retrieve Cid from manifest!") + + let + (spk, ssk) = keyGen() + por = await PoR.init( + StoreStream.new(self.store, manifest), + ssk, + spk, + manifest.blockSize) + + return await self.stpStore.store(por.toMessage(), cid) + +proc init*( + T: type StorageProofs, + network: StpNetwork, + store: BlockStore, + stpStore: StpStore): StorageProofs = + + var + self = T( + store: store, + stpStore: stpStore, + network: network) + + proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} = + try: + await self.stpStore.store(msg.cid, msg.tags).tryGet() + trace "Stored tags", cid = $msg.cid, tags = msg.tags.len + except CatchableError as exc: + trace "Exception attempting to store tags", exc = exc.msg + + self.network.tagsHandler = tagsHandler + self diff --git a/codex/storageproofs/stp.proto b/codex/storageproofs/stp.proto new file mode 100644 index 00000000..800e9035 --- /dev/null +++ b/codex/storageproofs/stp.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message StorageProofsMessage { + message Tag { + int64 idx = 1; + bytes tag = 2; + } + + message TagsMessage { + bytes cid = 1; + repeated Tag tags = 2; + } + + TagsMessage tagsMsg = 1; +} diff --git a/codex/storageproofs/stpnetwork.nim b/codex/storageproofs/stpnetwork.nim new file mode 100644 index 00000000..afafb7b5 --- /dev/null +++ b/codex/storageproofs/stpnetwork.nim @@ -0,0 +1,104 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils + +import pkg/chronos +import pkg/libp2p +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results +import pkg/contractabi/address as ca +import pkg/protobuf_serialization + +import ./stpproto +import ../discovery + +const + Codec* = "/dagger/storageproofs/1.0.0" + MaxMessageSize* = 1 shl 22 # 4MB + +logScope: + topics = "dagger storageproofs network" + +type + TagsHandler* = proc(msg: TagsMessage): + Future[void] {.raises: [Defect], gcsafe.} + + StpNetwork* = ref object of LPProtocol + switch*: Switch + discovery*: Discovery + tagsHandle*: TagsHandler + +proc uploadTags*( + self: StpNetwork, + cid: Cid, + indexes: seq[int], + tags: seq[seq[byte]], + host: ca.Address): Future[?!void] {.async.} = + # Upload tags to `host` + # + + var msg = TagsMessage(cid: cid.data.buffer) + for i in indexes: + msg.tags.add(Tag(idx: i, tag: tags[i])) + + let + peers = await self.discovery.find(host) + connFut = await one(peers.mapIt( + self.switch.dial( + it.data.peerId, + it.data.addresses.mapIt( it.address ), + @[Codec]))) + conn = await connFut + + try: + await conn.writeLp( + Protobuf.encode(StorageProofsMessage(tagsMsg: msg))) + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "Exception submitting tags", cid, exc = exc.msg + return failure(exc.msg) + finally: + await conn.close() + + return success() + +method init*(self: StpNetwork) = + ## Perform protocol initialization + ## + + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + try: + let + msg = await conn.readLp(MaxMessageSize) + message = Protobuf.decode(msg, StorageProofsMessage) + + if message.tagsMsg.tags.len > 0 and not self.tagsHandle.isNil: + await self.tagsHandle(message.tagsMsg) + except CatchableError as exc: + trace "Exception handling Storage Proofs message", exc = exc.msg + finally: + await conn.close() + + self.handler = handle + self.codec = Codec + +proc new*( + T: type StpNetwork, + switch: Switch, + discovery: Discovery): StpNetwork = + let + self = StpNetwork( + switch: switch, + discovery: discovery) + + self.init() + self diff --git a/codex/storageproofs/stpproto.nim b/codex/storageproofs/stpproto.nim new file mode 100644 index 00000000..39303099 --- /dev/null +++ b/codex/storageproofs/stpproto.nim @@ -0,0 +1,7 @@ +import pkg/protobuf_serialization + +import_proto3 "stp.proto" + +export StorageProofsMessage +export TagsMessage +export Tag diff --git a/codex/storageproofs/stpstore.nim b/codex/storageproofs/stpstore.nim index 43a8ca14..e64cd107 100644 --- a/codex/storageproofs/stpstore.nim +++ b/codex/storageproofs/stpstore.nim @@ -18,6 +18,7 @@ import pkg/questionable import pkg/questionable/results import pkg/protobuf_serialization +import ./stpproto import ./por type @@ -28,40 +29,86 @@ type template stpPath*(self: StpStore, cid: Cid): string = self.authDir / ($cid)[^self.postfixLen..^1] / $cid -proc retrieve*(self: StpStore, cid: Cid): Future[?!PorMessage] {.async.} = +proc retrieve*( + self: StpStore, + cid: Cid): Future[?!PorMessage] {.async.} = ## Retrieve authenticators from data store ## - let path = self.stpPath(cid) + let path = self.stpPath(cid) / "por" var data: seq[byte] if ( let res = io2.readFile(path, data); res.isErr): let error = io2.ioErrorMsg(res.error) - trace "Cannot retrieve authenticators from fs", path , error - return failure("Cannot retrieve authenticators from fs") + trace "Cannot retrieve storage proof data from fs", path , error + return failure("Cannot retrieve storage proof data from fs") return Protobuf.decode(data, PorMessage).success -proc store*(self: StpStore, por: PoR, cid: Cid): Future[?!void] {.async.} = +proc store*( + self: StpStore, + por: PorMessage, + cid: Cid): Future[?!void] {.async.} = ## Persist storage proofs ## let - dir = self.stpPath(cid).parentDir + dir = self.stpPath(cid) if io2.createPath(dir).isErr: trace "Unable to create storage proofs prefix dir", dir return failure(&"Unable to create storage proofs prefix dir ${dir}") - let path = self.stpPath(cid) + let path = dir / "por" if ( - let res = io2.writeFile(path, Protobuf.encode(por.toMessage())); + let res = io2.writeFile(path, Protobuf.encode(por)); res.isErr): let error = io2.ioErrorMsg(res.error) trace "Unable to store storage proofs", path, cid = cid, error return failure( - &"Unable to store storage proofs path = ${path} cid = ${$cid} error = ${error}") + &"Unable to store storage proofs - path = ${path} cid = ${$cid} error = ${error}") + + return success() + +proc retrieve*( + self: StpStore, + cid: Cid, + blocks: seq[int]): Future[?!seq[Tag]] {.async.} = + var tags: seq[Tag] + for b in blocks: + var tag = Tag(idx: b) + let path = self.stpPath(cid) / $b + if ( + let res = io2.readFile(path, tag.tag); + res.isErr): + let error = io2.ioErrorMsg(res.error) + trace "Cannot retrieve tags from fs", path , error + return failure("Cannot retrieve tags from fs") + tags.add(tag) + + return tags.success + +proc store*( + self: StpStore, + tags: seq[Tag], + cid: Cid): Future[?!void] {.async.} = + let + dir = self.stpPath(cid) + + if io2.createPath(dir).isErr: + trace "Unable to create storage proofs prefix dir", dir + return failure(&"Unable to create storage proofs prefix dir ${dir}") + + for t in tags: + let path = dir / $t.idx + if ( + let res = io2.writeFile(path, t.tag); + res.isErr): + let error = io2.ioErrorMsg(res.error) + trace "Unable to store tags", path, cid = cid, error + return failure( + &"Unable to store tags - path = ${path} cid = ${$cid} error = ${error}") return success() diff --git a/codex/utils.nim b/codex/utils.nim index 2f481e62..8d561d40 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -1,4 +1,4 @@ import ./utils/asyncheapqueue import ./utils/fileutils -export asyncheapqueue, fileutils \ No newline at end of file +export asyncheapqueue, fileutils diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 37a8a271..e71b78b3 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -43,7 +43,7 @@ suite "Block Advertising and Discovery": blocks.add(bt.Block.new(chunk).tryGet()) switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) - blockDiscovery = MockDiscovery.new(switch.peerInfo, 0.Port) + blockDiscovery = MockDiscovery.new() wallet = WalletRef.example network = BlockExcNetwork.new(switch) localStore = CacheStore.new(blocks.mapIt( it )) @@ -76,7 +76,7 @@ suite "Block Advertising and Discovery": await engine.start() - blockDiscovery.publishProvideHandler = + blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async, gcsafe.} = return @@ -94,7 +94,7 @@ suite "Block Advertising and Discovery": advertised = initTable.collect: for b in blocks: {b.cid: newFuture[void]()} - blockDiscovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = + blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = if cid in advertised and not advertised[cid].finished(): advertised[cid].complete() @@ -150,7 +150,7 @@ suite "E2E - Multiple Nodes Discovery": for _ in 0..<4: let s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) - blockDiscovery = MockDiscovery.new(s.peerInfo, 0.Port) + blockDiscovery = MockDiscovery.new() wallet = WalletRef.example network = BlockExcNetwork.new(s) localStore = CacheStore.new() @@ -189,15 +189,15 @@ suite "E2E - Multiple Nodes Discovery": var advertised: Table[Cid, SignedPeerRecord] MockDiscovery(blockexc[1].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised.add(cid, switch[1].peerInfo.signedPeerRecord) MockDiscovery(blockexc[2].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised.add(cid, switch[2].peerInfo.signedPeerRecord) MockDiscovery(blockexc[3].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised.add(cid, switch[3].peerInfo.signedPeerRecord) await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) @@ -231,15 +231,15 @@ suite "E2E - Multiple Nodes Discovery": var advertised: Table[Cid, SignedPeerRecord] MockDiscovery(blockexc[1].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[1].peerInfo.signedPeerRecord MockDiscovery(blockexc[2].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[2].peerInfo.signedPeerRecord MockDiscovery(blockexc[3].engine.discovery.discovery) - .publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index e6dde554..f8fd316f 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -79,7 +79,7 @@ suite "Test Discovery Engine": for b in blocks: { b.cid: newFuture[void]() } - blockDiscovery.publishProvideHandler = + blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = if not haves[cid].finished: haves[cid].complete @@ -124,7 +124,7 @@ suite "Test Discovery Engine": discoveryLoopSleep = 100.millis) have = newFuture[void]() - blockDiscovery.publishProvideHandler = + blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = check cid == blocks[0].cid if not have.finished: @@ -216,7 +216,7 @@ suite "Test Discovery Engine": reqs = newFuture[void]() count = 0 - blockDiscovery.publishProvideHandler = + blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = check cid == blocks[0].cid if count > 0: diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index c9931608..9f57fd9c 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -8,8 +8,9 @@ import pkg/codex/rng import ./helpers/nodeutils import ./helpers/randomchunker +import ./helpers/mockdiscovery -export randomchunker, nodeutils +export randomchunker, nodeutils, mockdiscovery # NOTE: The meaning of equality for blocks # is changed here, because blocks are now `ref` diff --git a/tests/codex/helpers/mockdiscovery.nim b/tests/codex/helpers/mockdiscovery.nim index 51192427..86e517a6 100644 --- a/tests/codex/helpers/mockdiscovery.nim +++ b/tests/codex/helpers/mockdiscovery.nim @@ -13,21 +13,20 @@ import pkg/questionable import pkg/questionable/results import pkg/stew/shims/net import pkg/codex/discovery +import pkg/contractabi/address as ca type MockDiscovery* = ref object of Discovery findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.gcsafe.} - publishProvideHandler*: proc(d: MockDiscovery, cid: Cid): + publishBlockProvideHandler*: proc(d: MockDiscovery, cid: Cid): + Future[void] {.gcsafe.} + findHostProvidersHandler*: proc(d: MockDiscovery, host: ca.Address): + Future[seq[SignedPeerRecord]] {.gcsafe.} + publishHostProvideHandler*: proc(d: MockDiscovery, host: ca.Address): Future[void] {.gcsafe.} -proc new*( - T: type MockDiscovery, - localInfo: PeerInfo, - discoveryPort: Port, - bootstrapNodes = newSeq[SignedPeerRecord](), - ): T = - +proc new*(T: type MockDiscovery): T = T() proc findPeer*( @@ -35,7 +34,7 @@ proc findPeer*( peerId: PeerID): Future[?PeerRecord] {.async.} = return none(PeerRecord) -method findBlockProviders*( +method find*( d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = if isNil(d.findBlockProvidersHandler): @@ -43,14 +42,22 @@ method findBlockProviders*( return await d.findBlockProvidersHandler(d, cid) -method provideBlock*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = - if isNil(d.publishProvideHandler): +method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = + if isNil(d.publishBlockProvideHandler): return - await d.publishProvideHandler(d, cid) + await d.publishBlockProvideHandler(d, cid) -proc start*(d: Discovery) {.async.} = - discard +method find*( + d: MockDiscovery, + host: ca.Address): Future[seq[SignedPeerRecord]] {.async.} = + if isNil(d.findHostProvidersHandler): + return -proc stop*(d: Discovery) {.async.} = - discard + return await d.findHostProvidersHandler(d, host) + +method provide*(d: MockDiscovery, host: ca.Address): Future[void] {.async.} = + if isNil(d.publishHostProvideHandler): + return + + await d.publishHostProvideHandler(d, host) diff --git a/tests/codex/storageproofs/testnetwork.nim b/tests/codex/storageproofs/testnetwork.nim new file mode 100644 index 00000000..10357ba1 --- /dev/null +++ b/tests/codex/storageproofs/testnetwork.nim @@ -0,0 +1,128 @@ +import std/os +import std/sequtils + +import pkg/asynctest +import pkg/chronos +import pkg/libp2p +import pkg/libp2p/errors +import pkg/protobuf_serialization +import pkg/contractabi as ca + +import pkg/codex/rng +import pkg/codex/chunker +import pkg/codex/storageproofs +import pkg/codex/discovery +import pkg/codex/manifest +import pkg/codex/stores +import pkg/codex/storageproofs as st +import pkg/codex/blocktype as bt +import pkg/codex/streams + +import ../examples +import ../helpers + +const + SectorSize = 31 + SectorsPerBlock = BlockSize div SectorSize + DataSetSize = BlockSize * 100 + +suite "Storage Proofs Network": + let + rng = Rng.instance() + seckey1 = PrivateKey.random(rng[]).tryGet() + seckey2 = PrivateKey.random(rng[]).tryGet() + hostAddr1 = ca.Address.example + hostAddr2 = ca.Address.example + blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random + + var + stpNetwork1: StpNetwork + stpNetwork2: StpNetwork + switch1: Switch + switch2: Switch + discovery1: MockDiscovery + discovery2: MockDiscovery + + chunker: RandomChunker + manifest: Manifest + store: BlockStore + ssk: st.SecretKey + spk: st.PublicKey + repoDir: string + stpstore: st.StpStore + porMsg: PorMessage + cid: Cid + por: PoR + tags: seq[Tag] + + setupAll: + chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize) + store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize) + manifest = Manifest.new(blockSize = BlockSize).tryGet() + (spk, ssk) = st.keyGen() + + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + let + blk = bt.Block.new(chunk).tryGet() + + manifest.add(blk.cid) + if not (await store.putBlock(blk)): + raise newException(CatchableError, "Unable to store block " & $blk.cid) + + cid = manifest.cid.tryGet() + por = await PoR.init( + StoreStream.new(store, manifest), + ssk, spk, + BlockSize) + + porMsg = por.toMessage() + tags = blocks.mapIt( + Tag(idx: it, tag: porMsg.authenticators[it]) ) + + setup: + switch1 = newStandardSwitch() + switch2 = newStandardSwitch() + + discovery1 = MockDiscovery.new(switch1.peerInfo) + discovery2 = MockDiscovery.new(switch2.peerInfo) + + stpNetwork1 = StpNetwork.new(switch1, discovery1) + stpNetwork2 = StpNetwork.new(switch2, discovery2) + + switch1.mount(stpNetwork1) + switch2.mount(stpNetwork2) + + await switch1.start() + await switch2.start() + + teardown: + await switch1.stop() + await switch2.stop() + + test "Should upload to host": + var + done = newFuture[void]() + + discovery1.findHostProvidersHandler = proc(d: MockDiscovery, host: ca.Address): + Future[seq[SignedPeerRecord]] {.async, gcsafe.} = + check hostAddr2 == host + return @[switch2.peerInfo.signedPeerRecord] + + proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} = + check: + Cid.init(msg.cid).tryGet() == cid + msg.tags == tags + + done.complete() + + stpNetwork2.tagsHandle = tagsHandler + (await stpNetwork1.uploadTags( + cid, + blocks, + porMsg.authenticators, + hostAddr2)).tryGet() + + await done.wait(1.seconds) diff --git a/tests/codex/storageproofs/testpor.nim b/tests/codex/storageproofs/testpor.nim index 81ca3d09..5b9c8481 100644 --- a/tests/codex/storageproofs/testpor.nim +++ b/tests/codex/storageproofs/testpor.nim @@ -11,7 +11,6 @@ import pkg/codex/chunker import pkg/codex/rng import pkg/codex/blocktype as bt - import ../helpers const @@ -162,3 +161,5 @@ suite "Test Serialization": check: proof.sigma.blst_p1_is_equal(pproof.sigma).bool proof.mu == pproof.mu + + check por.verifyProof(q, pproof.mu, pproof.sigma) diff --git a/tests/codex/storageproofs/teststpstore.nim b/tests/codex/storageproofs/teststpstore.nim index c2839c1e..545ff68f 100644 --- a/tests/codex/storageproofs/teststpstore.nim +++ b/tests/codex/storageproofs/teststpstore.nim @@ -1,4 +1,5 @@ import std/os +import std/sequtils import pkg/chronos import pkg/asynctest @@ -18,6 +19,7 @@ const suite "Test PoR store": let (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name + blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random var chunker: RandomChunker @@ -28,7 +30,9 @@ suite "Test PoR store": repoDir: string stpstore: st.StpStore por: PoR + porMsg: PorMessage cid: Cid + tags: seq[Tag] setupAll: chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize) @@ -53,6 +57,10 @@ suite "Test PoR store": ssk, spk, BlockSize) + porMsg = por.toMessage() + tags = blocks.mapIt( + Tag(idx: it, tag: porMsg.authenticators[it]) ) + repoDir = path.parentDir / "stp" createDir(repoDir) stpstore = st.StpStore.init(repoDir) @@ -61,8 +69,16 @@ suite "Test PoR store": removeDir(repoDir) test "Should store Storage Proofs": - check (await stpstore.store(por, cid)).isOk - check fileExists(stpstore.stpPath(cid)) + check (await stpstore.store(por.toMessage(), cid)).isOk + check fileExists(stpstore.stpPath(cid) / "por") test "Should retrieve Storage Proofs": - discard (await stpstore.retrieve(cid)).tryGet() + check (await stpstore.retrieve(cid)).tryGet() == porMsg + + test "Should store tags": + check (await stpstore.store(tags, cid)).isOk + for t in tags: + check fileExists(stpstore.stpPath(cid) / $t.idx ) + + test "Should retrieve tags": + check (await stpstore.retrieve(cid, blocks)).tryGet() == tags diff --git a/tests/codex/teststorageproofs.nim b/tests/codex/teststorageproofs.nim index 71b2577a..e90a8c0b 100644 --- a/tests/codex/teststorageproofs.nim +++ b/tests/codex/teststorageproofs.nim @@ -1,4 +1,5 @@ import ./storageproofs/teststpstore import ./storageproofs/testpor +import ./storageproofs/testnetwork {.warning[UnusedImport]: off.}