Merge branch 'master' into blockexchange-uses-merkle-tree

This commit is contained in:
Tomasz Bekas 2023-11-04 07:55:26 +01:00
commit 67a0fb4df9
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
53 changed files with 226 additions and 1672 deletions

View File

@ -118,4 +118,5 @@ jobs:
files: ./coverage/coverage.f.info
flags: unittests
name: codecov-umbrella
token: ${{ secrets.CODECOV_TOKEN }}
verbose: true

View File

@ -39,12 +39,12 @@ export peers, pendingblocks, payments, discovery
logScope:
topics = "codex blockexcengine"
declareCounter(codexBlockExchangeWantHaveListsSent, "codex blockexchange wantHave lists sent")
declareCounter(codexBlockExchangeWantHaveListsReceived, "codex blockexchange wantHave lists received")
declareCounter(codexBlockExchangeWantBlockListsSent, "codex blockexchange wantBlock lists sent")
declareCounter(codexBlockExchangeWantBlockListsReceived, "codex blockexchange wantBlock lists received")
declareCounter(codexBlockExchangeBlocksSent, "codex blockexchange blocks sent")
declareCounter(codexBlockExchangeBlocksReceived, "codex blockexchange blocks received")
declareCounter(codex_block_exchange_want_have_lists_sent, "codex blockexchange wantHave lists sent")
declareCounter(codex_block_exchange_want_have_lists_received, "codex blockexchange wantHave lists received")
declareCounter(codex_block_exchange_want_block_lists_sent, "codex blockexchange wantBlock lists sent")
declareCounter(codex_block_exchange_want_block_lists_received, "codex blockexchange wantBlock lists received")
declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sent")
declareCounter(codex_block_exchange_blocks_received, "codex blockexchange blocks received")
const
DefaultMaxPeersPerRequest* = 10
@ -201,9 +201,9 @@ proc requestBlock*(
asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id)
b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(address, peer)
codexBlockExchangeWantBlockListsSent.inc()
codex_block_exchange_want_block_lists_sent.inc()
await b.sendWantHave(address, peer, toSeq(b.peers))
codexBlockExchangeWantHaveListsSent.inc()
codex_block_exchange_want_have_lists_sent.inc()
return await blockFuture
@ -369,7 +369,7 @@ proc blocksDeliveryHandler*(
validatedBlocksDelivery.add(bd)
await b.resolveBlocks(validatedBlocksDelivery)
codexBlockExchangeBlocksReceived.inc(validatedBlocksDelivery.len.int64)
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
let
peerCtx = b.peers.get(peer)
@ -411,7 +411,7 @@ proc wantListHandler*(
.price.toBytesBE)
if e.wantType == WantType.WantHave:
codexBlockExchangeWantHaveListsReceived.inc()
codex_block_exchange_want_have_lists_received.inc()
if not have and e.sendDontHave:
trace "Adding dont have entry to presence response", address = e.address
@ -430,7 +430,7 @@ proc wantListHandler*(
elif e.wantType == WantType.WantBlock:
trace "Added entry to peer's want blocks list", address = e.address
peerCtx.peerWants.add(e)
codexBlockExchangeWantBlockListsReceived.inc()
codex_block_exchange_want_block_lists_received.inc()
else:
# peer doesn't want this block anymore
if e.cancel:
@ -560,7 +560,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
blocksDelivery
)
codexBlockExchangeBlocksSent.inc(blocksDelivery.len.int64)
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
trace "About to remove entries from peerWants", blocks = blocksDelivery.len, items = task.peerWants.len
# Remove successfully sent blocks

View File

@ -27,8 +27,8 @@ import ../../merkletree
logScope:
topics = "codex pendingblocks"
declareGauge(codexBlockExchangePendingBlockRequests, "codex blockexchange pending block requests")
declareGauge(codexBlockExchangeRetrievalTimeUs, "codex blockexchange block retrieval time us")
declareGauge(codex_block_exchange_pending_block_requests, "codex blockexchange pending block requests")
declareGauge(codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us")
const
DefaultBlockTimeout* = 10.minutes
@ -43,7 +43,7 @@ type
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
proc updatePendingBlockGauge(p: PendingBlocksManager) =
codexBlockExchangePendingBlockRequests.set(p.blocks.len.int64)
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
proc getWantHandle*(
p: PendingBlocksManager,
@ -103,7 +103,7 @@ proc resolve*(
blockReq.handle.complete(bd.blk)
codexBlockExchangeRetrievalTimeUs.set(retrievalDurationUs)
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address

View File

@ -12,7 +12,7 @@ import ./marketplace
export market
logScope:
topics = "marketplace onchain market"
topics = "marketplace onchain market"
type
OnChainMarket* = ref object of Market

View File

@ -54,6 +54,7 @@ type
Finished
Failed
Paid
Cancelled
proc `==`*(x, y: Nonce): bool {.borrow.}
proc `==`*(x, y: RequestId): bool {.borrow.}

View File

@ -1,9 +1,13 @@
import pkg/metrics
import pkg/chronicles
import ../statemachine
import ./errorhandling
import ./error
declareCounter(codexPurchasesCancelled, "codex purchases cancelled")
declareCounter(codex_purchases_cancelled, "codex purchases cancelled")
logScope:
topics = "marketplace purchases cancelled"
type PurchaseCancelled* = ref object of ErrorHandlingState
@ -11,8 +15,11 @@ method `$`*(state: PurchaseCancelled): string =
"cancelled"
method run*(state: PurchaseCancelled, machine: Machine): Future[?State] {.async.} =
codexPurchasesCancelled.inc()
codex_purchases_cancelled.inc()
let purchase = Purchase(machine)
warn "Request cancelled, withdrawing remaining funds", requestId = $purchase.requestId
await purchase.market.withdrawFunds(purchase.requestId)
let error = newException(Timeout, "Purchase cancelled due to timeout")
return some State(PurchaseErrored(error: error))
purchase.future.fail(error)

View File

@ -1,7 +1,12 @@
import pkg/metrics
import pkg/chronicles
import ../statemachine
import ../../utils/exceptions
declareCounter(codexPurchasesError, "codex purchases error")
declareCounter(codex_purchases_error, "codex purchases error")
logScope:
topics = "marketplace purchases errored"
type PurchaseErrored* = ref object of PurchaseState
error*: ref CatchableError
@ -10,6 +15,9 @@ method `$`*(state: PurchaseErrored): string =
"errored"
method run*(state: PurchaseErrored, machine: Machine): Future[?State] {.async.} =
codexPurchasesError.inc()
codex_purchases_error.inc()
let purchase = Purchase(machine)
error "Purchasing error", error=state.error.msgDetail, requestId = purchase.requestId
purchase.future.fail(state.error)

View File

@ -2,7 +2,7 @@ import pkg/metrics
import ../statemachine
import ./error
declareCounter(codexPurchasesFailed, "codex purchases failed")
declareCounter(codex_purchases_failed, "codex purchases failed")
type
PurchaseFailed* = ref object of PurchaseState
@ -11,6 +11,6 @@ method `$`*(state: PurchaseFailed): string =
"failed"
method run*(state: PurchaseFailed, machine: Machine): Future[?State] {.async.} =
codexPurchasesFailed.inc()
codex_purchases_failed.inc()
let error = newException(PurchaseError, "Purchase failed")
return some State(PurchaseErrored(error: error))

View File

@ -1,7 +1,11 @@
import pkg/metrics
import pkg/chronicles
import ../statemachine
declareCounter(codexPurchasesFinished, "codex purchases finished")
declareCounter(codex_purchases_finished, "codex purchases finished")
logScope:
topics = "marketplace purchases finished"
type PurchaseFinished* = ref object of PurchaseState
@ -9,6 +13,7 @@ method `$`*(state: PurchaseFinished): string =
"finished"
method run*(state: PurchaseFinished, machine: Machine): Future[?State] {.async.} =
codexPurchasesFinished.inc()
codex_purchases_finished.inc()
let purchase = Purchase(machine)
info "Purchase finished", requestId = purchase.requestId
purchase.future.complete()

View File

@ -3,7 +3,7 @@ import ../statemachine
import ./errorhandling
import ./submitted
declareCounter(codexPurchasesPending, "codex purchases pending")
declareCounter(codex_purchases_pending, "codex purchases pending")
type PurchasePending* = ref object of ErrorHandlingState
@ -11,7 +11,7 @@ method `$`*(state: PurchasePending): string =
"pending"
method run*(state: PurchasePending, machine: Machine): Future[?State] {.async.} =
codexPurchasesPending.inc()
codex_purchases_pending.inc()
let purchase = Purchase(machine)
let request = !purchase.request
await purchase.market.requestStorage(request)

View File

@ -1,10 +1,14 @@
import pkg/metrics
import pkg/chronicles
import ../statemachine
import ./errorhandling
import ./finished
import ./failed
declareCounter(codexPurchasesStarted, "codex purchases started")
declareCounter(codex_purchases_started, "codex purchases started")
logScope:
topics = "marketplace purchases started"
type PurchaseStarted* = ref object of ErrorHandlingState
@ -12,11 +16,12 @@ method `$`*(state: PurchaseStarted): string =
"started"
method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.} =
codexPurchasesStarted.inc()
codex_purchases_started.inc()
let purchase = Purchase(machine)
let clock = purchase.clock
let market = purchase.market
info "All required slots filled, purchase started", requestId = purchase.requestId
let failed = newFuture[void]()
proc callback(_: RequestId) =

View File

@ -1,10 +1,14 @@
import pkg/metrics
import pkg/chronicles
import ../statemachine
import ./errorhandling
import ./started
import ./cancelled
declareCounter(codexPurchasesSubmitted, "codex purchases submitted")
logScope:
topics = "marketplace purchases submitted"
declareCounter(codex_purchases_submitted, "codex purchases submitted")
type PurchaseSubmitted* = ref object of ErrorHandlingState
@ -12,12 +16,14 @@ method `$`*(state: PurchaseSubmitted): string =
"submitted"
method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.} =
codexPurchasesSubmitted.inc()
codex_purchases_submitted.inc()
let purchase = Purchase(machine)
let request = !purchase.request
let market = purchase.market
let clock = purchase.clock
info "Request submitted, waiting for slots to be filled", requestId = purchase.requestId
proc wait {.async.} =
let done = newFuture[void]()
proc callback(_: RequestId) =
@ -27,7 +33,7 @@ method run*(state: PurchaseSubmitted, machine: Machine): Future[?State] {.async.
await subscription.unsubscribe()
proc withTimeout(future: Future[void]) {.async.} =
let expiry = request.expiry.truncate(int64)
let expiry = request.expiry.truncate(int64) + 1
await future.withTimeout(clock, expiry)
try:

View File

@ -7,7 +7,7 @@ import ./cancelled
import ./finished
import ./failed
declareCounter(codexPurchasesUnknown, "codex purchases unknown")
declareCounter(codex_purchases_unknown, "codex purchases unknown")
type PurchaseUnknown* = ref object of ErrorHandlingState
@ -15,7 +15,7 @@ method `$`*(state: PurchaseUnknown): string =
"unknown"
method run*(state: PurchaseUnknown, machine: Machine): Future[?State] {.async.} =
codexPurchasesUnknown.inc()
codex_purchases_unknown.inc()
let purchase = Purchase(machine)
if (request =? await purchase.market.getRequest(purchase.requestId)) and
(requestState =? await purchase.market.requestState(purchase.requestId)):

View File

@ -43,8 +43,8 @@ import ./json
logScope:
topics = "codex restapi"
declareCounter(codexApiUploads, "codex API uploads")
declareCounter(codexApiDownloads, "codex API downloads")
declareCounter(codex_api_uploads, "codex API uploads")
declareCounter(codex_api_downloads, "codex API downloads")
proc validate(
pattern: string,
@ -168,7 +168,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
trace "Sending chunk", size = buff.len
await resp.sendChunk(addr buff[0], buff.len)
await resp.finish()
codexApiDownloads.inc()
codex_api_downloads.inc()
except CatchableError as exc:
trace "Excepting streaming blocks", exc = exc.msg
return RestApiResponse.error(Http500)
@ -243,7 +243,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
trace "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg)
codexApiUploads.inc()
codex_api_uploads.inc()
trace "Uploaded file", cid
return RestApiResponse.response($cid)
except CancelledError:

View File

@ -54,6 +54,11 @@ proc retrieveRequest*(agent: SalesAgent) {.async.} =
if data.request.isNone:
data.request = await market.getRequest(data.requestId)
proc retrieveRequestState*(agent: SalesAgent): Future[?RequestState] {.async.} =
let data = agent.data
let market = agent.context.market
return await market.requestState(data.requestId)
proc subscribeCancellation(agent: SalesAgent) {.async.} =
let data = agent.data
let clock = agent.context.clock
@ -62,8 +67,20 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
without request =? data.request:
return
await clock.waitUntil(request.expiry.truncate(int64))
agent.schedule(cancelledEvent(request))
while true:
let deadline = max(clock.now, request.expiry.truncate(int64)) + 1
trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline
await clock.waitUntil(deadline)
without state =? await agent.retrieveRequestState():
error "Uknown request", requestId = data.requestId
return
if state == RequestState.Cancelled:
agent.schedule(cancelledEvent(request))
break
debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now
data.cancelled = onCancelled()

View File

@ -1,18 +1,38 @@
import pkg/chronicles
import ../salesagent
import ../statemachine
import ./errorhandling
import ./errored
logScope:
topics = "marketplace sales cancelled"
topics = "marketplace sales cancelled"
type
SaleCancelled* = ref object of ErrorHandlingState
SaleCancelledError* = object of CatchableError
SaleTimeoutError* = object of SaleCancelledError
method `$`*(state: SaleCancelled): string = "SaleCancelled"
method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
let error = newException(SaleTimeoutError, "Sale cancelled due to timeout")
return some State(SaleErrored(error: error))
let agent = SalesAgent(machine)
let data = agent.data
let market = agent.context.market
without request =? data.request:
raiseAssert "no sale request"
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
let slot = Slot(request: request, slotIndex: slotIndex)
debug "Collecting collateral and partial payout", requestId = $data.requestId, slotIndex
await market.freeSlot(slot.id)
if onClear =? agent.context.onClear and
request =? data.request and
slotIndex =? data.slotIndex:
onClear(request, slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp()
warn "Sale cancelled due to timeout", requestId = $data.requestId, slotIndex

View File

@ -7,7 +7,7 @@ import ../salesagent
import ../../utils/exceptions
logScope:
topics = "marketplace sales errored"
topics = "marketplace sales errored"
type SaleErrored* = ref object of SaleState
error*: ref CatchableError

View File

@ -1,10 +1,11 @@
import pkg/chronicles
import ../salesagent
import ../statemachine
import ./errorhandling
import ./errored
logScope:
topics = "marketplace sales failed"
topics = "marketplace sales failed"
type
SaleFailed* = ref object of ErrorHandlingState
@ -13,5 +14,18 @@ type
method `$`*(state: SaleFailed): string = "SaleFailed"
method run*(state: SaleFailed, machine: Machine): Future[?State] {.async.} =
let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market
without request =? data.request:
raiseAssert "no sale request"
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
let slot = Slot(request: request, slotIndex: slotIndex)
debug "Removing slot from mySlots", requestId = $data.requestId, slotIndex
await market.freeSlot(slot.id)
let error = newException(SaleFailedError, "Sale failed")
return some State(SaleErrored(error: error))

View File

@ -11,7 +11,7 @@ import ./proving
import ./provingsimulated
logScope:
topics = "marketplace sales filled"
topics = "marketplace sales filled"
type
SaleFilled* = ref object of ErrorHandlingState

View File

@ -8,7 +8,7 @@ import ./cancelled
import ./failed
logScope:
topics = "marketplace sales filling"
topics = "marketplace sales filling"
type
SaleFilling* = ref object of ErrorHandlingState

View File

@ -7,7 +7,7 @@ import ./cancelled
import ./failed
logScope:
topics = "marketplace sales finished"
topics = "marketplace sales finished"
type
SaleFinished* = ref object of ErrorHandlingState

View File

@ -5,7 +5,7 @@ import ../salesagent
import ./errorhandling
logScope:
topics = "marketplace sales ignored"
topics = "marketplace sales ignored"
type
SaleIgnored* = ref object of ErrorHandlingState

View File

@ -7,7 +7,7 @@ import ./cancelled
import ./failed
logScope:
topics = "marketplace sales initial-proving"
topics = "marketplace sales initial-proving"
type
SaleInitialProving* = ref object of ErrorHandlingState

View File

@ -8,7 +8,7 @@ import ./failed
import ./finished
logScope:
topics = "marketplace sales payout"
topics = "marketplace sales payout"
type
SalePayout* = ref object of ErrorHandlingState

View File

@ -16,7 +16,7 @@ type
SalePreparing* = ref object of ErrorHandlingState
logScope:
topics = "marketplace sales preparing"
topics = "marketplace sales preparing"
method `$`*(state: SalePreparing): string = "SalePreparing"

View File

@ -11,7 +11,7 @@ import ./errored
import ./payout
logScope:
topics = "marketplace sales proving"
topics = "marketplace sales proving"
type
SlotNotFilledError* = object of CatchableError

View File

@ -9,7 +9,7 @@ import ./cancelled
import ./payout
logScope:
topics = "marketplace sales unknown"
topics = "marketplace sales unknown"
type
SaleUnknown* = ref object of SaleState
@ -54,3 +54,5 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
return some State(SaleFinished())
of SlotState.Failed:
return some State(SaleFailed())
of SlotState.Cancelled:
return some State(SaleCancelled())

View File

@ -1,4 +0,0 @@
import ./por/serialization
import ./por/por
export por, serialization

View File

@ -1,482 +0,0 @@
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
# Implementation of the BLS-based public PoS scheme from
# Shacham H., Waters B., "Compact Proofs of Retrievability"
# using pairing over BLS12-381 ECC
#
# Notation from the paper
# In Z:
# - n: number of blocks
# - s: number of sectors per block
#
# In Z_p: modulo curve order
# - m_{ij}: sectors of the file i:0..n-1 j:0..s-1
# - α: PoS secret key
# - name: random string
# - μ_j: part of proof, j:0..s-1
#
# In G_1: multiplicative cyclic group
# - H: {0,1} →G_1 : hash function
# - u_1,…,u_s ←R G_1 : random coefficients
# - σ_i: authenticators
# - σ: part of proof
#
# In G_2: multiplicative cyclic group
# - g: generator of G_2
# - v ← g^α: PoS public key
#
# In G_T:
# - used only to calculate the two pairings during validation
#
# Implementation:
# Our implementation uses additive cyclic groups instead of the multiplicative
# cyclic group in the paper, thus changing the name of the group operation as in
# blscurve and blst. Thus, point multiplication becomes point addition, and scalar
# exponentiation becomes scalar multiplicaiton.
#
# Number of operations:
# The following table summarizes the number of operations in different phases
# using the following notation:
# - f: file size expressed in units of 31 bytes
# - n: number of blocks
# - s: number of sectors per block
# - q: number of query items
#
# Since f = n * s and s is a parameter of the scheme, it is better to express
# the cost as a function of f and s. This only matters for Setup, all other
# phases are independent of the file size assuming a given q.
#
# | | Setup | Challenge | Proof | Verify |
# |----------------|-----------|---------------|-----------|-----------|-----------|
# | G1 random | s = s | q | | |
# | G1 scalar mult | n * (s+1) = f * (1 + 1/s) | | q | q + s |
# | G1 add | n * s = f | | q-1 | q-1 + s-1 |
# | Hash to G1 | n = f / s | | | q |
# | Z_p mult | = | | s * q | |
# | Z_p add | = | | s * (q-1) | |
# | pairing | = | | | 2 |
#
#
# Storage and communication cost:
# The storage overhead for a file of f_b bytes is given by the n authenticators
# calculated in the setup phase.
# f_b = f * 31 = n * s * 31
# Each authenticator is a point on G_1, which occupies 48 bytes in compressed form.
# Thus, the overall sorage size in bytes is:
# f_pos = fb + n * 48 = fb * (1 + (48/31) * (1/s))
#
# Communicaiton cost in the Setup phase is simply related to the storage cost.
# The size of the challenge is
# q * (8 + 48) bytes
# The size of the proof is instead
# s * 32 + 48 bytes
import std/endians
import pkg/chronos
import pkg/blscurve
import pkg/blscurve/blst/blst_abi
import ../../rng
import ../../streams
# sector size in bytes. Must be smaller than the subgroup order r
# which is 255 bits long for BLS12-381
const
BytesPerSector* = 31
# length in bytes of the unique (random) name
Namelen = 512
type
# a single sector
ZChar* = array[BytesPerSector, byte]
# secret key combining the metadata signing key and the POR generation key
SecretKey* = object
signkey*: blscurve.SecretKey
key*: blst_scalar
# public key combining the metadata signing key and the POR validation key
PublicKey* = object
signkey*: blscurve.PublicKey
key*: blst_p2
# POR metadata (called "file tag t_0" in the original paper)
TauZero* = object
name*: array[Namelen, byte]
n*: int64
u*: seq[blst_p1]
# signed POR metadata (called "signed file tag t" in the original paper)
Tau* = object
t*: TauZero
signature*: array[96, byte]
Proof* = object
mu*: seq[blst_scalar]
sigma*: blst_p1
# PoR query element
QElement* = object
i*: int64
v*: blst_scalar
PoR* = object
ssk*: SecretKey
spk*: PublicKey
tau*: Tau
authenticators*: seq[blst_p1]
proc fromBytesBE(a: openArray[byte]): blst_scalar =
## Convert data to blst native form
##
var b: array[32, byte]
doAssert(a.len <= b.len)
let d = b.len - a.len
for i in 0..<a.len:
b[i+d] = a[i]
blst_scalar_from_bendian(result, b)
doAssert(blst_scalar_fr_check(result).bool)
proc getSector(
stream: SeekableStream,
blockId: int64,
sectorId: int64,
spb: int64): Future[ZChar] {.async.} =
## Read file sector at given <blockid, sectorid> postion
##
var res: ZChar
stream.setPos(((blockId * spb + sectorId) * ZChar.len).int)
discard await stream.readOnce(addr res[0], ZChar.len)
return res
proc rndScalar(): blst_scalar =
## Generate random scalar within the subroup order r
##
var scal : array[32, byte]
var scalar : blst_scalar
while true:
for val in scal.mitems:
val = byte Rng.instance.rand(0xFF)
scalar.blst_scalar_from_bendian(scal)
if blst_scalar_fr_check(scalar).bool:
break
return scalar
proc rndP2(): (blst_p2, blst_scalar) =
## Generate random point on G2
##
var
x : blst_p2
x.blst_p2_from_affine(BLS12_381_G2) # init from generator
let
scalar = rndScalar()
x.blst_p2_mult(x, scalar, 255)
return (x, scalar)
proc rndP1(): (blst_p1, blst_scalar) =
## Generate random point on G1
var
x : blst_p1
x.blst_p1_from_affine(BLS12_381_G1) # init from generator
let
scalar = rndScalar()
x.blst_p1_mult(x, scalar, 255)
return (x, scalar)
template posKeygen(): (blst_p2, blst_scalar) =
## Generate POS key pair
##
rndP2()
proc keyGen*(): (PublicKey, SecretKey) =
## Generate key pair for signing metadata and for POS tags
##
var
pk: PublicKey
sk: SecretKey
ikm: array[32, byte]
for b in ikm.mitems:
b = byte Rng.instance.rand(0xFF)
doAssert ikm.keyGen(pk.signkey, sk.signkey)
(pk.key, sk.key) = posKeygen()
return (pk, sk)
proc sectorsCount(stream: SeekableStream, s: int64): int64 =
## Calculate number of blocks for a file
##
let
size = stream.size()
n = ((size - 1) div (s * sizeof(ZChar))) + 1
# debugEcho "File size=", size, " bytes",
# ", blocks=", n,
# ", sectors/block=", $s,
# ", sectorsize=", $sizeof(ZChar), " bytes"
return n
proc hashToG1[T: byte|char](msg: openArray[T]): blst_p1 =
## Hash to curve with Dagger specific domain separation
##
const dst = "DAGGER-PROOF-OF-CONCEPT"
result.blst_hash_to_g1(msg, dst, aug = "")
proc hashNameI(name: array[Namelen, byte], i: int64): blst_p1 =
## Calculate unique filename and block index based hash
##
# # naive implementation, hashing a long string representation
# # such as "[255, 242, 23]1"
# return hashToG1($name & $i)
# more compact and faster implementation
var namei: array[sizeof(name) + sizeof(int64), byte]
namei[0..sizeof(name)-1] = name
bigEndian64(addr(namei[sizeof(name)]), unsafeAddr(i))
return hashToG1(namei)
proc generateAuthenticatorOpt(
stream: SeekableStream,
ssk: SecretKey,
i: int64,
s: int64,
t: TauZero,
ubase: seq[blst_scalar]): Future[blst_p1] {.async.} =
## Optimized implementation of authenticator generation
## This implementation is reduces the number of scalar multiplications
## from s+1 to 1+1 , using knowledge about the scalars (r_j)
## used to generate u_j as u_j = g^{r_j}
##
## With the paper's multiplicative notation, we use:
## (H(file||i)\cdot g^{\sum{j=0}^{s-1}{r_j \cdot m[i][j]}})^{\alpha}
##
var sum: blst_fr
var sums: blst_scalar
for j in 0..<s:
var a, b, x: blst_fr
a.blst_fr_from_scalar(ubase[j])
b.blst_fr_from_scalar(fromBytesBE((await stream.getSector(i, j, s))))
x.blst_fr_mul(a, b)
sum.blst_fr_add(sum, x)
sums.blst_scalar_from_fr(sum)
result.blst_p1_from_affine(BLS12_381_G1)
result.blst_p1_mult(result, sums, 255)
result.blst_p1_add_or_double(result, hashNameI(t.name, i))
result.blst_p1_mult(result, ssk.key, 255)
proc generateAuthenticator(
stream: SeekableStream,
ssk: SecretKey,
i: int64,
s: int64,
t: TauZero,
ubase: seq[blst_scalar]): Future[blst_p1] =
## Wrapper to select tag generator implementation
##
# let a = generateAuthenticatorNaive(i, s, t, f, ssk)
return generateAuthenticatorOpt(stream, ssk, i, s, t, ubase)
# doAssert(a.blst_p1_is_equal(b).bool)
proc generateQuery*(tau: Tau, l: int): seq[QElement] =
## Generata a random BLS query of given size
##
let n = tau.t.n # number of blocks
for i in 0..<l:
var q: QElement
q.i = Rng.instance.rand(n-1) #TODO: dedup
q.v = rndScalar() #TODO: fix range
result.add(q)
proc generateProof*(
stream: SeekableStream,
q: seq[QElement],
authenticators: seq[blst_p1],
s: int64
): Future[Proof] {.async.} =
## Generata BLS proofs for a given query
##
var
mu: seq[blst_scalar]
for j in 0..<s:
var
muj: blst_fr
for qelem in q:
let
sect = fromBytesBE((await stream.getSector(qelem.i, j, s)))
var
x, v, sector: blst_fr
sector.blst_fr_from_scalar(sect)
v.blst_fr_from_scalar(qelem.v)
x.blst_fr_mul(v, sector)
muj.blst_fr_add(muj, x)
var
mujs: blst_scalar
mujs.blst_scalar_from_fr(muj)
mu.add(mujs)
var
sigma: blst_p1
for qelem in q:
var
prod: blst_p1
prod.blst_p1_mult(authenticators[qelem.i], qelem.v, 255)
sigma.blst_p1_add_or_double(sigma, prod)
return Proof(mu: mu, sigma: sigma)
proc pairing(a: blst_p1, b: blst_p2): blst_fp12 =
## Calculate pairing G_1,G_2 -> G_T
##
var
aa: blst_p1_affine
bb: blst_p2_affine
l: blst_fp12
blst_p1_to_affine(aa, a)
blst_p2_to_affine(bb, b)
blst_miller_loop(l, bb, aa)
blst_final_exp(result, l)
proc verifyPairingsNaive(a1: blst_p1, a2: blst_p2, b1: blst_p1, b2: blst_p2) : bool =
let e1 = pairing(a1, a2)
let e2 = pairing(b1, b2)
return e1 == e2
proc verifyPairings(a1: blst_p1, a2: blst_p2, b1: blst_p1, b2: blst_p2) : bool =
## Wrapper to select verify pairings implementation
##
verifyPairingsNaive(a1, a2, b1, b2)
#verifyPairingsNeg(a1, a2, b1, b2)
proc verifyProof*(
self: PoR,
q: seq[QElement],
mus: seq[blst_scalar],
sigma: blst_p1): bool =
## Verify a BLS proof given a query
##
# verify signature on Tau
var signature: blscurve.Signature
if not signature.fromBytes(self.tau.signature):
return false
if not verify(self.spk.signkey, $self.tau.t, signature):
return false
var first: blst_p1
for qelem in q:
var prod: blst_p1
prod.blst_p1_mult(hashNameI(self.tau.t.name, qelem.i), qelem.v, 255)
first.blst_p1_add_or_double(first, prod)
doAssert(blst_p1_on_curve(first).bool)
let us = self.tau.t.u
var second: blst_p1
for j in 0..<len(us):
var prod: blst_p1
prod.blst_p1_mult(us[j], mus[j], 255)
second.blst_p1_add_or_double(second, prod)
doAssert(blst_p1_on_curve(second).bool)
var sum: blst_p1
sum.blst_p1_add_or_double(first, second)
var g : blst_p2
g.blst_p2_from_affine(BLS12_381_G2)
return verifyPairings(sum, self.spk.key, sigma, g)
proc init*(
T: type PoR,
stream: SeekableStream,
ssk: SecretKey,
spk: PublicKey,
blockSize: int64
): Future[PoR] {.async.} =
## Set up the POR scheme by generating tags and metadata
##
doAssert(
(blockSize mod BytesPerSector) == 0,
"Block size should be divisible by `BytesPerSector`")
let
s = blockSize div BytesPerSector
n = stream.sectorsCount(s)
# generate a random name
var t = TauZero(n: n)
for i in 0..<Namelen:
t.name[i] = byte Rng.instance.rand(0xFF)
# generate the coefficient vector for combining sectors of a block: U
var ubase: seq[blst_scalar]
for i in 0..<s:
let (u, ub) = rndP1()
t.u.add(u)
ubase.add(ub)
#TODO: a better bytearray conversion of TauZero for the signature might be needed
# the current conversion using $t might be architecture dependent and not unique
let
signature = sign(ssk.signkey, $t)
tau = Tau(t: t, signature: signature.exportRaw())
# generate sigmas
var
sigmas: seq[blst_p1]
for i in 0..<n:
sigmas.add((await stream.generateAuthenticator(ssk, i, s, t, ubase)))
return PoR(
ssk: ssk,
spk: spk,
tau: tau,
authenticators: sigmas)

View File

@ -1,3 +0,0 @@
import ./serialization/serialization
export serialization

View File

@ -1,185 +0,0 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/questionable/results
import pkg/libp2p/protobuf/minprotobuf
type
TauZeroMessage* = object
name*: seq[byte]
n*: int64
u*: seq[seq[byte]]
TauMessage* = object
t*: TauZeroMessage
signature*: seq[byte]
PubKeyMessage* = object
signkey*: seq[byte]
key*: seq[byte]
PorMessage* = object
tau*: TauMessage
spk*: PubKeyMessage
authenticators*: seq[seq[byte]]
ProofMessage* = object
mu*: seq[seq[byte]]
sigma*: seq[byte]
PoREnvelope* = object
por*: PorMessage
proof*: ProofMessage
func write*(pb: var ProtoBuffer, field: int, value: TauZeroMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.name)
ipb.write(2, value.n.uint64)
for u in value.u:
ipb.write(3, u)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: TauMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.t)
ipb.write(2, value.signature)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: PubKeyMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.signkey)
ipb.write(2, value.key)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: PorMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.tau)
ipb.write(2, value.spk)
for a in value.authenticators:
ipb.write(3, a)
ipb.finish()
pb.write(field, ipb)
func encode*(msg: PorMessage): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, msg.tau)
ipb.write(2, msg.spk)
for a in msg.authenticators:
ipb.write(3, a)
ipb.finish
ipb.buffer
func write*(pb: var ProtoBuffer, field: int, value: ProofMessage) =
var ipb = initProtoBuffer()
for mu in value.mu:
ipb.write(1, mu)
ipb.write(2, value.sigma)
ipb.finish()
pb.write(field, ipb)
func encode*(message: PoREnvelope): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, message.por)
ipb.write(2, message.proof)
ipb.finish
ipb.buffer
proc decode*(_: type TauZeroMessage, pb: ProtoBuffer): ProtoResult[TauZeroMessage] =
var
value = TauZeroMessage()
discard ? pb.getField(1, value.name)
var val: uint64
discard ? pb.getField(2, val)
value.n = val.int64
var bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(3, bytes)
for b in bytes:
value.u.add(b)
ok(value)
proc decode*(_: type TauMessage, pb: ProtoBuffer): ProtoResult[TauMessage] =
var
value = TauMessage()
ipb: ProtoBuffer
discard ? pb.getField(1, ipb)
value.t = ? TauZeroMessage.decode(ipb)
discard ? pb.getField(2, value.signature)
ok(value)
proc decode*(_: type PubKeyMessage, pb: ProtoBuffer): ProtoResult[PubKeyMessage] =
var
value = PubKeyMessage()
discard ? pb.getField(1, value.signkey)
discard ? pb.getField(2, value.key)
ok(value)
proc decode*(_: type PorMessage, pb: ProtoBuffer): ProtoResult[PorMessage] =
var
value = PorMessage()
ipb: ProtoBuffer
discard ? pb.getField(1, ipb)
value.tau = ? TauMessage.decode(ipb)
discard ? pb.getField(2, ipb)
value.spk = ? PubKeyMessage.decode(ipb)
var
bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(3, bytes)
for b in bytes:
value.authenticators.add(b)
ok(value)
proc decode*(_: type PorMessage, msg: seq[byte]): ProtoResult[PorMessage] =
PorMessage.decode(initProtoBuffer(msg))
proc decode*(_: type ProofMessage, pb: ProtoBuffer): ProtoResult[ProofMessage] =
var
value = ProofMessage()
discard ? pb.getField(1, value.mu)
discard ? pb.getField(2, value.sigma)
ok(value)
func decode*(_: type PoREnvelope, msg: openArray[byte]): ?!PoREnvelope =
var
value = PoREnvelope()
pb = initProtoBuffer(msg)
discard ? pb.getField(1, ? value.por.decode)
discard ? pb.getField(2, ? value.proof.decode)
ok(value)

