includes recent documents and updates

This commit is contained in:
Marcin Czenko 2025-05-02 10:42:20 +02:00
parent 9449bc1162
commit 6017fe440d
No known key found for this signature in database
GPG Key ID: 33DEA0C8E30937C0
6 changed files with 1813 additions and 0 deletions

View File

@ -0,0 +1,761 @@
For a seasoned Nim developer a lot of things I am writing here may be obvious, but for those in a continuous learning path, it may bring some consolation.
The [The Status Nim style guide](https://status-im.github.io/nim-style-guide) recommends [explicit error handling mechanisms](https://status-im.github.io/nim-style-guide/errors.result.html) and handling errors locally at each abstraction level, avoiding spurious abstraction leakage. This is in contrast to leaking exception types between layers and translating exceptions that causes high visual overhead, specially when hierarchy is used, often becoming not practical, loosing all advantages of using exceptions in the first place (read more [here](https://status-im.github.io/nim-style-guide/errors.exceptions.html)).
Handling error and working with exceptions is easier to grasp when not using asynchronous code. But when you start, there are some subtle traps you may be falling into.
This short note focuses on asynchronous code. It is not complete, but pragmatic, it has gaps, but provides directions if one wants to research further.
In our code we often use the following patterns:
1. using [nim-results](https://github.com/arnetheduck/nim-results) and [std/options](https://nim-lang.org/docs/options.html) to communicate the results and failures to the caller,
2. *async* operations are annotated with `{.async: (raises: [CancelledError]).}`.
Some interesting things are happening when you annotate a Nim `proc` with "async raises".
Let's looks at some examples.
Imagine you have the following type definitions:
```nim
type
MyError = object of CatchableError
Handle1 = Future[void].Raising([CancelledError])
Handle2 = Future[void].Raising([CancelledError, MyError])
SomeType = object
name: string
handle1: Handle1
handle2: Handle2
```
`Handle1` and `Handle2` are *raising exceptions*. By using `Rasing` macro, passing the list of allowed exceptions coming out from the future as an argument, `Handle1` and `Handle2` are no longer well-known `Future[void]`, but rather a descendant of it:
```nim
type
InternalRaisesFuture*[T, E] = ref object of Future[T]
## Future with a tuple of possible exception types
## eg InternalRaisesFuture[void, (ValueError, OSError)]
##
## This type gets injected by `async: (raises: ...)` and similar utilities
## and should not be used manually as the internal exception representation
## is subject to change in future chronos versions.
# TODO https://github.com/nim-lang/Nim/issues/23418
# TODO https://github.com/nim-lang/Nim/issues/23419
when E is void:
dummy: E
else:
dummy: array[0, E]
```
The comment is saying something important: if you annotate a `proc` with `async: (raises: ...)`, you are changing the type being returned by the `proc`. To see what does it mean, lets start with something easy. Let's write a constructor for `SomeType`:
```nim
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
# both fail
handle1: newFuture[void](),
handle2: newFuture[void](),
)
t
```
Well, this will not compile. `handle1` expects `InternalRaisesFuture[system.void, (CancelledError,)]`, but instead it gets `Future[system.void]`. Yes, we are trying to cast a more generic to a less generic type. This is because `newSomeType` is not annotated with `async: (raises: ...)` and therefore every time you use `newFuture` inside it, `newFuture` returns regular `Future[void]`.
So, the first time I encountered this problem I went into a rabbit hole of understanding how to create *raising* futures by solely relaying on `async: (raises: ...)` pragma. But there is actually a public (I guess) interface allowing us to create a raising future without relaying on `async: (raises: ...)` annotation:
```
```nim
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
# both fail
handle1: Future[void].Raising([CancelledError]).init(),
handle2: Future[void].Raising([CancelledError, MyError]).init(),
)
t
```
A bit verbose, but perfectly fine otherwise, and it works as expected:
```nim
let someTypeInstance = newSomeType("test")
echo typeof(someTypeInstance.handle1) # outputs "Handle1"
echo typeof(someTypeInstance.handle2) # outputs "Handle2"
```
`init` has the following definition:
```nim
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = ""): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
when not hasException(type(E), "CancelledError"):
static:
raiseAssert "Manually created futures must either own cancellation schedule or raise CancelledError"
let res = F()
internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {})
res
```
and is very similar to:
```nim
proc newInternalRaisesFutureImpl[T, E](
loc: ptr SrcLoc, flags: FutureFlags): InternalRaisesFuture[T, E] =
let fut = InternalRaisesFuture[T, E]()
internalInitFutureBase(fut, loc, FutureState.Pending, flags)
fut
```
thus, if we had exposed the internals, our example would be:
```nim
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
handle1: newInternalRaisesFuture[void, (CancelledError,)](),
handle2: newInternalRaisesFuture[void, (CancelledError, MyError)](),
)
t
```
It is still very educational to study the chronos source code to undertstand how does the `newFuture` know which type to return: `Future[T]` or `InternalRaisesFuture[T, E]` when a proc is annotated with `async` or `async: (raises: ...)`.
If you study `chronos/internal/asyncfutures.nim` you will see that `newFuture` is implemented with the following template:
```nim
template newFuture*[T](fromProc: static[string] = "",
flags: static[FutureFlags] = {}): auto =
## Creates a new future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
when declared(InternalRaisesFutureRaises): # injected by `asyncraises`
newInternalRaisesFutureImpl[T, InternalRaisesFutureRaises](
getSrcLocation(fromProc), flags)
else:
newFutureImpl[T](getSrcLocation(fromProc), flags)
```
We see the the actual implementation depends on the existence of `InternalRaisesFutureRaises`. Let's see how it is being setup...
The `async` pragma is a macro defined in `chronos/internal/asyncmacro.nim`:
```nim
macro async*(params, prc: untyped): untyped =
## Macro which processes async procedures into the appropriate
## iterators and yield statements.
if prc.kind == nnkStmtList:
result = newStmtList()
for oneProc in prc:
result.add asyncSingleProc(oneProc, params)
else:
result = asyncSingleProc(prc, params)
```
The `asyncSingleProc` is where a lot of things happen. This is where the errors *The raises pragma doesn't work on async procedures* or *Expected return type of 'Future' got ...* come from. The place where the return type is determined is interesting:
```nim
let baseType =
if returnType.kind == nnkEmpty:
ident "void"
elif not (
returnType.kind == nnkBracketExpr and
(eqIdent(returnType[0], "Future") or eqIdent(returnType[0], "InternalRaisesFuture"))):
error(
"Expected return type of 'Future' got '" & repr(returnType) & "'", prc)
return
else:
returnType[1]
```
An async proc can have two (explicit) return types: `Future[baseType]` or `InternalRaisesFuture[baseType]`. If no return type is specified for an async proc, the return base type is concluded to be `void`. Now the crucial part: the internal return type (we are still inside of `asyncSingleProc`):
```nim
baseTypeIsVoid = baseType.eqIdent("void")
(raw, raises, handleException) = decodeParams(params)
internalFutureType =
if baseTypeIsVoid:
newNimNode(nnkBracketExpr, prc).
add(newIdentNode("Future")).
add(baseType)
else:
returnType
internalReturnType = if raises == nil:
internalFutureType
else:
nnkBracketExpr.newTree(
newIdentNode("InternalRaisesFuture"),
baseType,
raises
)
```
To focus on the most important part, at the end we see that if `raises` attribute is present and set (`async: (raises: [])` means it does not raise, but the attribute is still present and detected), the `internalReturnType` will be set to:
```nim
nnkBracketExpr.newTree(
newIdentNode("InternalRaisesFuture"),
baseType,
raises
)
```
Thus, for `async: (raises: [CancelledError, ValueError])`, the return type will be `InternalRaisesFuture[baseType, (CancelledError, ValueError,)`.
If the `async` has `raw: true` param set, e.g. `async: (raw: true, raises: [CancelledError, ValueError])`, then `prc.body` gets prepended with the type definition we already recognize from `newFuture` above: `InternalRaisesFutureRaises`
```nim
if raw: # raw async = body is left as-is
if raises != nil and prc.kind notin {nnkProcTy, nnkLambda} and not isEmpty(prc.body):
# Inject `raises` type marker that causes `newFuture` to return a raise-
# tracking future instead of an ordinary future:
#
# type InternalRaisesFutureRaises = `raisesTuple`
# `body`
prc.body = nnkStmtList.newTree(
nnkTypeSection.newTree(
nnkTypeDef.newTree(
nnkPragmaExpr.newTree(
ident"InternalRaisesFutureRaises",
nnkPragma.newTree(ident "used")),
newEmptyNode(),
raises,
)
),
prc.body
)
```
For our example of `async: (raw: true, raises: [CancelledError, ValueError])`, this will be:
```nim
type InternalRaisesFutureRaises {.used.} = (CancelledError, ValueError,)
```
This allows the `newFuture` template to recognize it has to use `InternalRaisesFuture` as the return type.
### Experimenting with *Raising Futures*
With the `Future[...].Raising(...).init()` construct we can quite elegantly create new raising futures in regular proc not annotated with `async: (raises: ...)`. But to get more intuition, let's play a bit with creating our own version of `Future[...].Raising(...).init()` that will be built on top of `async: (raises: ...)` pragma.
> [!info]
> This is just an exercise. It reveals some interesting details about how `async` is implemented. I also used it to learn some basics about using macros and how they can help where generics have limitations.
Let's start with creating a proc that returns type `Handle1`?
Recall that `Handle1` is defined as follows:
```nim
type
Handle1 = Future[void].Raising([CancelledError])
```
```nim
proc newHandle1(): Handle1 {.async: (raw: true, [CancelledError]).} =
newFuture[void]()
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
handle1: newHandle1(),
handle2: Future[void].Raising([CancelledError, MyError]).init(),
)
t
```
That would be nice an concise, yet, you remember now the "Expected return type of 'Future' got ..." error from `asyncSingleProc`, right? This is what we will get:
```bash
Error: Expected return type of 'Future' got 'Handle1'
```
Knowing the implementation of the `asyncSingleProc` macro, we know that `InternalRaisesFuture[void, (CancelledError,)]` would work just fine as the return type:
```nim
proc newHandle1(): InternalRaisesFuture[void, (CancelledError,)] {.async: (raw: true, raises: [CancelledError]).} =
newFuture[void]()
```
but not:
```nim
proc newHandle1(): Future[void].Raises(CancelledError) {.async: (raw: true, raises: [CancelledError]).} =
newFuture[void]()
```
Thus we have to stick to `Future` as the return type if we want to stick to the public interface:
```nim
proc newHandle1(): Future[void]
{.async: (raw: true, raises: [CancelledError]).} =
newFuture[void]()
```
It actually does not matter that we specify `Future[void]` as the return type (yet, it has to be `Future`): the actual return type of `newFuture` and of the `newHandle1` proc will be `InternalRaisesFuture[void, (CancelledError,)]` thanks to the `assert: (raw: true, raises: [CancelledError])`.
It would be nice if we can create a more generic version of `newHandle`, so that we do not have to create a new one for each single raising future type. Ideally, we would like this generic to also allow us handling the raised exceptions accordingly.
Using just plain generic does not seem to allow us passing the list of exception types so that it lands nicely in the `raises: [...]` attribute:
```nim
proc newRaisingFuture[T, E](): Future[T] {.async: (raw: true, raises: [E]).} =
newFuture[T]()
```
With this we can pass a single exception type as E. To pass a list of exceptions we can use a template:
```nim
template newRaisingFuture[T](raising: typed): untyped =
block:
proc wrapper(): Future[T] {.async: (raw: true, raises: raising).} =
newFuture[T]()
wrapper()
```
With the `newRaisingFuture` template we can simplify our example to get:
```nim
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
handle1: newRaisingFuture[void]([CancelledError]),
handle2: newRaisingFuture[void]([CancelledError, MyError]),
)
t
```
Perhaps, a more elegant solution would be to use an IIFE (Immediately Invoked Function Expression), e.g.:
```nim
(proc (): Future[void]
{.async: (raw: true, raises: [CancelledError, MyError]).} =
newFuture[void]())()
```
so that we can create a raising future instance like this:
```nim
let ff = (
proc (): Future[void]
{.async: (raw: true, raises: [CancelledError, MyError]).} =
newFuture[void]())()
)()
```
Unfortunately, this will fail with error similar to this one:
```bash
Error: type mismatch: got 'Future[system.void]' for '
newFutureImpl(srcLocImpl("", (filename: "raisingfutures.nim", line: 264,
column: 19).filename, (filename: "raisingfutures.nim", line: 264,
column: 19).line), {})' but expected
'InternalRaisesFuture[system.void, (CancelledError, MyError)]'
```
To see what happened, we can use the `-d:nimDumpAsync` option when compiling, e.g.:
```bash
nim c -r -o:build/ --NimblePath:.nimble/pkgs2 -d:nimDumpAsync raisingfutures.nim
```
This option will print us the expanded `async` macro, where we can find that our call expanded to:
```nim
proc (): InternalRaisesFuture[void, (CancelledError, MyError)]
{.raises: [], gcsafe.} =
newFuture[void]()
```
This obviously misses the definition of the `InternalRaisesFutureRaises` type before calling `newFuture`, which would change the behavior of the `newFuture` call so that instead of returning a regular `Future[void]` it would return `InternalRaisesFuture[system.void, (CancelledError, MyError)]`. The same function, evaluated as regular proc (and not as lambda call) would take the following form:
```nim
proc (): InternalRaisesFuture[seq[int], (CancelledError, MyError)]
{.raises: [], gcsafe.} =
type InternalRaisesFutureRaises {.used.} = (CancelledError, ValueError,)
newFuture[seq[int]]()
```
Looking again into the `chronos/internal/asyncmacro.nim`:
```nim
if raw: # raw async = body is left as-is
if raises != nil and prc.kind notin {nnkProcTy, nnkLambda} and not isEmpty(prc.body):
# Inject `raises` type marker that causes `newFuture` to return a raise-
# tracking future instead of an ordinary future:
#
# type InternalRaisesFutureRaises = `raisesTuple`
# `body`
prc.body = nnkStmtList.newTree(
nnkTypeSection.newTree(
nnkTypeDef.newTree(
nnkPragmaExpr.newTree(
ident"InternalRaisesFutureRaises",
nnkPragma.newTree(ident "used")),
newEmptyNode(),
raises,
)
),
prc.body
)
```
we see the condition:
```nim
if raises != nil and prc.kind notin {nnkProcTy, nnkLambda} and not isEmpty(prc.body):
```
Unfortunately, in our case `prc.kind` is `nnkLambda`, and so the above mentioned type infusion will not happen...
> I do not know why it is chosen to be like this...
Thus, if we would like to use IIFE, we do have to use an internal function from `chronos/internal/asyncfutures.nim`:
```nim
(proc (): Future[void]
{.async: (raw: true, raises: [CancelledError, MyError]).} =
newInternalRaisesFuture[void, (CancelledError, MyError)]())()
```
This call will work, and we can then "hide" the internal primitive in a macro. Below I show the macro, we can use to conveniently create *raising futures* using the IIFE:
```nim
macro newRaisingFuture(T: typedesc, E: typed): untyped =
let
baseType = T.strVal
e =
case E.getTypeInst().typeKind()
of ntyTypeDesc: @[E]
of ntyArray:
for x in E:
if x.getTypeInst().typeKind != ntyTypeDesc:
error("Expected typedesc, got " & repr(x), x)
E.mapIt(it)
else:
error("Expected typedesc, got " & repr(E), E)
let raises = if e.len == 0:
nnkBracket.newTree()
else:
nnkBracket.newTree(e)
let raisesTuple = if e.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(e)
result = nnkStmtList.newTree(
nnkCall.newTree(
nnkPar.newTree(
nnkLambda.newTree(
newEmptyNode(),
newEmptyNode(),
newEmptyNode(),
nnkFormalParams.newTree(
nnkBracketExpr.newTree(
newIdentNode("Future"),
newIdentNode(baseType)
)
),
nnkPragma.newTree(
nnkExprColonExpr.newTree(
newIdentNode("async"),
nnkTupleConstr.newTree(
nnkExprColonExpr.newTree(
newIdentNode("raw"),
newIdentNode("true")
),
nnkExprColonExpr.newTree(
newIdentNode("raises"),
raises
)
)
)
),
newEmptyNode(),
nnkStmtList.newTree(
nnkCall.newTree(
nnkBracketExpr.newTree(
newIdentNode("newInternalRaisesFuture"),
newIdentNode(baseType),
raisesTuple
)
)
)
)
)
)
)
```
Now, creating a raising future is quite elegant:
```nim
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
handle1: newRaisingFuture(void, CancelledError),
handle2: newRaisingFuture(void, [CancelledError, MyError]),
)
t
```
### Using raising futures types
While `Future[...].Raising(...).init()` provides us with quite elegant (although verbose) interface to create raising futures, it seems to display some subtle limitations.
To demonstrate them, let start with the following, quite innocent looking proc:
```nim
proc waitHandle[T](h: Future[T]): Future[T]
{.async: (raises: [CancelledError]).} =
await h
```
Now, let's try to call it passing a raising future as an argument:
```nim
let handle = newRaisingFuture(int, [CancelledError])
handle.complete(42)
echo waitFor waitHandle(handle)
```
> [!info]
> In the examples I am using our macro - just for fun, and it is also shorter to type than `Future[...].Raising(...).init()`
The compilation will fail with the following error:
```bash
Error: cast[type(h)](chronosInternalRetFuture.internalChild).internalError can raise an unlisted exception: ref CatchableError
```
First, realize that we are passing `InternalRaisesFuture[void, (CancelledError,)]` as `Future[void]`. Because we have that:
```nim
type InternalRaisesFuture*[T, E] = ref object of Future[T]
```
it will not cause any troubles. For `Future[T]`, the following version of `await` will be called:
```nim
template await*[T](f: Future[T]): T =
## Ensure that the given `Future` is finished, then return its value.
##
## If the `Future` failed or was cancelled, the corresponding exception will
## be raised instead.
##
## If the `Future` is pending, execution of the current `async` procedure
## will be suspended until the `Future` is finished.
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f
# `futureContinue` calls the iterator generated by the `async`
# transformation - `yield` gives control back to `futureContinue` which is
# responsible for resuming execution once the yielded future is finished
yield chronosInternalRetFuture.internalChild
# `child` released by `futureContinue`
cast[type(f)](chronosInternalRetFuture.internalChild).internalRaiseIfError(f)
when T isnot void:
cast[type(f)](chronosInternalRetFuture.internalChild).value()
else:
unsupported "await is only available within {.async.}"
```
The reason for exception is:
```nim
cast[type(f)](chronosInternalRetFuture.internalChild).internalRaiseIfError(f)
```
which is:
```nim
macro internalRaiseIfError*(fut: FutureBase, info: typed) =
# Check the error field of the given future and raise if it's set to non-nil.
# This is a macro so we can capture the line info from the original call and
# report the correct line number on exception effect violation
let
info = info.lineInfoObj()
res = quote do:
if not(isNil(`fut`.internalError)):
when chronosStackTrace:
injectStacktrace(`fut`.internalError)
raise `fut`.internalError
res.deepLineInfo(info)
res
```
Thus, this will cause the error we see above.
>[!info]
> Notice that it is not *casting* that causes an error.
We could cast to `InternalRaisesFuture` before calling `await`, but although we can often be pretty sure that what we are doing is right, it would be better to avoid downcasting if possible.
Thus, in our `waitHandle` it would be better that we capture the correct type. Fortunately, this is possible, although not obvious.
Unfortunately, the following will not work as the type of the argument `h`:
```nim
Future[T].Raising([CancelledError])
Raising[T](Future[T],[CancelledError])
Raising(Future[T],[CancelledError])
```
Sadly, original `Raising` macro is not flexible enough to handle complications of the generics.
We may like to define a custom type, e.g.:
```nim
type
Handle[T] = Future[T].Raising([CancelledError])
```
Unfortunately, `Raising` macro is again not flexible enough to handle this. Doing:
```nim
type
Handle[T] = Raising(Future[T], [CancelledError])
```
looks more promising, but trying to use `Handle[T]` as type for `h` in our `waitHandle` fails.
What works is `auto`:
```nim
proc waitHandle[T](_: typedesc[Future[T]], h: auto): Future[T] {.async: (raises: [CancelledError]).} =
await h
let handle = newRaisingFuture(int, [CancelledError])
handle.complete(42)
echo waitFor Future[int].waitHandle(handle)
```
Finally, I have experimented a bit with modifying the original `Rasing` macro from chronos, to see if I can make it a bit more permissive. In particular, where `Future[...].Raising(...)` seem to have limitations are generic types, e.g.:
```nim
SomeType2[T] = object
name: string
# will not compile
handle: Future[T].Raising([CancelledError])
```
Here is a version that works:
```nim
from pkg/chronos/internal/raisesfutures import makeNoRaises
macro RaisingFuture(T: typedesc, E: typed): untyped =
let
baseType = T.strVal
e =
case E.getTypeInst().typeKind()
of ntyTypeDesc: @[E]
of ntyArray:
for x in E:
if x.getTypeInst().typeKind != ntyTypeDesc:
error("Expected typedesc, got " & repr(x), x)
E.mapIt(it)
else:
error("Expected typedesc, got " & repr(E), E)
# @[]
let raises = if e.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(e)
result = nnkBracketExpr.newTree(
ident "InternalRaisesFuture",
newIdentNode(baseType),
raises
)
```
We can then do:
```nim
SomeType2[T] = object
name: string
# will not compile
handle: RaisingFuture(T, [CancelledError])
```
We finish this note with some examples of using the `RasingFuture` and `newRaisingFuture` macros:
```nim
type
MyError = object of CatchableError
SomeType = object
name: string
handle1: RaisingFuture(void, CancelledError)
handle2: RaisingFuture(void, [CancelledError, MyError])
handle3: RaisingFuture(int, [CancelledError])
SomeType2[T] = object
name: string
handle: RaisingFuture(T, [CancelledError])
proc newSomeType(name: string): SomeType =
let t = SomeType(
name: name,
handle1: newRaisingFuture(void, CancelledError),
handle2: newRaisingFuture(void, [CancelledError, MyError]),
handle3: newRaisingFuture(int, [CancelledError]),
)
t
proc newSomeType2[T](name: string): SomeType2[T] =
let t = SomeType2[T](
name: name,
handle: newRaisingFuture(T, CancelledError),
)
t
let someTypeInstance = newSomeType("test")
echo typeof(someTypeInstance.handle1)
echo typeof(someTypeInstance.handle2)
echo typeof(someTypeInstance.handle3)
let someType2Instance = newSomeType2[int]("test2")
echo typeof(someType2Instance.handle)
proc waitHandle[T](_: typedesc[Future[T]], h: auto): Future[T] {.async: (raises: [CancelledError]).} =
await h
proc waitHandle2[T](_: typedesc[Future[T]], h: RaisingFuture(T, [CancelledError])): Future[T] {.async: (raises: [CancelledError]).} =
return await h
someTypeInstance.handle1.complete()
waitFor Future[void].waitHandle(someTypeInstance.handle1)
echo "done 1"
someType2Instance.handle.complete(42)
echo waitFor Future[int].waitHandle(someType2Instance.handle)
echo "done 2"
let handle = newRaisingFuture(int, CancelledError)
handle.complete(43)
echo waitFor Future[int].waitHandle2(handle)
echo "done 3"
```
You can also find the source files on GitHub:
- [raisingfutures.nim](https://gist.github.com/marcinczenko/cb48898d24314fbdebe57fd815c3c1be#file-raisingfutures-nim)
- [raisingfutures2.nim](https://gist.github.com/marcinczenko/c667fad0b70718d6f157275a2a7e7a93#file-raisingfutures2-nim)

View File

@ -0,0 +1 @@
We had a session about Codex-BitTorrent integration. The recording can be found [here](https://youtu.be/lChnESwoPoE?si=2AXJR2oOPL2aORzu) and the Excalidraw slides [here](https://link.excalidraw.com/readonly/HqZiN6H2q3eqI0YYPuUh)

View File

@ -0,0 +1,755 @@
In supporting BitTorrent on Codex network, it is important to clarify the pre-conditions: what do we expect to have as an input, and when will be the output.
BitTorrent itself, can have three types of inputs:
- a `.torrent` manifest file - a b-encoded [[BitTorrent metadata files]] - different formats for torrent version one and version 2
- a magnet link - introduced in [[BEP9 - Extension for Peers to Send Metadata Files]] to support trackerless torrent and using DHT for peer discovery
In both cases there are differences between version 1 and version 2 of metadata files (see [[BitTorrent metadata files]] for details) and version 1 and version 2 of [[Magnet Links|magnet links]].
A torrent file, provides a complete description of the torrent, and can be used to compute the corresponding `info` hash.
Thus, while uploading (or seeding) BitTorrent content to the Codex network, the input is the content itself, while the output will be a (hybrid) magnet link.
To retrieve previously seeded content, the input can be a torrent file, a magnet link, or directly an info hash (either v1 or v2, tagged or untagged).
This is illustrated on the following picture:
![[BitTorrent-Upload-Download.svg]]
Thus, from the implementation perspective, the actual input to the Codex network while retrieving previously uploaded content is its `info` hash.
### Uploading BitTorrent Content to Codex
For the time being we only support version 1 and only a single file content (supporting directories and version 2 is work in progress). As a side not, limiting the description to this much simplified version will help to emphasize the important implementation challenges without being distracted with technicalities related to handling multiple file and folders.
Thus, let's assume we have a single input file: `data40k.bin`. It is a binary file of size `40KiB` (`40960` Bytes). We will be using `16KiB` (`16384` Bytes) block size, and commonly used for such small content `piece length` of `256KiB` (`262144` Bytes).
Let's go step by step through the code base to understand the upload process and the related challenges.
First, the upload API:
```
/api/codex/v1/torrent
```
To upload the content we can use the following `POST` request:
```bash
curl -X POST \
http://localhost:8001/api/codex/v1/torrent \
-H 'Content-Type: application/octet-stream' \
-H 'Content-Disposition: filename="data40k.bin"' \
-w '\n' \
-T data40k.bin
```
We use `Content Disposition` header to indicate the name we want to use for the uploaded content.
This will land to the API handler in `codex/rest/api.nim` :
```nim
router.rawApi(MethodPost, "/api/codex/v1/torrent") do() -> RestApiResponse:
## Upload a file in a streaming manner
##
```
It will call `node.storeTorrent` to effectively upload the content and get the resulting `info` (multi) hash back:
```nim
without infoHash =? (
await node.storeTorrent(
AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)),
filename = filename,
mimetype = mimetype,
)
), error:
error "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg)
```
This brings us to `node.storeTorrent` in `codex/node.nim:
```nim
proc storeTorrent*(
self: CodexNodeRef,
stream: LPStream,
filename: ?string = string.none,
mimetype: ?string = string.none,
): Future[?!MultiHash] {.async.} =
info "Storing BitTorrent data"
without bitTorrentManifest =?
await self.storePieces(
stream, filename = filename, mimetype = mimetype, blockSize = BitTorrentBlockSize
):
return failure("Unable to store BitTorrent data")
trace "Created BitTorrent manifest", bitTorrentManifest = $bitTorrentManifest
let infoBencoded = bencode(bitTorrentManifest.info)
trace "BitTorrent Info successfully bencoded"
without infoHash =? MultiHash.digest($Sha1HashCodec, infoBencoded).mapFailure, err:
return failure(err)
trace "computed info hash", infoHash = $infoHash
without manifestBlk =? await self.storeBitTorrentManifest(
bitTorrentManifest, infoHash
), err:
error "Unable to store manifest"
return failure(err)
info "Stored BitTorrent data",
infoHash = $infoHash, codexManifestCid = bitTorrentManifest.codexManifestCid
success infoHash
```
It starts with `self.storePieces`, which returns a [[BitTorrent Manifest]]. A manifest contains the BitTorrent Info dictionary and the corresponding Codex Manifest Cid:
```
type
BitTorrentInfo* = ref object
length*: uint64
pieceLength*: uint32
pieces*: seq[MultiHash]
name*: ?string
BitTorrentManifest* = ref object
info*: BitTorrentInfo
codexManifestCid*: Cid
```
`storePieces` does a very similar job to the `store` proc which is used for the regular Codex content, but additionally, it computes the *piece hashes* and creates the `info` dictionary and finally returns the corresponding `BitTorrentManifest`.
Back in `storeTorrent`, we *b-encode* the `info` dictionary and compute its hash (multihash). This `info` (multi) hash is what we will use to announce the content on the Codex DHT (see [[Announcing BitTorrent Content on Codex DHT]]).
Finally, `storeBitTorrentManifest` will effectively store the BitTorrent manifest block on the Codex network:
```
proc storeBitTorrentManifest*(
self: CodexNodeRef, manifest: BitTorrentManifest, infoHash: MultiHash
): Future[?!bt.Block] {.async.} =
let encodedManifest = manifest.encode()
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
trace "Unable to create CID for BitTorrent info hash"
return failure(error)
without blk =? bt.Block.new(data = encodedManifest, cid = infoHashCid, verify = false),
error:
trace "Unable to create block from manifest"
return failure(error)
if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store BitTorrent manifest block", cid = blk.cid, err = err.msg
return failure(err)
success blk
```
Some important things happen here. First, notice, that in Codex we use `Cids` to refer to the content. This is very handy: requesting a cid and getting the corresponding data back, we can immediately check if the content multihash present in the Cid, matches the computed multihash of the received data. If they do not match, we know immediately that the received block is invalid.
But looking at the code above, a careful reader will spot immediately that we are *cheating* a bit.
We first create a cid (`infoHashCid`) using precomputed `infoHash`, which we then associate with the `encodedManifest` in the `Block.new` call. Clearly, `info` hash does not identify our `encodedManifest`: if we compute a hash of the `encodedManifest`, it would not match the precomputed `infoHash`. This is because our Bit Torrent Manifest is more than just the `info` dictionary: it also contains the cid of the corresponding Codex Manifest for our content.
This cid is clearly not a valid cid.
We could create a valid Cid, by, for instance, creating a hash over the whole `encodedManifest` and appending it to the precomputed `infoHash` in such a Cid. Then, while retrieving the corresponding block back, we could first compare that the computed hash over the retrieved data matches the hash of the `encodedManifest` that we included in our cid, and then after reconstructing the BitTorrent Manifest from the encoded data, we could b-encode the `info` dictionary from the reconstructed BitTorrent Manifest, compute its hash, and compare it with the precomputed `infoHash` included in the cid. This would make the cid valid, but there is a problem with this approach.
In Codex, we use cids as references to blocks in `RepoStore`. We namely use cids as inputs to functions like `createBlockExpirationMetadataKey` or `makePrefixKey`. The cid itself is not preserved. The uploader (the seeder) has all necessary data to create an extended cid we describe in the paragraph above, but when requesting, the downloader knows only the `info` hash or potentially the contents of the the `.torrent` metadata file. In any case, the downloader does not know the cid of the underlying Codex manifest, pointing to the actual data. This means that the downloader is unable to create a full cid with the appended hash of the full `encodedManifest`. It is technically possible to send such an incomplete cid and use it to retrieve the full cid from the uploader datastore, but we are not making the system any more secure by doing this. The sender, can easily send a forged block with with perfectly valid cid as it has all necessary information to compute the appended hash, but the receiver, not having access to this information beforehand, will not be able to validate it.
Does it mean we can only be sure that the received content identified by the cid of the Codex manifest matches the requested info hash? No.
Notice, that BitTorrent does not use cids. The BitTorrent protocol operates at the level of pieces, and in version 1 of the protocol does not even use inclusion proofs. Yet, it does not wait till the whole piece is fetched in order to conclude it is genuine.
The info dictionary contains the `pieces` attribute, with hashes for all pieces. Once the piece is aggregated from the underlying blocks of `16KiB`, the hash is computed and compared against an entry in the `pieces` array. And this exactly what we do in Codex in order to prove that the received data, identified by the cid of the Codex manifest, matches the requested `info` hash.
Moreover, we also validate the received data at the block level, even before being able to validate the complete piece. We get this as a bonus from the Codex protocol, which together with data block, sends also the corresponding inclusion proof. Thus, even though at the moment we validate the individual blocks, we do not know if the received data, identified by the cid of the Codex manifest, matches the requested `info` hash, we do know already if the received data matches the Codex manifest. If this is not the case, if does not even make sense to aggregate pieces.
Thus, to summarize, while we cannot validate if the received BitTorrent manifest points to the valid data by validating the corresponding cid (`infoHashCid`), we do it later in two phases. Let's look at the download flow, starting from the end.
### Downloading BitTorrent Content from Codex
We start from the `NetworkPeer.readLoop` (in `codex/blockexchange/network/networkpeer.nim`), where we decode the protocol `Message` with:
```nim
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
```
There, for each data item, we call:
```nim
BlockDelivery.decode(initProtoBuffer(item, maxSize = MaxBlockSize))
```
and this is where we get the cid, `Block`, `BlockAddress`, and the corresponding `proof` (for regular data, or *leaf* blocks):
```nim
proc decode*(_: type BlockDelivery, pb: ProtoBuffer): ProtoResult[BlockDelivery] =
var
value = BlockDelivery()
dataBuf = newSeq[byte]()
cidBuf = newSeq[byte]()
cid: Cid
ipb: ProtoBuffer
if ?pb.getField(1, cidBuf):
cid = ?Cid.init(cidBuf).mapErr(x => ProtoError.IncorrectBlob)
if ?pb.getField(2, dataBuf):
value.blk =
?Block.new(cid, dataBuf, verify = true).mapErr(x => ProtoError.IncorrectBlob)
if ?pb.getField(3, ipb):
value.address = ?BlockAddress.decode(ipb)
if value.address.leaf:
var proofBuf = newSeq[byte]()
if ?pb.getField(4, proofBuf):
let proof = ?CodexProof.decode(proofBuf).mapErr(x => ProtoError.IncorrectBlob)
value.proof = proof.some
else:
value.proof = CodexProof.none
else:
value.proof = CodexProof.none
ok(value)
```
We see that we while constructing instance of `Block`, we already request the block validation by setting `verify = true`:
```nim
proc new*(
T: type Block, cid: Cid, data: openArray[byte], verify: bool = true
): ?!Block =
## creates a new block for both storage and network IO
##
without isTorrent =? cid.isTorrentCid, err:
return "Unable to determine if cid is torrent info hash".failure
# info hash cids are "fake cids" - they will not validate
# info hash validation is done outside of the cid itself
if verify and not isTorrent:
let
mhash = ?cid.mhash.mapFailure
computedMhash = ?MultiHash.digest($mhash.mcodec, data).mapFailure
computedCid = ?Cid.init(cid.cidver, cid.mcodec, computedMhash).mapFailure
if computedCid != cid:
return "Cid doesn't match the data".failure
return Block(cid: cid, data: @data).success
```
Here we see that because as explained above, the cids corresponding to the BitTorrent manifest blocks cannot be immediately validated, we make sure, the validation is skipped here for Torrent cids.
Once the `Message` is decoded, back in `NetworkPeer.readLoop`, it is passed to `NetworkPeer.handler` which is set to `Network.rpcHandler` while creating the instance of `NetworkPeer` in `Network.getOrCreatePeer`. For block deliveries, `Network.rpcHandler` forwards `msg.payload` (`seq[BlockDelivery]`) to `Network.handleBlocksDelivery`, which in turn, calls `Network.handlers.onBlocksDelivery`. The `Network.handlers.onBlocksDelivery` is set by the constructor of `BlockExcEngine`. Thus, in the end of its journey, a `seq[BlockDelivery]` from the `msg.payload` ends up in `BlockExcEngine.blocksDeliveryHandler`. This is where the data blocks are further validated against the inclusion proof and then the validated data (*leafs*) blocks or non-data blocks (non-*leafs*, e.g. a BitTorrent or Codex Manifest block), are stored in the `localStore` and then *resolved* against pending blocks via `BlockExcEngine.resolveBlocks` that calls `pendingBlocks.resolve(blocksDelivery)` (`PendingBlocksManager`). This is where `blockReq.handle.complete(bd.blk)` is called on the matching pending blocks, which completes future awaited in `BlockExcEngine.requestBlock`, which completes the future awaited in `NetworkStore.getBlock`: `await self.engine.requestBlock(address)`. And `NetworkStore.getBlock` was awaited either in `CodexNodeRef.fetchPieces` for data blocks or in `CodexNodeRef.fetchTorrentManifest`.
So, how do we get to `CodexNodeRef.fetchPieces` and `CodexNodeRef.fetchTorrentManifest` in the download flow.
It starts with the API handler of `/api/codex/v1/torrent/{infoHash}/network/stream`:
```nim
router.api(MethodGet, "/api/codex/v1/torrent/{infoHash}/network/stream") do(
infoHash: MultiHash, resp: HttpResponseRef
) -> RestApiResponse:
var headers = buildCorsHeaders("GET", allowedOrigin)
without infoHash =? infoHash.mapFailure, error:
return RestApiResponse.error(Http400, error.msg, headers = headers)
if infoHash.mcodec != Sha1HashCodec:
return RestApiResponse.error(
Http400, "Only torrents version 1 are currently supported!", headers = headers
)
if corsOrigin =? allowedOrigin:
resp.setCorsHeaders("GET", corsOrigin)
resp.setHeader("Access-Control-Headers", "X-Requested-With")
trace "torrent requested: ", multihash = $infoHash
await node.retrieveInfoHash(infoHash, resp = resp)
```
`CodexNodeRef.retrieveInfoHash` first tries to fetch the `Torrent` object, which consists of `torrentManifest` and `codexManifest`. To get it, it calls `node.retrieveTorrent(infoHash)` with the `infoHash` as the argument. And then in the `retrieveTorrent` we get to the above mentioned `fetchTorrentManifest`:
```nim
proc retrieveTorrent*(
self: CodexNodeRef, infoHash: MultiHash
): Future[?!Torrent] {.async.} =
without infoHashCid =? Cid.init(CIDv1, InfoHashV1Codec, infoHash).mapFailure, error:
trace "Unable to create CID for BitTorrent info hash"
return failure(error)
without torrentManifest =? (await self.fetchTorrentManifest(infoHashCid)), err:
trace "Unable to fetch Torrent Manifest"
return failure(err)
without codexManifest =? (await self.fetchManifest(torrentManifest.codexManifestCid)),
err:
trace "Unable to fetch Codex Manifest for torrent info hash"
return failure(err)
success (torrentManifest: torrentManifest, codexManifest: codexManifest)
```
We first create `infoHashCid`, using only the precomputed `infoHash` and we pass it to `fetchTorrentManifest`:
```nim
proc fetchTorrentManifest*(
self: CodexNodeRef, infoHashCid: Cid
): Future[?!BitTorrentManifest] {.async.} =
if err =? infoHashCid.isTorrentInfoHash.errorOption:
return failure "CID has invalid content type for torrent info hash {$cid}"
trace "Retrieving torrent manifest for infoHashCid", infoHashCid
without blk =? await self.networkStore.getBlock(BlockAddress.init(infoHashCid)), err:
trace "Error retrieve manifest block", infoHashCid, err = err.msg
return failure err
trace "Successfully retrieved torrent manifest with given block cid",
cid = blk.cid, infoHashCid
trace "Decoding torrent manifest"
without torrentManifest =? BitTorrentManifest.decode(blk), err:
trace "Unable to decode torrent manifest", err = err.msg
return failure("Unable to decode torrent manifest")
trace "Decoded torrent manifest", infoHashCid, torrentManifest = $torrentManifest
without isValid =? torrentManifest.validate(infoHashCid), err:
trace "Error validating torrent manifest", infoHashCid, err = err.msg
return failure(err.msg)
if not isValid:
trace "Torrent manifest does not match torrent info hash", infoHashCid
return failure "Torrent manifest does not match torrent info hash {$infoHashCid}"
return torrentManifest.success
```
Here we will be awaiting for the `networkStore.getBlock`, which will get completed with the block delivery flow we describe at the beginning of this section. We restore the `BitTorrentManifest` object using `BitTorrentManifest.decode(blk)`, and then we validate if the `info` dictionary from the received BitTorrent manifest matches the request `infoHash`:
```nim
without isValid =? torrentManifest.validate(infoHashCid), err:
trace "Error validating torrent manifest", infoHashCid, err = err.msg
return failure(err.msg)
```
Thus, now we know that we have genuine `info` dictionary.
Now, we still need to get and validate the actual data.
BitTorrent manifest includes the cid of the Codex manifest in `codexManifestCid` attribute. Back in `retrieveTorrent`, we thus now fetch the Codex manifest, and we return both to `retrieveInfoHash`, where the download effectively started.
The `retrieveInfoHash` calls `streamTorrent` passing both manifests as arguments:
```nim
let stream = await node.streamTorrent(torrentManifest, codexManifest)
```
Let's take a look at `streamTorrent`:
```nim
proc streamTorrent*(
self: CodexNodeRef, torrentManifest: BitTorrentManifest, codexManifest: Manifest
): Future[LPStream] {.async: (raises: []).} =
trace "Retrieving pieces from torrent"
let stream = LPStream(StoreStream.new(self.networkStore, codexManifest, pad = false))
var jobs: seq[Future[void]]
proc onPieceReceived(blocks: seq[bt.Block], pieceIndex: int): ?!void {.raises: [].} =
trace "Fetched torrent piece - verifying..."
var pieceHashCtx: sha1
pieceHashCtx.init()
for blk in blocks:
pieceHashCtx.update(blk.data)
let pieceHash = pieceHashCtx.finish()
if (pieceHash != torrentManifest.info.pieces[pieceIndex]):
error "Piece verification failed", pieceIndex = pieceIndex
return failure("Piece verification failed")
trace "Piece verified", pieceIndex, pieceHash
# great success
success()
proc prefetch(): Future[void] {.async: (raises: []).} =
try:
if err =? (
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
).errorOption:
error "Unable to fetch blocks", err = err.msg
await stream.close()
except CancelledError:
trace "Prefetch cancelled"
jobs.add(prefetch())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CancelledError:
trace "Stream cancelled"
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
trace "Creating store stream for torrent manifest"
stream
```
`streamTorrent` does three things:
1. starts background `prefetch` job
2. monitors the stream using `monitorStream`
3. validates the aggregated pieces
The `prefetch` job calls `fetchPieces`:
```nim
proc fetchPieces*(
self: CodexNodeRef,
torrentManifest: BitTorrentManifest,
codexManifest: Manifest,
onPiece: PieceProc,
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
trace "Fetching torrent pieces"
let numOfPieces = torrentManifest.info.pieces.len
let numOfBlocksPerPiece =
torrentManifest.info.pieceLength.int div codexManifest.blockSize.int
let blockIter = Iter[int].new(0 ..< codexManifest.blocksCount)
let pieceIter = Iter[int].new(0 ..< numOfPieces)
self.fetchPieces(
codexManifest.treeCid, blockIter, pieceIter, numOfBlocksPerPiece, onPiece
)
```
At this level, we create the iterators to manage the sequential processing of blocks and pieces with each piece containing `numOfBlocksPerPiece` blocks. Subsequently, we call the overloaded version of `fetchPieces` that will perform the actual (pre) fetching:
```nim
proc fetchPieces*(
self: CodexNodeRef,
cid: Cid,
blockIter: Iter[int],
pieceIter: Iter[int],
numOfBlocksPerPiece: int,
onPiece: PieceProc,
): Future[?!void] {.async: (raises: [CancelledError]).} =
while not blockIter.finished:
let blockFutures = collect:
for i in 0 ..< numOfBlocksPerPiece:
if not blockIter.finished:
let address = BlockAddress.init(cid, blockIter.next())
self.networkStore.getBlock(address)
without blocks =? await allFinishedValues(blockFutures), err:
return failure(err)
if pieceErr =? (onPiece(blocks, pieceIter.next())).errorOption:
return failure(pieceErr)
await sleepAsync(1.millis)
success()
```
We fetch blocks in batches, or rather per pieces. We trigger fetching blocks with `self.networkStore.getBlock(address)`, which will resolve by either getting the block from the local store or from the network through block delivery described above.
Notice, we need to get all the blocks here, not only trigger fetching the blocks that are not yet available in the local store. This is necessary, because we need to get all the blocks in a piece so that we can validate the piece and potentially stop streaming if the piece turns out to be invalid.
Before calling `onPiece`, where validation will take place, we wait for all `Futures` to complete returning the requested blocks.
`onPiece` is set to `onPieceReceived` in `streamTorrent` and it basically computes the SHA1 hash of the concatenated blocks and checks if it matches the (multi) hash from the `info` dictionary. This steps forms the second validation step: after we checked that the `info` dictionary matches the requested `info` hash in the first step described above, here we are making sure that the received content matches the metadata in the `info` dictionary, and thus it is indeed the content identified by the `info` hash from the request.
`fetchPieces` operates in background, and thus very likely after first piece has been fetched the stream will be returned to `retrieveInfoHash` where streaming the blocks down to the client will take place:
```nim
proc retrieveInfoHash(
node: CodexNodeRef, infoHash: MultiHash, resp: HttpResponseRef
): Future[void] {.async.} =
## Download torrent from the node in a streaming
## manner
##
var stream: LPStream
var bytes = 0
try:
without torrent =? (await node.retrieveTorrent(infoHash)), err:
error "Unable to fetch Torrent Metadata", err = err.msg
resp.status = Http404
await resp.sendBody(err.msg)
return
let (torrentManifest, codexManifest) = torrent
if codexManifest.mimetype.isSome:
resp.setHeader("Content-Type", codexManifest.mimetype.get())
else:
resp.addHeader("Content-Type", "application/octet-stream")
if codexManifest.filename.isSome:
resp.setHeader(
"Content-Disposition",
"attachment; filename=\"" & codexManifest.filename.get() & "\"",
)
else:
resp.setHeader("Content-Disposition", "attachment")
await resp.prepareChunked()
let stream = await node.streamTorrent(torrentManifest, codexManifest)
while not stream.atEof:
var
buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len)
if buff.len <= 0:
break
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
await resp.finish()
codex_api_downloads.inc()
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Error streaming blocks", exc = exc.msg
resp.status = Http500
if resp.isPending():
await resp.sendBody(exc.msg)
finally:
info "Sent bytes for torrent", infoHash = $infoHash, bytes
if not stream.isNil:
await stream.close()
```
Now, two important points. First, when the streaming happens to be interrupted the stream will be closed in the `finally` block. This in turns will be detected by the `monitorStream` in `streamTorrent` causing the `prefetch` job to be cancelled. Second, when either piece validation fails, or if any of the `getBlock` future awaiting completion fails, `prefetch` will return error, which will cause the stream to be closed:
```nim
proc prefetch(): Future[void] {.async: (raises: []).} =
try:
if err =? (
await self.fetchPieces(torrentManifest, codexManifest, onPieceReceived)
).errorOption:
error "Unable to fetch blocks", err = err.msg
await stream.close()
except CancelledError:
trace "Prefetch cancelled"
```
Without this detection mechanism, we would either continue fetching blocks even when streaming API request has been interrupted, or we would continue streaming, even when it is already known that the piece validation phase has failed. This would result in invalid content being returned to the client. After any failure in the `prefetch` job, the pieces will no longer be validated, thus it does not make any sense to continue the streaming operation, which otherwise, would cause fetching blocks one-by-one in the streaming loop in `retrieve`:
```nim
while not stream.atEof:
var
buff = newSeqUninitialized[byte](BitTorrentBlockSize.int)
len = await stream.readOnce(addr buff[0], buff.len)
buff.setLen(len)
if buff.len <= 0:
break
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
```
The `stream.readOnce` implemented in `StoreStream`, which uses the same underlying `networkStore` that is also used in `fetchPieces` proc shown above, will be calling that same `getBlock` operation, which in case the block is not already in local store (because it was already there or as a result of the prefetch operation), will request it from the block exchange engine via `BlockExcEngine.requestBlock` operation. In case there is already a pending request for the given block address, the `PendingBlocksManager` will return the existing block handle, so that `BlockExcEngine.requestBlock` operation will not cause duplicate request. It will, however potentially return an invalid block to the client, before the containing piece has been validated in the prefetch phase.
> [!danger]
This may occasionally cause an unlikely event in which both `fetchPieces` in the `prefetch` job and `readOnce` on the `StoreStream` will be awaiting on the `getBlock` of the very last block request, and `readOnce` will regain the control before `fetchPieces` does. In such a case, the client may complete the streaming operation successfully, even if the corresponding last piece validation fails.
>
> OR, in even more unlikely event that all the blocks belonging to the last piece are in the local store, `readOnce` may consume them before the last piece is validated.
>
> We should perhaps make sure that the very last piece can only be streamed to the client after the `prefetch` operation completes.
>
```bash
TRC 2025-03-19 16:11:06.334+01:00 torrent requested: topics="codex restapi" tid=5050038 multihash=sha1/4249FFB943675890CF09342629CD3782D107B709
TRC 2025-03-19 16:11:06.334+01:00 Retrieving torrent manifest for infoHashCid topics="codex node" tid=5050038 infoHashCid=z8q*fCdDsv
TRC 2025-03-19 16:11:06.340+01:00 Successfully retrieved torrent manifest with given block cid topics="codex node" tid=5050038 cid=z8q*fCdDsv infoHashCid=z8q*fCdDsv
TRC 2025-03-19 16:11:06.340+01:00 Decoding torrent manifest topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.340+01:00 Decoded torrent manifest topics="codex node" tid=5050038 infoHashCid=z8q*fCdDsv torrentManifest="BitTorrentManifest(info: BitTorrentInfo(length: 10485760, pieceLength: 262144, pieces: @[sha1/5B3B77A971432C07B1C3C36A73BA5E79DBAE20EE, sha1/322890271DAE134970AABAA9CB30AF7464971599, sha1/2394F90BABA3677D44DDA0565543591AC26CA584, sha1/36EF17CDEFE621AE433A0A30EF3F6A20E0D07DB2, sha1/C21F2C1A1A6BF6E3E6CA75C13969F32EBE802867, sha1/B576B2118EC0B5B0D6AC5BB60BF12F881E5E395C, sha1/3FC4F41C6A83C8411BAE41E83AE327299FA31A98, sha1/92BD07E01D40E8BFB7C3B4597568B44F805508B5, sha1/DEC4E1492D2447436AF54AF3FC6319D409E2F48F, sha1/3E4BFDB4F970A9EDA9906E007073B0FB94A4F91A, sha1/CFB9B01285FA00BC43C5E439A65958D8CBBCD046, sha1/EAFB36F2DD5AA8EE80811946C26DFB8F95ABB18A, sha1/FEDFD7A30E09B395D8F78148CB826127070272DD, sha1/F1588A342C11776DB7DC8C5BCFA72A1701DF0442, sha1/34D4E095D53A0CA33375444A19652BD866027350, sha1/AE7FF2CA95BF751B68D2E22BB015435347CFE49C, sha1/85FCC1B3A8CE7D5C0397E8D27084C067A3067496, sha1/88C3DF9C35A23FE6B1E363F606068F15DB5EE093, sha1/98F7A4A6113A3CC9CB3EF086A2B9E36DAA92B78D, sha1/91DDF5B1F25715C17CD709019B71D2233A142CC1, sha1/1A850C78AB1CB596D8298371132A135DB96D5943, sha1/88E8F31AE70A6A81E25FF9E6D2DC824F07F1AF9A, sha1/A335A0DE3F7E1F4191D89E68F9AE9D4406CE4449, sha1/7AC5488A3B6C8A93F7DF3DE64BBCF2FA1F185F4D, sha1/7A88908AF090C1F8FC2BCF51C8BBB6F92D13AE01, sha1/BBEBBD427BEE80178C291366A570A355204E67CA, sha1/0112EDE76962115D7DDD24F7D14BE41BCCD51634, sha1/EFBEFC1DFBB0F7676447C9D0936775A8933008D2, sha1/6508DE0FD7FC8D8EE300C88F0743A52CD79CB616, sha1/F1B02EFB043CC6F37FE22C59055960AAC4ABDCC6, sha1/74FDA7F48FC089AA7A684D4E7C2C1F4BC5B08980, sha1/FAFAC020F2C1DEC260581810F8BB27BBCB211AFD, sha1/1AB425BA8DD6A2AD1C469417C7F7CB54E3226B82, sha1/08338248DAA53ADCBF65D052E93EB8BC677E14B3, sha1/BAD3B38D731F16B3F6663131EEE06E56EBFAB7C2, sha1/8651B73B110025390F4DDCE6AFD43F34ED65A0BE, sha1/486DCD80D8F25432E31BF59C8818003DFE8709F0, sha1/336038716957C64D3124EA1985D8180EE6A097E8, sha1/C7E2B6ADCAB60B57A9A47CCE8454BDAE46E6EE57, sha1/8B6DA36E26C2654B6407253A7816343F818DEDE5], name: some(\"data10M.bin\")), codexManifestCid: zDvZRwzm1GAigGvTcrXyC2cATBL9W2xNPqjKLKb835pHu4Xcm1E6)"
TRC 2025-03-19 16:11:06.341+01:00 Retrieving manifest for cid topics="codex node" tid=5050038 cid=zDv*Xcm1E6
TRC 2025-03-19 16:11:06.343+01:00 Decoding manifest for cid topics="codex node" tid=5050038 cid=zDv*Xcm1E6
TRC 2025-03-19 16:11:06.343+01:00 Decoded manifest topics="codex node" tid=5050038 cid=zDv*Xcm1E6
TRC 2025-03-19 16:11:06.344+01:00 Retrieving pieces from torrent topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.344+01:00 Fetching torrent pieces topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.344+01:00 Creating store stream for torrent manifest topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.344+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=0
TRC 2025-03-19 16:11:06.371+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.372+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=0
TRC 2025-03-19 16:11:06.372+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=0
TRC 2025-03-19 16:11:06.385+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=1
TRC 2025-03-19 16:11:06.411+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.412+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=1
TRC 2025-03-19 16:11:06.412+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=1
TRC 2025-03-19 16:11:06.419+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=2
TRC 2025-03-19 16:11:06.431+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.431+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=2
TRC 2025-03-19 16:11:06.431+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=2
TRC 2025-03-19 16:11:06.438+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=3
TRC 2025-03-19 16:11:06.471+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.471+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=3
TRC 2025-03-19 16:11:06.471+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=3
TRC 2025-03-19 16:11:06.480+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=4
TRC 2025-03-19 16:11:06.490+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.491+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=4
TRC 2025-03-19 16:11:06.491+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=4
TRC 2025-03-19 16:11:06.496+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=5
TRC 2025-03-19 16:11:06.524+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.524+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=5
TRC 2025-03-19 16:11:06.524+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=5
TRC 2025-03-19 16:11:06.531+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=6
TRC 2025-03-19 16:11:06.549+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.549+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=6
TRC 2025-03-19 16:11:06.549+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=6
TRC 2025-03-19 16:11:06.556+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=7
TRC 2025-03-19 16:11:06.575+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.575+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=7
TRC 2025-03-19 16:11:06.575+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=7
TRC 2025-03-19 16:11:06.583+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=8
TRC 2025-03-19 16:11:06.599+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.600+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=8
TRC 2025-03-19 16:11:06.600+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=8
TRC 2025-03-19 16:11:06.605+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=9
TRC 2025-03-19 16:11:06.624+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.625+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=9
TRC 2025-03-19 16:11:06.625+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=9
TRC 2025-03-19 16:11:06.631+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=10
TRC 2025-03-19 16:11:06.643+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.643+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=10
TRC 2025-03-19 16:11:06.643+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=10
TRC 2025-03-19 16:11:06.648+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=11
TRC 2025-03-19 16:11:06.675+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.675+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=11
TRC 2025-03-19 16:11:06.675+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=11
TRC 2025-03-19 16:11:06.682+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=12
TRC 2025-03-19 16:11:06.693+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.694+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=12
TRC 2025-03-19 16:11:06.694+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=12
TRC 2025-03-19 16:11:06.699+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=13
TRC 2025-03-19 16:11:06.725+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.725+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=13
TRC 2025-03-19 16:11:06.726+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=13
TRC 2025-03-19 16:11:06.732+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=14
TRC 2025-03-19 16:11:06.744+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.744+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=14
TRC 2025-03-19 16:11:06.744+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=14
TRC 2025-03-19 16:11:06.749+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=15
TRC 2025-03-19 16:11:06.775+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.776+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=15
TRC 2025-03-19 16:11:06.776+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=15
TRC 2025-03-19 16:11:06.783+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=16
TRC 2025-03-19 16:11:06.794+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.794+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=16
TRC 2025-03-19 16:11:06.794+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=16
TRC 2025-03-19 16:11:06.800+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=17
TRC 2025-03-19 16:11:06.825+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.826+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=17
TRC 2025-03-19 16:11:06.826+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=17
TRC 2025-03-19 16:11:06.832+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=18
TRC 2025-03-19 16:11:06.844+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.844+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=18
TRC 2025-03-19 16:11:06.844+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=18
TRC 2025-03-19 16:11:06.850+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=19
TRC 2025-03-19 16:11:06.877+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.877+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=19
TRC 2025-03-19 16:11:06.877+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=19
TRC 2025-03-19 16:11:06.884+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=20
TRC 2025-03-19 16:11:06.902+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.902+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=20
TRC 2025-03-19 16:11:06.902+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=20
TRC 2025-03-19 16:11:06.907+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=21
TRC 2025-03-19 16:11:06.926+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.927+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=21
TRC 2025-03-19 16:11:06.927+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=21
TRC 2025-03-19 16:11:06.933+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=22
TRC 2025-03-19 16:11:06.945+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.945+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=22
TRC 2025-03-19 16:11:06.945+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=22
TRC 2025-03-19 16:11:06.951+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=23
TRC 2025-03-19 16:11:06.978+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:06.979+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=23
TRC 2025-03-19 16:11:06.980+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=23
TRC 2025-03-19 16:11:06.994+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=24
TRC 2025-03-19 16:11:07.023+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.023+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=24
TRC 2025-03-19 16:11:07.023+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=24
TRC 2025-03-19 16:11:07.033+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=25
TRC 2025-03-19 16:11:07.049+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.049+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=25
TRC 2025-03-19 16:11:07.049+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=25
TRC 2025-03-19 16:11:07.055+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=26
TRC 2025-03-19 16:11:07.084+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.084+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=26
TRC 2025-03-19 16:11:07.084+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=26
TRC 2025-03-19 16:11:07.091+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=27
TRC 2025-03-19 16:11:07.121+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.121+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=27
TRC 2025-03-19 16:11:07.121+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=27
TRC 2025-03-19 16:11:07.128+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=28
TRC 2025-03-19 16:11:07.140+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.140+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=28
TRC 2025-03-19 16:11:07.141+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=28
TRC 2025-03-19 16:11:07.148+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=29
TRC 2025-03-19 16:11:07.401+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.401+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=29
TRC 2025-03-19 16:11:07.401+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=29
TRC 2025-03-19 16:11:07.409+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=30
TRC 2025-03-19 16:11:07.442+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.442+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=30
TRC 2025-03-19 16:11:07.442+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=30
TRC 2025-03-19 16:11:07.449+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=31
TRC 2025-03-19 16:11:07.481+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.481+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=31
TRC 2025-03-19 16:11:07.481+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=31
TRC 2025-03-19 16:11:07.488+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=32
TRC 2025-03-19 16:11:07.499+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.500+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=32
TRC 2025-03-19 16:11:07.500+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=32
TRC 2025-03-19 16:11:07.505+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=33
TRC 2025-03-19 16:11:07.536+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.536+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=33
TRC 2025-03-19 16:11:07.536+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=33
TRC 2025-03-19 16:11:07.544+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=34
TRC 2025-03-19 16:11:07.555+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.556+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=34
TRC 2025-03-19 16:11:07.556+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=34
TRC 2025-03-19 16:11:07.561+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=35
TRC 2025-03-19 16:11:07.587+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.588+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=35
TRC 2025-03-19 16:11:07.588+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=35
TRC 2025-03-19 16:11:07.594+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=36
TRC 2025-03-19 16:11:07.606+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.606+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=36
TRC 2025-03-19 16:11:07.606+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=36
TRC 2025-03-19 16:11:07.612+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=37
TRC 2025-03-19 16:11:07.637+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.638+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=37
TRC 2025-03-19 16:11:07.638+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=37
TRC 2025-03-19 16:11:07.646+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=38
TRC 2025-03-19 16:11:07.663+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.663+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=38
TRC 2025-03-19 16:11:07.663+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=38
TRC 2025-03-19 16:11:07.669+01:00 Waiting for piece topics="codex restapi" tid=5050038 pieceIndex=39
TRC 2025-03-19 16:11:07.691+01:00 Fetched torrent piece - verifying... topics="codex node" tid=5050038
TRC 2025-03-19 16:11:07.692+01:00 Piece verified topics="codex node" tid=5050038 pieceIndex=39
TRC 2025-03-19 16:11:07.692+01:00 Got piece topics="codex restapi" tid=5050038 pieceIndex=39
INF 2025-03-19 16:11:07.696+01:00 Sent bytes for torrent topics="codex restapi" tid=5050038 infoHash=sha1/4249FFB943675890CF09342629CD3782D107B709 bytes=10485760
```

