Upload authenticators (#108)

* initial implementation of storage proofs upload

* make sure proof verifies with after deserializing

* add por store

* rename por store to stp store

* rename porstore to stpstore

* add support for host discovery to discovery mock

* add tags upload network tests
This commit is contained in:
Dmitriy Ryajov 2022-05-25 20:29:31 -06:00 committed by GitHub
parent 738738c3c6
commit 6ce7e23767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 541 additions and 62 deletions

View File

@ -105,7 +105,7 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
try: try:
trace "Advertising block", cid = $cid trace "Advertising block", cid = $cid
let request = b.discovery.provideBlock(cid) let request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request b.inFlightAdvReqs[cid] = request
await request await request
finally: finally:
@ -137,7 +137,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
try: try:
let let
request = b.discovery request = b.discovery
.findBlockProviders(cid) .find(cid)
.wait(DefaultDiscoveryTimeout) .wait(DefaultDiscoveryTimeout)
b.inFlightDiscReqs[cid] = request b.inFlightDiscReqs[cid] = request

View File

@ -18,7 +18,7 @@ logScope:
topics = "codex blockexc networkpeer" topics = "codex blockexc networkpeer"
const const
MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big MaxMessageSize = 100 * 1 shl 20 # manifest files can be big
type type
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}

View File