View File

@ -1,166 +0,0 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/stew/results
import pkg/stew/objects
import pkg/blscurve
import pkg/blscurve/blst/blst_abi
import ./messages
export messages
import ../por
func toMessage*(self: Proof): ProofMessage =
var
message = ProofMessage()
sigma: array[96, byte]
for mu in self.mu:
var
serialized: array[32, byte]
blst_bendian_from_scalar(serialized, mu)
message.mu.add(toSeq(serialized))
blst_p1_serialize(sigma, self.sigma)
message.sigma = toSeq(sigma)
message
func fromMessage*(self: ProofMessage): Result[Proof, string] =
var
proof = Proof()
sigmaAffine: blst_p1_affine
if blst_p1_deserialize(sigmaAffine, toArray(96, self.sigma)) != BLST_SUCCESS:
return err("Unable to decompress sigma")
blst_p1_from_affine(proof.sigma, sigmaAffine)
for mu in self.mu:
var
muScalar: blst_scalar
blst_scalar_from_bendian(muScalar, toArray(32, mu))
proof.mu.add(muScalar)
ok(proof)
func toMessage*(self: TauZero): TauZeroMessage =
var
message = TauZeroMessage(
name: toSeq(self.name),
n: self.n)
for u in self.u:
var
serialized: array[96, byte]
# serialized and compresses the points
blst_p1_serialize(serialized, u)
message.u.add(toSeq(serialized))
message
func fromMessage*(self: TauZeroMessage): Result[TauZero, string] =
var
tauZero: TauZero
tauZero.name = toArray(512, self.name)
tauZero.n = self.n
for u in self.u:
var
uuAffine: blst_p1_affine
uu: blst_p1
if blst_p1_deserialize(uuAffine, toArray(96, u)) != BLST_SUCCESS:
return err("Unable to decompress u")
blst_p1_from_affine(uu, uuAffine)
tauZero.u.add(uu)
ok(tauZero)
func toMessage*(self: Tau): TauMessage =
TauMessage(
t: self.t.toMessage(),
signature: toSeq(self.signature)) # signature is already in serialized form
func fromMessage*(self: TauMessage): Result[Tau, string] =
var
message = Tau(
t: ? self.t.fromMessage(),
signature: toArray(96, self.signature))
ok(message)
func toMessage*(self: por.PublicKey): PubKeyMessage =
var
signkey = toSeq(self.signkey.exportUncompressed())
message = PubKeyMessage(signkey: signkey)
key: array[192, byte]
blst_p2_serialize(key, self.key)
message.key = toSeq(key)
message
func fromMessage*(self: PubKeyMessage): Result[por.PublicKey, string] =
var
spk: por.PublicKey
keyAffine: blst_p2_affine
if not spk.signkey.fromBytes(self.signkey.toOpenArray(0, 95)):
return err("Unable to deserialize public key!")
if blst_p2_deserialize(keyAffine, toArray(192, self.key)) != BLST_SUCCESS:
return err("Unable to decompress key!")
blst_p2_from_affine(spk.key, keyAffine)
ok(spk)
func toMessage*(self: PoR): PorMessage =
var
message = PorMessage(
tau: self.tau.toMessage(),
spk: self.spk.toMessage())
for sigma in self.authenticators:
var
serialized: array[96, byte]
blst_p1_serialize(serialized, sigma)
message.authenticators.add(toSeq(serialized))
message
func fromMessage*(self: PorMessage): Result[PoR, string] =
var
por = PoR(
tau: ? self.tau.fromMessage(),
spk: ? self.spk.fromMessage())
for sigma in self.authenticators:
var
sigmaAffine: blst_p1_affine
authenticator: blst_p1
if blst_p1_deserialize(sigmaAffine, toArray(96, sigma)) != BLST_SUCCESS:
return err("Unable to decompress sigma")
blst_p1_from_affine(authenticator, sigmaAffine)
por.authenticators.add(authenticator)
return ok(por)