80
10 Notes/Perkeep.md Normal file
View File

@ -0,0 +1,80 @@
Source: [PerKeep](https://perkeep.org).
See also [[Using PerKeep]]
Some eraly notes as they may be relavent in longer term to shape the support for small content, files, and dictionaries, but also for short term improvement of the Codex protocol and data maodel.
There is a nice [video](https://youtu.be/PlAU_da_U4s?si=GdwoV2CpblfjKwCj) about PerKeep, and here comes some relevant notes.
## Layered Architecture
![image](https://hackmd-prod-images.s3-ap-northeast-1.amazonaws.com/uploads/upload_46f8f356aa6cf7de87c214fff0437ecb.png?AWSAccessKeyId=AKIA3XSAAW6AWSKNINWO&Expires=1744197377&Signature=rQX20PUY3cezYbVbOG0MYxalq%2FA%3D)
In the end it is a blob store. So how they look at it, is also interesting to us.
## Blobs:
- 0 - 16MB,
- no file names,
- no mime-types,
- no metadata,
- no versions,
- just **immutable** blocks
Blobs are represented by *blob-refs*, self-describing identifiers similar to Multihases:
![image](https://hackmd-prod-images.s3-ap-northeast-1.amazonaws.com/uploads/upload_78ced2cfd6de59d72131668165ef8773.png?AWSAccessKeyId=AKIA3XSAAW6AWSKNINWO&Expires=1744197402&Signature=azFlehR79Vu36PGnIy7NVwBR6UQ%3D)
They use SHA-224. There are no deletes.
Blobs are not just flat bytes. Blobs can also be JSON objects with certain known fields, e.g. files are represented by JSON objects:
![image](https://hackmd.io/_uploads/S1kdVh_pJg.png)
Thus, blobstore keeps both data and metadata.
5TB video file or VM image? Merkle tree of "bytes" schema blobs. Data at leaves. Rolling checksum cut points (similar to rsync - to be checked how does it really work). De-duplication within files & shifting files. Efficient seeks/pread.
Files and Dictionaries:
![image](https://hackmd.io/_uploads/Hkwkr3dpkg.png)
### Indexing
The role of the metadata layer is *indexing* - to speed up the search. Most importantyl, it can be fully reconstructed from the BlobStore. As we will on some slides later, on top of indexing we have something called *Corpus* - optimized in memory key-value store to make the search even faster.
Below some slides about indexing:
![image](https://hackmd.io/_uploads/HJQzL2_61x.png)
![image](https://hackmd.io/_uploads/BkGE83OT1l.png)
![image](https://hackmd.io/_uploads/ryjP83uaJl.png)
![image](https://hackmd.io/_uploads/rJ-KU3Oakg.png)
![image](https://hackmd.io/_uploads/HyeiLhd6ke.png)
![image](https://hackmd.io/_uploads/B1NT8h_Tkg.png)
## Handling mutations
This is handled by something called *mermanodes* and needa further investigation.
![image](https://hackmd.io/_uploads/BJ-6PhOTkx.png)
A permanode and is a singed unique object. Having a permanode, you can then add atrributes (or mutations) to it:
![image](https://hackmd.io/_uploads/Bk2v_3ua1x.png)
> There seems to be an error in the above slide - the text should be *Better title* I guess.
![image](https://hackmd.io/_uploads/BkpEYh_pJg.png)
Everytime, you *put* an attribute on a permanode, you create a *mutation* or a *claim* connected to the base permanode. A claim seems to be a blob on its own.
Thus, in short, in PerKeep we seem to be having *mutations by appending*.
To be further investigated.

View File

@ -3,6 +3,10 @@
| related | [[Codex Blocks]], [[Block Storage]], [[Codex Block Exchange Protocol]] |
| ------- | ---------------------------------------------------------------------- |
For a high level overview of how Codex works, you can check the [[Codex-BitTorrent Integration presentation]] slides.
To have a more detailed overview of the block exchange architecture, please refer to [Codex Block Exchange Architecture](https://link.excalidraw.com/readonly/L0Rz0LU3oUBDjpHh9rIp).
We upload the content with API `/api/codex/v1/data`. The handler defined in `codex/rest/api.nim` calls `CodexNodeRef.store` and then returns the `Cid` of the manifest file corresponding to the contents:
```nim

View File

@ -0,0 +1,212 @@
## Uploading
The following setup has been used for testing:
```bash
» tree ./dir
./dir
├── dir1
│ ├── file11.txt # File 11
│ └── file12.txt # File 12
├── file1.txt # File 1
└── file2.txt. # File 2
2 directories, 4 files
```
It is included in `tests/fixtures/tarballs/dir`.
The content has been archived using `tar`:
```bash
» tar -cf tar -cf testtarball.tar dir
» tar -tf testtarbar.tar
dir/
dir/file2.txt
dir/file1.txt
dir/dir1/
dir/dir1/file11.txt
dir/dir1/file12.txt
```
A codex node has been started:
```bash
./build/codex --data-dir=./data-1 --listen-addrs=/ip4/127.0.0.1/tcp/8081 --api-port=8001 --nat=none --disc-port=8091 --log-level=TRACE
```
The tarball has been uploaded to codex using new `tar` API (the output shows reponse):
```bash
» curl -X POST http://localhost:8001/api/codex/v1/tar \
-H 'Content-Type: application/octet-stream' \
-H 'Content-Disposition: filename="testtarbar.tar"' \
-w '\n' \
-T testtarbar.tar | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 6654 100 510 100 6144 53297 627k --:--:-- --:--:-- --:--:-- 722k
[
{
"name": "dir",
"cid": "zDvZRwzm597U7Bq9rDZ29KpqodiGHfAuLqMTXixujUFT3QYNDjgj",
"children": [
{
"name": "file2.txt",
"cid": "zDvZRwzm5oLGkg8kW7g6fRZSfmCqqc44JAUivqJ5TwrwZneCh6V5"
},
{
"name": "file1.txt",
"cid": "zDvZRwzmDJSePX8bskWzKUrQPDutLiFhCuDtenAZDnpMZ52LUCWh"
},
{
"name": "dir1",
"cid": "zDvZRwzkzNF8iJKdSXGJuoiC2pJ1sfrA9brunjhNTts3PrEQ92fs",
"children": [
{
"name": "file11.txt",
"cid": "zDvZRwzm8ZuLQB7kG3fcZqqNRPXbhe2d4du6pDC9MyoRRw12GKfS"
},
{
"name": "file12.txt",
"cid": "zDvZRwzmDd6Mg6E98nrXVkZ7THfK5zyQ6Y3j8fSSavSsAYReteLT"
}
]
}
]
}
]
```
We see that each directory and file gets their separate cids. The cids of the files are regular Codex Manifest cids. The cids for the directories, are cids of *Codex Directory Manifest*. You can retrieve the directory manifest using `dirmanifest` API:
```bash
» curl "http://localhost:8001/api/codex/v1/data/zDvZRwzm597U7Bq9rDZ29KpqodiGHfAuLqMTXixujUFT3QYNDjgj/network/dirmanifest" | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 262 100 262 0 0 154k 0 --:--:-- --:--:-- --:--:-- 255k
{
"cid": "zDvZRwzm597U7Bq9rDZ29KpqodiGHfAuLqMTXixujUFT3QYNDjgj",
"manifest": {
"name": "dir",
"cids": [
"zDvZRwzm5oLGkg8kW7g6fRZSfmCqqc44JAUivqJ5TwrwZneCh6V5",
"zDvZRwzmDJSePX8bskWzKUrQPDutLiFhCuDtenAZDnpMZ52LUCWh",
"zDvZRwzkzNF8iJKdSXGJuoiC2pJ1sfrA9brunjhNTts3PrEQ92fs"
]
}
}
```
The same for the subdirectory `dir1`:
```bash
» curl "http://localhost:8001/api/codex/v1/data/zDvZRwzkzNF8iJKdSXGJuoiC2pJ1sfrA9brunjhNTts3PrEQ92fs/network/dirmanifest" | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 208 100 208 0 0 155k 0 --:--:-- --:--:-- --:--:-- 203k
{
"cid": "zDvZRwzkzNF8iJKdSXGJuoiC2pJ1sfrA9brunjhNTts3PrEQ92fs",
"manifest": {
"name": "dir1",
"cids": [
"zDvZRwzm8ZuLQB7kG3fcZqqNRPXbhe2d4du6pDC9MyoRRw12GKfS",
"zDvZRwzmDd6Mg6E98nrXVkZ7THfK5zyQ6Y3j8fSSavSsAYReteLT"
]
}
}
```
## Downloading
You can download separate files in the regular way, e.g. for `file11.txt`:
```bash
» curl "http://localhost:8001/api/codex/v1/data/zDvZRwzm8ZuLQB7kG3fcZqqNRPXbhe2d4du6pDC9MyoRRw12GKfS/network/stream" -o "file11.txt"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 8 100 8 0 0 3188 0 --:--:-- --:--:-- --:--:-- 4000
» cat file11.txt
File 11
```
Now, let's download only `dir1`:
```bash
» curl "http://localhost:8001/api/codex/v1/dir/zDvZRwzkzNF8iJKdSXGJuoiC2pJ1sfrA9brunjhNTts3PrEQ92fs/network/stream" -o "dir1.tar"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 3711 0 3711 0 0 1401k 0 --:--:-- --:--:-- --:--:-- 1812k
```
We can check the contents of `dir1.tar`:
```bash
» tar -tf dir1.tar
dir1
dir1/file11.txt
dir1/file12.txt
```
And we can successfully extract it to see that it has the same content:
```bash
» tar -xf dir1.tar
» tree ./dir1
./dir1
├── file11.txt
└── file12.txt
1 directory, 2 files
» cat dir1/file11.txt
File 11
» cat dir1/file12.txt
File 12
```
Finally, we can fetch the the whole previously uploaded `dir` directory:
```bash
» curl "http://localhost:8001/api/codex/v1/dir/zDvZRwzm597U7Bq9rDZ29KpqodiGHfAuLqMTXixujUFT3QYNDjgj/network/stream" -o "dir.tar"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 6271 0 6271 0 0 1955k 0 --:--:-- --:--:-- --:--:-- 2041k
```
And also here, we can check its contents:
```bash
» tar -tf dir.tar
dir
dir/file2.txt
dir/file1.txt
dir/dir1
dir/dir1/file11.txt
dir/dir1/file12.txt
» tar -xf dir.tar
» tree ./dir
./dir
├── dir1
│ ├── file11.txt
│ └── file12.txt
├── file1.txt
└── file2.txt
» cat dir/file1.txt
File 1
» cat dir/file2.txt
File 2
» cat dir/dir1/file11.txt
File 11
» cat dir/dir1/file12.txt
File 12
```
## Next steps
This is rough implementation. The most important, missing things are:
- while uploading tarballs, actually do streaming - now we are cheating a bit, so the current implementation will not work well on big file. On the other hand, while downloading directories, we are already doing some sort of a streaming while building the resulting tarballs by using unconstrained async queue.
- make sure we correctly record the permissions and modification timestamps. We have some basic stuff in place, but for the moment we hard-code it.