@ -7,12 +7,17 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/algorithm
import pkg/chronos import pkg/chronos
import pkg/chronicles import pkg/chronicles
import pkg/libp2p import pkg/libp2p
import pkg/libp2p/routing_record
import pkg/libp2p/signed_envelope
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/shims/net import pkg/stew/shims/net
import pkg/contractabi/address as ca
import pkg/libp2pdht/discv5/protocol as discv5 import pkg/libp2pdht/discv5/protocol as discv5
import ./rng import ./rng
@ -20,6 +25,10 @@ import ./errors
export discv5 export discv5
# TODO: If generics in methods had not been
# deprecated, this could have been implemented
# much more elegantly.
type type
Discovery* = ref object of RootObj Discovery* = ref object of RootObj
protocol: discv5.Protocol protocol: discv5.Protocol
@ -42,21 +51,31 @@ proc new*(
), ),
localInfo: localInfo) localInfo: localInfo)
proc toNodeId*(cid: Cid): NodeId =
## Cid to discovery id
##
readUintBE[256](keccak256.digest(cid.data.buffer).data)
proc toNodeId*(host: ca.Address): NodeId =
## Eth address to discovery id
##
readUintBE[256](keccak256.digest(host.toArray).data)
proc findPeer*( proc findPeer*(
d: Discovery, d: Discovery,
peerId: PeerID): Future[?PeerRecord] {.async.} = peerId: PeerID): Future[?PeerRecord] {.async.} =
let node = await d.protocol.resolve(toNodeId(peerId)) let
node = await d.protocol.resolve(toNodeId(peerId))
return return
if node.isSome(): if node.isSome():
some(node.get().record.data) some(node.get().record.data)
else: else:
none(PeerRecord) none(PeerRecord)
proc toDiscoveryId*(cid: Cid): NodeId = method find*(
## To discovery id
readUintBE[256](keccak256.digest(cid.data.buffer).data)
method findBlockProviders*(
d: Discovery, d: Discovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find block providers ## Find block providers
@ -64,19 +83,19 @@ method findBlockProviders*(
trace "Finding providers for block", cid = $cid trace "Finding providers for block", cid = $cid
without providers =? without providers =?
(await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error: (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
trace "Error finding providers for block", cid = $cid, error = error.msg trace "Error finding providers for block", cid = $cid, error = error.msg
return providers return providers
method provideBlock*(d: Discovery, cid: Cid) {.async, base.} = method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid ## Provide a bock Cid
## ##
trace "Providing block", cid = $cid trace "Providing block", cid = $cid
let let
nodes = await d.protocol.addProvider( nodes = await d.protocol.addProvider(
cid.toDiscoveryId(), cid.toNodeId(),
d.localInfo.signedPeerRecord) d.localInfo.signedPeerRecord)
if nodes.len <= 0: if nodes.len <= 0:
@ -84,6 +103,39 @@ method provideBlock*(d: Discovery, cid: Cid) {.async, base.} =
trace "Provided to nodes", nodes = nodes.len trace "Provided to nodes", nodes = nodes.len
method find*(
d: Discovery,
host: ca.Address): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find host providers
##
trace "Finding providers for host", host = $host
without var providers =?
(await d.protocol.getProviders(host.toNodeId())).mapFailure, error:
trace "Error finding providers for host", host = $host, exc = error.msg
return
if providers.len <= 0:
trace "No providers found", host = $host
return
providers.sort do(a, b: SignedPeerRecord) -> int:
system.cmp[uint64](a.data.seqNo, b.data.seqNo)
return providers
method provide*(d: Discovery, host: ca.Address) {.async, base.} =
## Provide hosts
##
trace "Providing host", host = $host
let
nodes = await d.protocol.addProvider(
host.toNodeId(),
d.localInfo.signedPeerRecord)
if nodes.len > 0:
trace "Provided to nodes", nodes = nodes.len
proc start*(d: Discovery) {.async.} = proc start*(d: Discovery) {.async.} =
d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR") d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR")
d.protocol.open() d.protocol.open()

View File

@ -1,5 +1,7 @@
import ./storageproofs/por import ./storageproofs/por
import ./storageproofs/timing import ./storageproofs/timing
import ./storageproofs/stpstore import ./storageproofs/stpstore
import ./storageproofs/stpnetwork
import ./storageproofs/stpproto
export por, timing, stpstore export por, timing, stpstore, stpnetwork, stpproto

View File

@ -12,11 +12,6 @@ message PoREnvelope {
bytes signature = 2; bytes signature = 2;
} }
message ProofMessage {
repeated bytes mu = 1;
bytes sigma = 2;
}
message PubKeyMessage { message PubKeyMessage {
bytes signkey = 1; bytes signkey = 1;
bytes key = 2; bytes key = 2;
@ -28,6 +23,11 @@ message PoREnvelope {
repeated bytes authenticators = 3; repeated bytes authenticators = 3;
} }
message ProofMessage {
repeated bytes mu = 1;
bytes sigma = 2;
}
PorMessage por = 1; PorMessage por = 1;
ProofMessage proof = 2; ProofMessage proof = 2;
} }

View File

@ -0,0 +1,98 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/contractabi/address as ca
import ../stores
import ../manifest
import ../streams
import ../utils
import ./por
import ./stpnetwork
import ./stpproto
import ./stpstore
import ./timing
export stpnetwork, stpstore, por, timing, stpproto
type
StorageProofs* = object
store*: BlockStore
network*: StpNetwork
stpStore*: StpStore
proc upload*(
self: StorageProofs,
cid: Cid,
indexes: seq[int],
host: ca.Address): Future[?!void] {.async.} =
## Upload authenticators
##
without por =? (await self.stpStore.retrieve(cid)):
trace "Unable to retrieve por data from store", cid
return failure("Unable to retrieve por data from store")
return await self.network.uploadTags(
cid,
indexes,
por.authenticators,
host)
# proc proof*() =
# discard
# proc verify*() =
# discard
proc setupProofs*(
self: StorageProofs,
manifest: Manifest): Future[?!void] {.async.} =
## Setup storage authentication
##
without cid =? manifest.cid:
return failure("Unable to retrieve Cid from manifest!")
let
(spk, ssk) = keyGen()
por = await PoR.init(
StoreStream.new(self.store, manifest),
ssk,
spk,
manifest.blockSize)
return await self.stpStore.store(por.toMessage(), cid)
proc init*(
T: type StorageProofs,
network: StpNetwork,
store: BlockStore,
stpStore: StpStore): StorageProofs =
var
self = T(
store: store,
stpStore: stpStore,
network: network)
proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} =
try:
await self.stpStore.store(msg.cid, msg.tags).tryGet()
trace "Stored tags", cid = $msg.cid, tags = msg.tags.len
except CatchableError as exc:
trace "Exception attempting to store tags", exc = exc.msg
self.network.tagsHandler = tagsHandler
self

View File

@ -0,0 +1,15 @@
syntax = "proto3";
message StorageProofsMessage {
message Tag {
int64 idx = 1;
bytes tag = 2;
}
message TagsMessage {
bytes cid = 1;
repeated Tag tags = 2;
}
TagsMessage tagsMsg = 1;
}

View File

@ -0,0 +1,104 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/contractabi/address as ca
import pkg/protobuf_serialization
import ./stpproto
import ../discovery
const
Codec* = "/dagger/storageproofs/1.0.0"
MaxMessageSize* = 1 shl 22 # 4MB
logScope:
topics = "dagger storageproofs network"
type
TagsHandler* = proc(msg: TagsMessage):
Future[void] {.raises: [Defect], gcsafe.}
StpNetwork* = ref object of LPProtocol
switch*: Switch
discovery*: Discovery
tagsHandle*: TagsHandler
proc uploadTags*(
self: StpNetwork,
cid: Cid,
indexes: seq[int],
tags: seq[seq[byte]],
host: ca.Address): Future[?!void] {.async.} =
# Upload tags to `host`
#
var msg = TagsMessage(cid: cid.data.buffer)
for i in indexes:
msg.tags.add(Tag(idx: i, tag: tags[i]))
let
peers = await self.discovery.find(host)
connFut = await one(peers.mapIt(
self.switch.dial(
it.data.peerId,
it.data.addresses.mapIt( it.address ),
@[Codec])))
conn = await connFut
try:
await conn.writeLp(
Protobuf.encode(StorageProofsMessage(tagsMsg: msg)))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Exception submitting tags", cid, exc = exc.msg
return failure(exc.msg)
finally:
await conn.close()
return success()
method init*(self: StpNetwork) =
## Perform protocol initialization
##
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let
msg = await conn.readLp(MaxMessageSize)
message = Protobuf.decode(msg, StorageProofsMessage)
if message.tagsMsg.tags.len > 0 and not self.tagsHandle.isNil:
await self.tagsHandle(message.tagsMsg)
except CatchableError as exc:
trace "Exception handling Storage Proofs message", exc = exc.msg
finally:
await conn.close()
self.handler = handle
self.codec = Codec
proc new*(
T: type StpNetwork,
switch: Switch,
discovery: Discovery): StpNetwork =
let
self = StpNetwork(
switch: switch,
discovery: discovery)
self.init()
self

View File

@ -0,0 +1,7 @@
import pkg/protobuf_serialization
import_proto3 "stp.proto"
export StorageProofsMessage
export TagsMessage
export Tag

View File

@ -18,6 +18,7 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/protobuf_serialization import pkg/protobuf_serialization
import ./stpproto
import ./por import ./por
type type
@ -28,40 +29,86 @@ type
template stpPath*(self: StpStore, cid: Cid): string = template stpPath*(self: StpStore, cid: Cid): string =
self.authDir / ($cid)[^self.postfixLen..^1] / $cid self.authDir / ($cid)[^self.postfixLen..^1] / $cid
proc retrieve*(self: StpStore, cid: Cid): Future[?!PorMessage] {.async.} = proc retrieve*(
self: StpStore,
cid: Cid): Future[?!PorMessage] {.async.} =
## Retrieve authenticators from data store ## Retrieve authenticators from data store
## ##
let path = self.stpPath(cid) let path = self.stpPath(cid) / "por"
var data: seq[byte] var data: seq[byte]
if ( if (
let res = io2.readFile(path, data); let res = io2.readFile(path, data);
res.isErr): res.isErr):
let error = io2.ioErrorMsg(res.error) let error = io2.ioErrorMsg(res.error)
trace "Cannot retrieve authenticators from fs", path , error trace "Cannot retrieve storage proof data from fs", path , error
return failure("Cannot retrieve authenticators from fs") return failure("Cannot retrieve storage proof data from fs")
return Protobuf.decode(data, PorMessage).success return Protobuf.decode(data, PorMessage).success
proc store*(self: StpStore, por: PoR, cid: Cid): Future[?!void] {.async.} = proc store*(
self: StpStore,
por: PorMessage,
cid: Cid): Future[?!void] {.async.} =
## Persist storage proofs ## Persist storage proofs
## ##
let let
dir = self.stpPath(cid).parentDir dir = self.stpPath(cid)
if io2.createPath(dir).isErr: if io2.createPath(dir).isErr:
trace "Unable to create storage proofs prefix dir", dir trace "Unable to create storage proofs prefix dir", dir
return failure(&"Unable to create storage proofs prefix dir ${dir}") return failure(&"Unable to create storage proofs prefix dir ${dir}")
let path = self.stpPath(cid) let path = dir / "por"
if ( if (
let res = io2.writeFile(path, Protobuf.encode(por.toMessage())); let res = io2.writeFile(path, Protobuf.encode(por));
res.isErr): res.isErr):
let error = io2.ioErrorMsg(res.error) let error = io2.ioErrorMsg(res.error)
trace "Unable to store storage proofs", path, cid = cid, error trace "Unable to store storage proofs", path, cid = cid, error
return failure( return failure(
&"Unable to store storage proofs path = ${path} cid = ${$cid} error = ${error}") &"Unable to store storage proofs - path = ${path} cid = ${$cid} error = ${error}")
return success()
proc retrieve*(
self: StpStore,
cid: Cid,
blocks: seq[int]): Future[?!seq[Tag]] {.async.} =
var tags: seq[Tag]
for b in blocks:
var tag = Tag(idx: b)
let path = self.stpPath(cid) / $b
if (
let res = io2.readFile(path, tag.tag);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Cannot retrieve tags from fs", path , error
return failure("Cannot retrieve tags from fs")
tags.add(tag)
return tags.success
proc store*(
self: StpStore,
tags: seq[Tag],
cid: Cid): Future[?!void] {.async.} =
let
dir = self.stpPath(cid)
if io2.createPath(dir).isErr:
trace "Unable to create storage proofs prefix dir", dir
return failure(&"Unable to create storage proofs prefix dir ${dir}")
for t in tags:
let path = dir / $t.idx
if (
let res = io2.writeFile(path, t.tag);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Unable to store tags", path, cid = cid, error
return failure(
&"Unable to store tags - path = ${path} cid = ${$cid} error = ${error}")
return success() return success()

View File

@ -1,4 +1,4 @@
import ./utils/asyncheapqueue import ./utils/asyncheapqueue
import ./utils/fileutils import ./utils/fileutils
export asyncheapqueue, fileutils export asyncheapqueue, fileutils

View File

@ -43,7 +43,7 @@ suite "Block Advertising and Discovery":
blocks.add(bt.Block.new(chunk).tryGet()) blocks.add(bt.Block.new(chunk).tryGet())
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = MockDiscovery.new(switch.peerInfo, 0.Port) blockDiscovery = MockDiscovery.new()
wallet = WalletRef.example wallet = WalletRef.example
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it )) localStore = CacheStore.new(blocks.mapIt( it ))
@ -76,7 +76,7 @@ suite "Block Advertising and Discovery":
await engine.start() await engine.start()
blockDiscovery.publishProvideHandler = blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid): Future[void] {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid): Future[void] {.async, gcsafe.} =
return return
@ -94,7 +94,7 @@ suite "Block Advertising and Discovery":
advertised = initTable.collect: advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()} for b in blocks: {b.cid: newFuture[void]()}
blockDiscovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} =
if cid in advertised and not advertised[cid].finished(): if cid in advertised and not advertised[cid].finished():
advertised[cid].complete() advertised[cid].complete()
@ -150,7 +150,7 @@ suite "E2E - Multiple Nodes Discovery":
for _ in 0..<4: for _ in 0..<4:
let let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
blockDiscovery = MockDiscovery.new(s.peerInfo, 0.Port) blockDiscovery = MockDiscovery.new()
wallet = WalletRef.example wallet = WalletRef.example
network = BlockExcNetwork.new(s) network = BlockExcNetwork.new(s)
localStore = CacheStore.new() localStore = CacheStore.new()
@ -189,15 +189,15 @@ suite "E2E - Multiple Nodes Discovery":
var advertised: Table[Cid, SignedPeerRecord] var advertised: Table[Cid, SignedPeerRecord]
MockDiscovery(blockexc[1].engine.discovery.discovery) MockDiscovery(blockexc[1].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[1].peerInfo.signedPeerRecord) advertised.add(cid, switch[1].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[2].engine.discovery.discovery) MockDiscovery(blockexc[2].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[2].peerInfo.signedPeerRecord) advertised.add(cid, switch[2].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[3].engine.discovery.discovery) MockDiscovery(blockexc[3].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised.add(cid, switch[3].peerInfo.signedPeerRecord) advertised.add(cid, switch[3].peerInfo.signedPeerRecord)
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
@ -231,15 +231,15 @@ suite "E2E - Multiple Nodes Discovery":
var advertised: Table[Cid, SignedPeerRecord] var advertised: Table[Cid, SignedPeerRecord]
MockDiscovery(blockexc[1].engine.discovery.discovery) MockDiscovery(blockexc[1].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[1].peerInfo.signedPeerRecord advertised[cid] = switch[1].peerInfo.signedPeerRecord
MockDiscovery(blockexc[2].engine.discovery.discovery) MockDiscovery(blockexc[2].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[2].peerInfo.signedPeerRecord advertised[cid] = switch[2].peerInfo.signedPeerRecord
MockDiscovery(blockexc[3].engine.discovery.discovery) MockDiscovery(blockexc[3].engine.discovery.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
advertised[cid] = switch[3].peerInfo.signedPeerRecord advertised[cid] = switch[3].peerInfo.signedPeerRecord
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])

View File

@ -79,7 +79,7 @@ suite "Test Discovery Engine":
for b in blocks: for b in blocks:
{ b.cid: newFuture[void]() } { b.cid: newFuture[void]() }
blockDiscovery.publishProvideHandler = blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
if not haves[cid].finished: if not haves[cid].finished:
haves[cid].complete haves[cid].complete
@ -124,7 +124,7 @@ suite "Test Discovery Engine":
discoveryLoopSleep = 100.millis) discoveryLoopSleep = 100.millis)
have = newFuture[void]() have = newFuture[void]()
blockDiscovery.publishProvideHandler = blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid check cid == blocks[0].cid
if not have.finished: if not have.finished:
@ -216,7 +216,7 @@ suite "Test Discovery Engine":
reqs = newFuture[void]() reqs = newFuture[void]()
count = 0 count = 0
blockDiscovery.publishProvideHandler = blockDiscovery.publishBlockProvideHandler =
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
check cid == blocks[0].cid check cid == blocks[0].cid
if count > 0: if count > 0:

View File

@ -8,8 +8,9 @@ import pkg/codex/rng
import ./helpers/nodeutils import ./helpers/nodeutils
import ./helpers/randomchunker import ./helpers/randomchunker
import ./helpers/mockdiscovery
export randomchunker, nodeutils export randomchunker, nodeutils, mockdiscovery
# NOTE: The meaning of equality for blocks # NOTE: The meaning of equality for blocks
# is changed here, because blocks are now `ref` # is changed here, because blocks are now `ref`

View File

@ -13,21 +13,20 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stew/shims/net import pkg/stew/shims/net
import pkg/codex/discovery import pkg/codex/discovery
import pkg/contractabi/address as ca
type type
MockDiscovery* = ref object of Discovery MockDiscovery* = ref object of Discovery
findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid): findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid):
Future[seq[SignedPeerRecord]] {.gcsafe.} Future[seq[SignedPeerRecord]] {.gcsafe.}
publishProvideHandler*: proc(d: MockDiscovery, cid: Cid): publishBlockProvideHandler*: proc(d: MockDiscovery, cid: Cid):
Future[void] {.gcsafe.}
findHostProvidersHandler*: proc(d: MockDiscovery, host: ca.Address):
Future[seq[SignedPeerRecord]] {.gcsafe.}
publishHostProvideHandler*: proc(d: MockDiscovery, host: ca.Address):
Future[void] {.gcsafe.} Future[void] {.gcsafe.}
proc new*( proc new*(T: type MockDiscovery): T =
T: type MockDiscovery,
localInfo: PeerInfo,
discoveryPort: Port,
bootstrapNodes = newSeq[SignedPeerRecord](),
): T =
T() T()
proc findPeer*( proc findPeer*(
@ -35,7 +34,7 @@ proc findPeer*(
peerId: PeerID): Future[?PeerRecord] {.async.} = peerId: PeerID): Future[?PeerRecord] {.async.} =
return none(PeerRecord) return none(PeerRecord)
method findBlockProviders*( method find*(
d: MockDiscovery, d: MockDiscovery,
cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
if isNil(d.findBlockProvidersHandler): if isNil(d.findBlockProvidersHandler):
@ -43,14 +42,22 @@ method findBlockProviders*(
return await d.findBlockProvidersHandler(d, cid) return await d.findBlockProvidersHandler(d, cid)
method provideBlock*(d: MockDiscovery, cid: Cid): Future[void] {.async.} = method provide*(d: MockDiscovery, cid: Cid): Future[void] {.async.} =
if isNil(d.publishProvideHandler): if isNil(d.publishBlockProvideHandler):
return return
await d.publishProvideHandler(d, cid) await d.publishBlockProvideHandler(d, cid)
proc start*(d: Discovery) {.async.} = method find*(
discard d: MockDiscovery,
host: ca.Address): Future[seq[SignedPeerRecord]] {.async.} =
if isNil(d.findHostProvidersHandler):
return
proc stop*(d: Discovery) {.async.} = return await d.findHostProvidersHandler(d, host)
discard
method provide*(d: MockDiscovery, host: ca.Address): Future[void] {.async.} =
if isNil(d.publishHostProvideHandler):
return
await d.publishHostProvideHandler(d, host)

View File

@ -0,0 +1,128 @@
import std/os
import std/sequtils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import pkg/protobuf_serialization
import pkg/contractabi as ca
import pkg/codex/rng
import pkg/codex/chunker
import pkg/codex/storageproofs
import pkg/codex/discovery
import pkg/codex/manifest
import pkg/codex/stores
import pkg/codex/storageproofs as st
import pkg/codex/blocktype as bt
import pkg/codex/streams
import ../examples
import ../helpers
const
SectorSize = 31
SectorsPerBlock = BlockSize div SectorSize
DataSetSize = BlockSize * 100
suite "Storage Proofs Network":
let
rng = Rng.instance()
seckey1 = PrivateKey.random(rng[]).tryGet()
seckey2 = PrivateKey.random(rng[]).tryGet()
hostAddr1 = ca.Address.example
hostAddr2 = ca.Address.example
blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random
var
stpNetwork1: StpNetwork
stpNetwork2: StpNetwork
switch1: Switch
switch2: Switch
discovery1: MockDiscovery
discovery2: MockDiscovery
chunker: RandomChunker
manifest: Manifest
store: BlockStore
ssk: st.SecretKey
spk: st.PublicKey
repoDir: string
stpstore: st.StpStore
porMsg: PorMessage
cid: Cid
por: PoR
tags: seq[Tag]
setupAll:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = DataSetSize, chunkSize = BlockSize)
manifest = Manifest.new(blockSize = BlockSize).tryGet()
(spk, ssk) = st.keyGen()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let
blk = bt.Block.new(chunk).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
cid = manifest.cid.tryGet()
por = await PoR.init(
StoreStream.new(store, manifest),
ssk, spk,
BlockSize)
porMsg = por.toMessage()
tags = blocks.mapIt(
Tag(idx: it, tag: porMsg.authenticators[it]) )
setup:
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
discovery1 = MockDiscovery.new(switch1.peerInfo)
discovery2 = MockDiscovery.new(switch2.peerInfo)
stpNetwork1 = StpNetwork.new(switch1, discovery1)
stpNetwork2 = StpNetwork.new(switch2, discovery2)
switch1.mount(stpNetwork1)
switch2.mount(stpNetwork2)
await switch1.start()
await switch2.start()
teardown:
await switch1.stop()
await switch2.stop()
test "Should upload to host":
var
done = newFuture[void]()
discovery1.findHostProvidersHandler = proc(d: MockDiscovery, host: ca.Address):
Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
check hostAddr2 == host
return @[switch2.peerInfo.signedPeerRecord]
proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} =
check:
Cid.init(msg.cid).tryGet() == cid
msg.tags == tags
done.complete()
stpNetwork2.tagsHandle = tagsHandler
(await stpNetwork1.uploadTags(
cid,
blocks,
porMsg.authenticators,
hostAddr2)).tryGet()
await done.wait(1.seconds)

View File

@ -11,7 +11,6 @@ import pkg/codex/chunker
import pkg/codex/rng import pkg/codex/rng
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import ../helpers import ../helpers
const const
@ -162,3 +161,5 @@ suite "Test Serialization":
check: check:
proof.sigma.blst_p1_is_equal(pproof.sigma).bool proof.sigma.blst_p1_is_equal(pproof.sigma).bool
proof.mu == pproof.mu proof.mu == pproof.mu
check por.verifyProof(q, pproof.mu, pproof.sigma)

View File

@ -1,4 +1,5 @@
import std/os import std/os
import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/asynctest import pkg/asynctest
@ -18,6 +19,7 @@ const
suite "Test PoR store": suite "Test PoR store":
let let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random
var var
chunker: RandomChunker chunker: RandomChunker
@ -28,7 +30,9 @@ suite "Test PoR store":
repoDir: string repoDir: string
stpstore: st.StpStore stpstore: st.StpStore
por: PoR por: PoR
porMsg: PorMessage
cid: Cid cid: Cid
tags: seq[Tag]
setupAll: setupAll:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize) chunker = RandomChunker.new(Rng.instance(), size = DataSetSize, chunkSize = BlockSize)
@ -53,6 +57,10 @@ suite "Test PoR store":
ssk, spk, ssk, spk,
BlockSize) BlockSize)
porMsg = por.toMessage()
tags = blocks.mapIt(
Tag(idx: it, tag: porMsg.authenticators[it]) )
repoDir = path.parentDir / "stp" repoDir = path.parentDir / "stp"
createDir(repoDir) createDir(repoDir)
stpstore = st.StpStore.init(repoDir) stpstore = st.StpStore.init(repoDir)
@ -61,8 +69,16 @@ suite "Test PoR store":
removeDir(repoDir) removeDir(repoDir)
test "Should store Storage Proofs": test "Should store Storage Proofs":
check (await stpstore.store(por, cid)).isOk check (await stpstore.store(por.toMessage(), cid)).isOk
check fileExists(stpstore.stpPath(cid)) check fileExists(stpstore.stpPath(cid) / "por")
test "Should retrieve Storage Proofs": test "Should retrieve Storage Proofs":
discard (await stpstore.retrieve(cid)).tryGet() check (await stpstore.retrieve(cid)).tryGet() == porMsg
test "Should store tags":
check (await stpstore.store(tags, cid)).isOk
for t in tags:
check fileExists(stpstore.stpPath(cid) / $t.idx )
test "Should retrieve tags":
check (await stpstore.retrieve(cid, blocks)).tryGet() == tags

View File

@ -1,4 +1,5 @@
import ./storageproofs/teststpstore import ./storageproofs/teststpstore
import ./storageproofs/testpor import ./storageproofs/testpor
import ./storageproofs/testnetwork
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}