View File

@ -1,100 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/contractabi/address as ca
import ../stores
import ../manifest
import ../streams
import ../utils
import ./por
import ./stpnetwork
import ./stpproto
import ./stpstore
export stpnetwork, stpstore, por, stpproto
type
StorageProofs* = object
store*: BlockStore
network*: StpNetwork
stpStore*: StpStore
proc upload*(
self: StorageProofs,
cid: Cid,
indexes: seq[int],
host: ca.Address
): Future[?!void] {.async.} =
## Upload authenticators
##
without por =? (await self.stpStore.retrieve(cid)):
trace "Unable to retrieve por data from store", cid
return failure("Unable to retrieve por data from store")
return await self.network.uploadTags(
cid,
indexes,
por.authenticators,
host)
# proc proof*() =
# discard
# proc verify*() =
# discard
proc setupProofs*(
self: StorageProofs,
manifest: Manifest
): Future[?!void] {.async.} =
## Setup storage authentication
##
without cid =? manifest.cid:
return failure("Unable to retrieve Cid from manifest!")
let
(spk, ssk) = keyGen()
por = await PoR.init(
StoreStream.new(self.store, manifest),
ssk,
spk,
manifest.blockSize)
return await self.stpStore.store(por.toMessage(), cid)
proc init*(
T: type StorageProofs,
network: StpNetwork,
store: BlockStore,
stpStore: StpStore
): StorageProofs =
var
self = T(
store: store,
stpStore: stpStore,
network: network)
proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} =
try:
await self.stpStore.store(msg.cid, msg.tags).tryGet()
trace "Stored tags", cid = $msg.cid, tags = msg.tags.len
except CatchableError as exc:
trace "Exception attempting to store tags", exc = exc.msg
self.network.tagsHandler = tagsHandler
self

View File

@ -1,106 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/chronicles
import pkg/questionable/results
import pkg/contractabi/address as ca
import ./stpproto
import ../discovery
import ../formats
const
Codec* = "/dagger/storageproofs/1.0.0"
MaxMessageSize* = 1 shl 22 # 4MB
logScope:
topics = "dagger storageproofs network"
type
TagsHandler* = proc(msg: TagsMessage):
Future[void] {.raises: [Defect], gcsafe.}
StpNetwork* = ref object of LPProtocol
switch*: Switch
discovery*: Discovery
tagsHandle*: TagsHandler
proc uploadTags*(
self: StpNetwork,
cid: Cid,
indexes: seq[int],
tags: seq[seq[byte]],
host: ca.Address
): Future[?!void] {.async.} =
# Upload tags to `host`
#
var msg = TagsMessage(cid: cid.data.buffer)
for i in indexes:
msg.tags.add(Tag(idx: i, tag: tags[i]))
let
peers = await self.discovery.find(host)
connFut = await one(peers.mapIt(
self.switch.dial(
it.data.peerId,
it.data.addresses.mapIt( it.address ),
@[Codec])))
conn = await connFut
try:
await conn.writeLp(msg.encode)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Exception submitting tags", cid, exc = exc.msg
return failure(exc.msg)
finally:
await conn.close()
return success()
method init*(self: StpNetwork) =
## Perform protocol initialization
##
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let
msg = await conn.readLp(MaxMessageSize)
res = TagsMessage.decode(msg)
if not self.tagsHandle.isNil:
if res.isOk and res.get.tags.len > 0:
await self.tagsHandle(res.get)
except CatchableError as exc:
trace "Exception handling Storage Proofs message", exc = exc.msg
finally:
await conn.close()
self.handler = handle
self.codec = Codec
proc new*(
T: type StpNetwork,
switch: Switch,
discovery: Discovery
): StpNetwork =
## create a new StpNetwork instance
let
self = StpNetwork(
switch: switch,
discovery: discovery)
self.init()
self

View File

@ -1,3 +0,0 @@
import ./stpproto/messages
export messages

View File

@ -1,66 +0,0 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/questionable/results
import pkg/libp2p/protobuf/minprotobuf
type
Tag* = object
idx*: int64
tag*: seq[byte]
TagsMessage* = object
cid*: seq[byte]
tags*: seq[Tag]
func write*(pb: var ProtoBuffer, field: int, value: Tag) =
var ipb = initProtoBuffer()
ipb.write(1, value.idx.uint64)
ipb.write(2, value.tag)
ipb.finish()
pb.write(field, ipb)
func encode*(msg: TagsMessage): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, msg.cid)
for tag in msg.tags:
ipb.write(2, tag)
ipb.finish()
ipb.buffer
func decode*(_: type Tag, pb: ProtoBuffer): ProtoResult[Tag] =
var
value = Tag()
idx: uint64
discard ? pb.getField(1, idx)
value.idx = idx.int64
discard ? pb.getField(2, value.tag)
ok(value)
func decode*(_: type TagsMessage, msg: openArray[byte]): ProtoResult[TagsMessage] =
var
value = TagsMessage()
pb = initProtoBuffer(msg)
discard ? pb.getField(1, value.cid)
var
bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(2, bytes)
for b in bytes:
value.tags.add(? Tag.decode(initProtoBuffer(b)))
ok(value)

View File

@ -1,130 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/os
import std/strformat
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
import pkg/stew/io2
import pkg/questionable
import pkg/questionable/results
import ../errors
import ../formats
import ./stpproto
import ./por
type
StpStore* = object
authDir*: string
postfixLen*: int
template stpPath*(self: StpStore, cid: Cid): string =
self.authDir / ($cid)[^self.postfixLen..^1] / $cid
proc retrieve*(
self: StpStore,
cid: Cid
): Future[?!PorMessage] {.async.} =
## Retrieve authenticators from data store
##
let path = self.stpPath(cid) / "por"
var data: seq[byte]
if (
let res = io2.readFile(path, data);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Cannot retrieve storage proof data from fs", path , error
return failure("Cannot retrieve storage proof data from fs")
return PorMessage.decode(data).mapFailure(CatchableError)
proc store*(
self: StpStore,
por: PorMessage,
cid: Cid
): Future[?!void] {.async.} =
## Persist storage proofs
##
let
dir = self.stpPath(cid)
if io2.createPath(dir).isErr:
trace "Unable to create storage proofs prefix dir", dir
return failure(&"Unable to create storage proofs prefix dir ${dir}")
let path = dir / "por"
if (
let res = io2.writeFile(path, por.encode());
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Unable to store storage proofs", path, cid, error
return failure(
&"Unable to store storage proofs - path = ${path} cid = ${cid} error = ${error}")
return success()
proc retrieve*(
self: StpStore,
cid: Cid,
blocks: seq[int]
): Future[?!seq[Tag]] {.async.} =
var tags: seq[Tag]
for b in blocks:
var tag = Tag(idx: b)
let path = self.stpPath(cid) / $b
if (
let res = io2.readFile(path, tag.tag);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Cannot retrieve tags from fs", path , error
return failure("Cannot retrieve tags from fs")
tags.add(tag)
return tags.success
proc store*(
self: StpStore,
tags: seq[Tag],
cid: Cid
): Future[?!void] {.async.} =
let
dir = self.stpPath(cid)
if io2.createPath(dir).isErr:
trace "Unable to create storage proofs prefix dir", dir
return failure(&"Unable to create storage proofs prefix dir ${dir}")
for t in tags:
let path = dir / $t.idx
if (
let res = io2.writeFile(path, t.tag);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Unable to store tags", path, cid, error
return failure(
&"Unable to store tags - path = ${path} cid = ${cid} error = ${error}")
return success()
proc init*(
T: type StpStore,
authDir: string,
postfixLen: int = 2
): StpStore =
## Init StpStore
##
StpStore(
authDir: authDir,
postfixLen: postfixLen)

View File

@ -35,9 +35,9 @@ export blocktype, cid
logScope:
topics = "codex repostore"
declareGauge(codexRepostoreBlocks, "codex repostore blocks")
declareGauge(codexRepostoreBytesUsed, "codex repostore bytes used")
declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved")
declareGauge(codex_repostore_blocks, "codex repostore blocks")
declareGauge(codex_repostore_bytes_used, "codex repostore bytes used")
declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved")
const
DefaultBlockTtl* = 24.hours
@ -64,9 +64,9 @@ type
expiration*: SecondsSince1970
proc updateMetrics(self: RepoStore) =
codexRepostoreBlocks.set(self.totalBlocks.int64)
codexRepostoreBytesUsed.set(self.quotaUsedBytes.int64)
codexRepostoreBytesReserved.set(self.quotaReservedBytes.int64)
codex_repostore_blocks.set(self.totalBlocks.int64)
codex_repostore_bytes_used.set(self.quotaUsedBytes.int64)
codex_repostore_bytes_reserved.set(self.quotaReservedBytes.int64)
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)

View File

@ -296,11 +296,14 @@ asyncchecksuite "Sales":
let blk = bt.Block.new( @[1.byte] ).get
onBatch(@[ blk ])
return success()
let sold = newFuture[void]()
sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) =
sold.complete()
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
await sleepAsync(2.millis) # allow proving to start
await sold # allow proving to start
# complete request
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
@ -450,10 +453,35 @@ asyncchecksuite "Sales":
return success()
createAvailability()
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
market.requestState[request.id]=RequestState.Cancelled
clock.set(request.expiry.truncate(int64)+1)
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().size == origSize
test "verifies that request is indeed expired from onchain before firing onCancelled":
let origSize = availability.size
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
createAvailability()
await market.requestStorage(request)
market.requestState[request.id]=RequestState.New # "On-chain" is the request still ongoing even after local expiration
# If we would not await, then the `clock.set` would run "too fast" as the `subscribeCancellation()`
# would otherwise not set the timeout early enough as it uses `clock.now` in the deadline calculation.
await sleepAsync(chronos.milliseconds(100))
clock.set(request.expiry.truncate(int64)+1)
check getAvailability().size == 0
market.requestState[request.id]=RequestState.Cancelled # Now "on-chain" is also expired
check eventually getAvailability().size == origSize
test "loads active slots from market":
let me = await market.getSigner()

View File

@ -1,112 +0,0 @@
import std/sequtils
import pkg/asynctest
import pkg/chronos
import pkg/libp2p/errors
import pkg/contractabi as ca
import pkg/codex/rng
import pkg/codex/chunker
import pkg/codex/storageproofs
import pkg/codex/discovery
import pkg/codex/manifest
import pkg/codex/merkletree
import pkg/codex/stores
import pkg/codex/storageproofs as st
import pkg/codex/blocktype as bt
import pkg/codex/streams
import ../examples
import ../helpers
const
BlockSize = 31'nb * 64
DataSetSize = BlockSize * 100
CacheSize = DataSetSize * 2
asyncchecksuite "Storage Proofs Network":
let
hostAddr = ca.Address.example
blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random
var
stpNetwork1: StpNetwork
stpNetwork2: StpNetwork
switch1: Switch
switch2: Switch
discovery1: MockDiscovery
discovery2: MockDiscovery
chunker: RandomChunker
manifest: Manifest
store: BlockStore
ssk: st.SecretKey
spk: st.PublicKey
porMsg: PorMessage
cid: Cid
porStream: StoreStream
por: PoR
tags: seq[Tag]
setup:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = CacheSize, chunkSize = BlockSize)
(spk, ssk) = st.keyGen()
manifest = await storeDataGetManifest(store, chunker)
cid = manifest.cid.tryGet()
porStream = StoreStream.new(store, manifest)
por = await PoR.init(
porStream,
ssk, spk,
BlockSize.int)
porMsg = por.toMessage()
tags = blocks.mapIt(
Tag(idx: it, tag: porMsg.authenticators[it]))
switch1 = newStandardSwitch()
switch2 = newStandardSwitch()
discovery1 = MockDiscovery.new()
discovery2 = MockDiscovery.new()
stpNetwork1 = StpNetwork.new(switch1, discovery1)
stpNetwork2 = StpNetwork.new(switch2, discovery2)
switch1.mount(stpNetwork1)
switch2.mount(stpNetwork2)
await switch1.start()
await switch2.start()
teardown:
await switch1.stop()
await switch2.stop()
await close(porStream)
test "Should upload to host":
var
done = newFuture[void]()
discovery1.findHostProvidersHandler = proc(d: MockDiscovery, host: ca.Address):
Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
check hostAddr == host
return @[switch2.peerInfo.signedPeerRecord]
proc tagsHandler(msg: TagsMessage) {.async, gcsafe.} =
check:
Cid.init(msg.cid).tryGet() == cid
msg.tags == tags
done.complete()
stpNetwork2.tagsHandle = tagsHandler
(await stpNetwork1.uploadTags(
cid,
blocks,
porMsg.authenticators,
hostAddr)).tryGet()
await done.wait(1.seconds)

View File

@ -1,162 +0,0 @@
import pkg/chronos
import pkg/asynctest
import pkg/blscurve/blst/blst_abi
import pkg/codex/streams
import pkg/codex/storageproofs as st
import pkg/codex/stores
import pkg/codex/manifest
import pkg/codex/merkletree
import pkg/codex/chunker
import pkg/codex/rng
import pkg/codex/blocktype as bt
import ../helpers
const
BlockSize = 31'nb * 4
SectorSize = 31'nb
SectorsPerBlock = BlockSize div SectorSize
DataSetSize = BlockSize * 100
CacheSize = DataSetSize * 2
asyncchecksuite "BLS PoR":
var
chunker: RandomChunker
manifest: Manifest
store: BlockStore
ssk: st.SecretKey
spk: st.PublicKey
porStream: SeekableStream
proofStream: SeekableStream
setup:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = CacheSize, chunkSize = BlockSize)
(spk, ssk) = st.keyGen()
manifest = await storeDataGetManifest(store, chunker)
porStream = StoreStream.new(store, manifest)
proofStream = StoreStream.new(store, manifest)
teardown:
await close(porStream)
await close(proofStream)
proc createPor(): Future[PoR] =
return PoR.init(
porStream,
ssk,
spk,
BlockSize.int)
proc createProof(por: PoR, q: seq[QElement]): Future[Proof] =
return generateProof(
proofStream,
q,
por.authenticators,
SectorsPerBlock)
test "Test PoR without corruption":
let
por = await createPor()
q = generateQuery(por.tau, 22)
proof = await createProof(por, q)
check por.verifyProof(q, proof.mu, proof.sigma)
test "Test PoR with corruption - query: 22, corrupted blocks: 300, bytes: 10":
let
por = await createPor()
pos = await store.corruptBlocks(manifest, 30, 10)
q = generateQuery(por.tau, 22)
proof = await createProof(por, q)
check pos.len == 30
check not por.verifyProof(q, proof.mu, proof.sigma)
asyncchecksuite "Test Serialization":
var
chunker: RandomChunker
manifest: Manifest
store: BlockStore
ssk: st.SecretKey
spk: st.PublicKey
por: PoR
q: seq[QElement]
proof: Proof
porStream: SeekableStream
proofStream: SeekableStream
setup:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = CacheSize, chunkSize = BlockSize)
manifest = await storeDataGetManifest(store, chunker)
(spk, ssk) = st.keyGen()
porStream = StoreStream.new(store, manifest)
por = await PoR.init(
porStream,
ssk,
spk,
BlockSize.int)
q = generateQuery(por.tau, 22)
proofStream = StoreStream.new(store, manifest)
proof = await generateProof(
proofStream,
q,
por.authenticators,
SectorsPerBlock)
teardown:
await close(porStream)
await close(proofStream)
test "Serialize Public Key":
var
spkMessage = spk.toMessage()
check:
spk.signkey == spkMessage.fromMessage().tryGet().signkey
spk.key.blst_p2_is_equal(spkMessage.fromMessage().tryGet().key).bool
test "Serialize TauZero":
var
tauZeroMessage = por.tau.t.toMessage()
tauZero = tauZeroMessage.fromMessage().tryGet()
check:
por.tau.t.name == tauZero.name
por.tau.t.n == tauZero.n
for i in 0..<por.tau.t.u.len:
check blst_p1_is_equal(por.tau.t.u[i], tauZero.u[i]).bool
test "Serialize Tau":
var
tauMessage = por.tau.toMessage()
tau = tauMessage.fromMessage().tryGet()
check:
por.tau.signature == tau.signature
test "Serialize PoR":
let
porMessage = por.toMessage()
ppor = porMessage.fromMessage().tryGet()
for i in 0..<por.authenticators.len:
check blst_p1_is_equal(por.authenticators[i], ppor.authenticators[i]).bool
test "Serialize Proof":
let
proofMessage = proof.toMessage()
pproof = proofMessage.fromMessage().tryGet()
check:
proof.sigma.blst_p1_is_equal(pproof.sigma).bool
proof.mu == pproof.mu
check por.verifyProof(q, pproof.mu, pproof.sigma)

View File

@ -1,77 +0,0 @@
import std/os
import std/sequtils
import pkg/chronos
import pkg/asynctest
import pkg/codex/rng
import pkg/codex/streams
import pkg/codex/merkletree
import pkg/codex/storageproofs as st
import pkg/codex/blocktype as bt
import ../helpers
const
BlockSize = 31'nb * 64'nb
DataSetSize = BlockSize * 100
CacheSize = DataSetSize * 2
asyncchecksuite "Test PoR store":
let
blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random
var
chunker: RandomChunker
manifest: Manifest
store: BlockStore
ssk: st.SecretKey
spk: st.PublicKey
repoDir: string
stpstore: st.StpStore
porStream: SeekableStream
por: PoR
porMsg: PorMessage
cid: Cid
tags: seq[Tag]
setup:
chunker = RandomChunker.new(Rng.instance(), size = DataSetSize.int, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = CacheSize * 2, chunkSize = BlockSize)
(spk, ssk) = st.keyGen()
manifest = await storeDataGetManifest(store, chunker)
cid = manifest.treeCid
porStream = StoreStream.new(store, manifest)
por = await PoR.init(
porStream,
ssk, spk,
BlockSize.int)
porMsg = por.toMessage()
tags = blocks.mapIt(
Tag(idx: it, tag: porMsg.authenticators[it]) )
repoDir = getAppDir() / "stp"
createDir(repoDir)
stpstore = st.StpStore.init(repoDir)
teardown:
await close(porStream)
removeDir(repoDir)
test "Should store Storage Proofs":
check (await stpstore.store(por.toMessage(), cid)).isOk
check fileExists(stpstore.stpPath(cid) / "por")
test "Should retrieve Storage Proofs":
discard await stpstore.store(por.toMessage(), cid)
check (await stpstore.retrieve(cid)).tryGet() == porMsg
test "Should store tags":
check (await stpstore.store(tags, cid)).isOk
for t in tags:
check fileExists(stpstore.stpPath(cid) / $t.idx )
test "Should retrieve tags":
discard await stpstore.store(tags, cid)
check (await stpstore.retrieve(cid, blocks)).tryGet() == tags

View File

@ -105,7 +105,7 @@ asyncchecksuite "Purchasing":
let purchase = await purchasing.purchase(request)
check eventually market.requested.len > 0
let request = market.requested[0]
clock.set(request.expiry.truncate(int64))
clock.set(request.expiry.truncate(int64) + 1)
expect PurchaseTimeout:
await purchase.wait()
@ -113,7 +113,7 @@ asyncchecksuite "Purchasing":
let purchase = await purchasing.purchase(request)
check eventually market.requested.len > 0
let request = market.requested[0]
clock.set(request.expiry.truncate(int64))
clock.set(request.expiry.truncate(int64) + 1)
expect PurchaseTimeout:
await purchase.wait()
check market.withdrawn == @[request.id]

View File

@ -1,5 +0,0 @@
import ./storageproofs/teststpstore
import ./storageproofs/testpor
import ./storageproofs/testnetwork
{.warning[UnusedImport]: off.}

View File

@ -1,4 +1,5 @@
import std/random
import std/strutils
import std/sequtils
import std/times
import std/typetraits
@ -6,6 +7,12 @@ import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
import pkg/stint
proc exampleString*(length: int): string =
let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result = newString(length) # Create a new empty string with a given length
for i in 0..<length:
result[i] = chars[rand(chars.len-1)] # Generate a random index and set the string's character
proc example*[T: SomeInteger](_: type T): T =
rand(T)

View File

@ -64,10 +64,11 @@ proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase =
let json = ? parseJson(body).catch
RestPurchase.fromJson(json)
proc getSlots*(client: CodexClient): JsonNode =
proc getSlots*(client: CodexClient): ?!seq[Slot] =
let url = client.baseurl & "/sales/slots"
let body = client.http.getContent(url)
parseJson(body).catch |? nil
let json = ? parseJson(body).catch
seq[Slot].fromJson(json)
proc postAvailability*(
client: CodexClient,

View File

@ -86,6 +86,7 @@ template multinodesuite*(name: string,
"--api-port=" & $(8080 + index),
"--data-dir=" & datadir,
"--nat=127.0.0.1",
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--disc-ip=127.0.0.1",
"--disc-port=" & $(8090 + index),
"--eth-account=" & $accounts[index]]

View File

@ -1,4 +1,5 @@
import std/options
import std/sequtils
from pkg/libp2p import `==`
import pkg/chronos
import pkg/stint
@ -10,6 +11,7 @@ import pkg/codex/utils/stintutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ../examples
import ./twonodes
@ -164,3 +166,30 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
await provider.advanceTime(duration)
check eventually (await token.balanceOf(account2)) - startBalance == duration*reward
test "expired request partially pays out for stored time":
let marketplace = Marketplace.new(Marketplace.address, provider.getSigner())
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, provider.getSigner())
let reward = 400.u256
let duration = 100.u256
# client 2 makes storage available
let startBalanceClient2 = await token.balanceOf(account2)
discard client2.postAvailability(size=140000.u256, duration=200.u256, minPrice=300.u256, maxCollateral=300.u256).get
# client 1 requests storage but requires two nodes to host the content
let startBalanceClient1 = await token.balanceOf(account1)
let expiry = (await provider.currentTime()) + 10
let cid = client1.upload(exampleString(100000)).get
let id = client1.requestStorage(cid, duration=duration, reward=reward, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2).get
# We have to wait for Client 2 fills the slot, before advancing time.
# Until https://github.com/codex-storage/nim-codex/issues/594 is implemented nothing better then
# sleeping some seconds is available.
await sleepAsync(2.seconds)
await provider.advanceTimeTo(expiry+1)
check eventually(client1.purchaseStateIs(id, "cancelled"), 20000)
check eventually ((await token.balanceOf(account2)) - startBalanceClient2) > 0 and ((await token.balanceOf(account2)) - startBalanceClient2) < 10*reward
check eventually (startBalanceClient1 - (await token.balanceOf(account1))) == ((await token.balanceOf(account2)) - startBalanceClient2)

View File

@ -26,6 +26,7 @@ ethersuite "Node block expiration tests":
"--api-port=8080",
"--data-dir=" & dataDir,
"--nat=127.0.0.1",
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--disc-ip=127.0.0.1",
"--disc-port=8090",
"--block-ttl=" & $blockTtlSeconds,

View File

@ -63,6 +63,7 @@ twonodessuite "Proving integration test", debug1=false, debug2=false:
"--data-dir=" & validatorDir,
"--api-port=8089",
"--disc-port=8099",
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--validator",
"--eth-account=" & $accounts[2]
], debug = false

View File

@ -38,6 +38,7 @@ template twonodessuite*(name: string, debug1, debug2: string, body) =
"--nat=127.0.0.1",
"--disc-ip=127.0.0.1",
"--disc-port=8090",
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--persistence",
"--eth-account=" & $account1
]
@ -56,6 +57,7 @@ template twonodessuite*(name: string, debug1, debug2: string, body) =
"--nat=127.0.0.1",
"--disc-ip=127.0.0.1",
"--disc-port=8091",
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--bootstrap-node=" & bootstrap,
"--persistence",
"--eth-account=" & $account2

View File

@ -1,6 +1,5 @@
import ./codex/teststores
import ./codex/testblockexchange
import ./codex/teststorageproofs
import ./codex/testasyncheapqueue
import ./codex/testchunking
import ./codex/testmanifest

@ -1 +1 @@
Subproject commit 1854dfba9991a25532de5f6a53cf50e66afb3c8b
Subproject commit 14e453ac3150e6c9ca277e605d5df9389ac7eea7