mirror of https://github.com/waku-org/nwaku.git
514 lines
17 KiB
Nim
514 lines
17 KiB
Nim
{.push raises: [].}
|
|
|
|
from os import DirSep
|
|
|
|
import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results
|
|
import ../waku_core/message
|
|
|
|
const negentropyPath =
|
|
currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep &
|
|
"vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep
|
|
|
|
const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h"
|
|
|
|
logScope:
|
|
topics = "waku sync"
|
|
|
|
type Buffer = object
|
|
len*: uint64
|
|
`ptr`*: ptr uint8
|
|
|
|
type BindingResult = object
|
|
output: Buffer
|
|
have_ids_len: uint
|
|
need_ids_len: uint
|
|
have_ids: ptr Buffer
|
|
need_ids: ptr Buffer
|
|
error: cstring
|
|
|
|
proc toWakuMessageHash(buffer: Buffer): WakuMessageHash =
|
|
assert buffer.len == 32
|
|
|
|
var hash: WakuMessageHash
|
|
|
|
copyMem(hash[0].addr, buffer.ptr, 32)
|
|
|
|
return hash
|
|
|
|
proc toBuffer(x: openArray[byte]): Buffer =
|
|
## converts the input to a Buffer object
|
|
## the Buffer object is used to communicate data with the rln lib
|
|
var temp = @x
|
|
let baseAddr = cast[pointer](x)
|
|
let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len))
|
|
return output
|
|
|
|
proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
|
|
var bufLen: uint64
|
|
if isNone(len):
|
|
bufLen = buffer.len
|
|
else:
|
|
bufLen = len.get()
|
|
if bufLen == 0:
|
|
return @[]
|
|
trace "length of buffer is", len = bufLen
|
|
let bytes = newSeq[byte](bufLen)
|
|
copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen)
|
|
return bytes
|
|
|
|
proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] =
|
|
var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr)
|
|
var mySequence = newSeq[Buffer](buffLen)
|
|
for i in 0 .. buffLen - 1:
|
|
mySequence[i] = uncheckedArr[i]
|
|
return mySequence
|
|
|
|
### Storage ###
|
|
|
|
type NegentropyStorage* = distinct pointer
|
|
|
|
proc get_last_error(): cstring {.header: NEGENTROPY_HEADER, importc: "get_last_error".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27
|
|
proc storage_init(
|
|
db_path: cstring, name: cstring
|
|
): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41
|
|
proc raw_insert(
|
|
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
|
|
): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43
|
|
proc raw_erase(
|
|
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
|
|
): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29
|
|
proc free(
|
|
storage: NegentropyStorage
|
|
) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
|
|
proc size(
|
|
storage: NegentropyStorage
|
|
): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".}
|
|
|
|
### Negentropy ###
|
|
|
|
type RawNegentropy* = distinct pointer
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33
|
|
proc constructNegentropy(
|
|
storage: NegentropyStorage, frameSizeLimit: uint64
|
|
): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37
|
|
proc raw_initiate(
|
|
negentropy: RawNegentropy, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39
|
|
proc raw_setInitiator(
|
|
negentropy: RawNegentropy
|
|
) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45
|
|
proc raw_reconcile(
|
|
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "reconcile".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51
|
|
proc raw_reconcile_with_ids(
|
|
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35
|
|
proc free(
|
|
negentropy: RawNegentropy
|
|
) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53
|
|
proc free_result(
|
|
r: ptr BindingResult
|
|
) {.header: NEGENTROPY_HEADER, importc: "free_result".}
|
|
|
|
### SubRange ###
|
|
|
|
type NegentropySubRangeStorage* = distinct pointer
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57
|
|
proc subrange_init(
|
|
storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64
|
|
): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59
|
|
proc free(
|
|
subrange: NegentropySubRangeStorage
|
|
) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
|
|
proc size(
|
|
subrange: NegentropySubRangeStorage
|
|
): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".}
|
|
|
|
### Negentropy with NegentropySubRangeStorage ###
|
|
|
|
type RawNegentropySubRange = distinct pointer
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61
|
|
proc constructNegentropyWithSubRange(
|
|
subrange: NegentropySubRangeStorage, frameSizeLimit: uint64
|
|
): RawNegentropySubRange {.
|
|
header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new"
|
|
.}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65
|
|
proc raw_initiate_subrange(
|
|
negentropy: RawNegentropySubRange, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67
|
|
proc raw_reconcile_subrange(
|
|
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69
|
|
proc raw_reconcile_with_ids_subrange(
|
|
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
|
|
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".}
|
|
|
|
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63
|
|
proc free(
|
|
negentropy: RawNegentropySubRange
|
|
) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".}
|
|
|
|
### Wrappings ###
|
|
|
|
### Storage ###
|
|
|
|
proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.}
|
|
|
|
proc new*(T: type NegentropyStorage): Result[T, string] =
|
|
#TODO db name and path
|
|
let storage = storage_init("", "")
|
|
|
|
#[ TODO: Uncomment once we move to lmdb
|
|
if storage == nil:
|
|
return err("storage initialization failed") ]#
|
|
|
|
if storage == nil:
|
|
return err($get_last_error())
|
|
|
|
return ok(storage)
|
|
|
|
proc delete*(storage: NegentropyStorage) =
|
|
storage.free()
|
|
|
|
proc erase*(
|
|
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
|
|
): Result[void, string] =
|
|
var buffer = toBuffer(hash)
|
|
var bufPtr = addr(buffer)
|
|
let res = raw_erase(storage, uint64(id), bufPtr)
|
|
|
|
#TODO error handling once we move to lmdb
|
|
|
|
if res:
|
|
return ok()
|
|
else:
|
|
return err($get_last_error())
|
|
|
|
proc insert*(
|
|
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
|
|
): Result[void, string] =
|
|
var buffer = toBuffer(hash)
|
|
var bufPtr = addr(buffer)
|
|
let res = raw_insert(storage, uint64(id), bufPtr)
|
|
|
|
#TODO error handling once we move to lmdb
|
|
|
|
if res:
|
|
return ok()
|
|
else:
|
|
return err($get_last_error())
|
|
|
|
proc len*(storage: NegentropyStorage): int =
|
|
int(storage.size)
|
|
|
|
### SubRange ###
|
|
|
|
proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.}
|
|
|
|
proc new*(
|
|
T: type NegentropySubRangeStorage,
|
|
storage: NegentropyStorage,
|
|
startTime: uint64 = uint64.low,
|
|
endTime: uint64 = uint64.high,
|
|
): Result[T, string] =
|
|
let subrange = subrange_init(storage, startTime, endTime)
|
|
|
|
#[ TODO: Uncomment once we move to lmdb
|
|
if storage == nil:
|
|
return err("storage initialization failed") ]#
|
|
|
|
if subrange == nil:
|
|
return err($get_last_error())
|
|
|
|
return ok(subrange)
|
|
|
|
proc delete*(subrange: NegentropySubRangeStorage) =
|
|
subrange.free()
|
|
|
|
proc len*(subrange: NegentropySubRangeStorage): int =
|
|
int(subrange.size)
|
|
|
|
### Interface ###
|
|
|
|
type
|
|
Negentropy* = ref object of RootObj
|
|
|
|
NegentropyWithSubRange = ref object of Negentropy
|
|
inner: RawNegentropySubRange
|
|
|
|
NegentropyWithStorage = ref object of Negentropy
|
|
inner: RawNegentropy
|
|
|
|
NegentropyPayload* = distinct seq[byte]
|
|
|
|
method delete*(self: Negentropy) {.base, gcsafe.} =
|
|
discard
|
|
|
|
method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} =
|
|
discard
|
|
|
|
method serverReconcile*(
|
|
self: Negentropy, query: NegentropyPayload
|
|
): Result[NegentropyPayload, string] {.base.} =
|
|
discard
|
|
|
|
method clientReconcile*(
|
|
self: Negentropy,
|
|
query: NegentropyPayload,
|
|
haves: var seq[WakuMessageHash],
|
|
needs: var seq[WakuMessageHash],
|
|
): Result[Option[NegentropyPayload], string] {.base.} =
|
|
discard
|
|
|
|
### Impl. ###
|
|
|
|
proc new*(
|
|
T: type Negentropy,
|
|
storage: NegentropyStorage | NegentropySubRangeStorage,
|
|
frameSizeLimit: int,
|
|
): Result[T, string] =
|
|
if storage is NegentropyStorage:
|
|
let raw_negentropy =
|
|
constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit))
|
|
|
|
if cast[pointer](raw_negentropy) == nil:
|
|
return err($get_last_error())
|
|
|
|
let negentropy = NegentropyWithStorage(inner: raw_negentropy)
|
|
return ok(negentropy)
|
|
elif storage is NegentropySubRangeStorage:
|
|
let raw_negentropy = constructNegentropyWithSubRange(
|
|
NegentropySubRangeStorage(storage), uint64(frameSizeLimit)
|
|
)
|
|
|
|
if cast[pointer](raw_negentropy) == nil:
|
|
return err($get_last_error())
|
|
|
|
let negentropy = NegentropyWithSubRange(inner: raw_negentropy)
|
|
return ok(negentropy)
|
|
|
|
method delete*(self: NegentropyWithSubRange) =
|
|
self.inner.free()
|
|
|
|
method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] =
|
|
## Client inititate a sync session with a server by sending a payload
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_initiate_subrange(myResultPtr)
|
|
if ret < 0 or myResultPtr == nil:
|
|
error "negentropy initiate failed with code ", code = ret
|
|
return err($get_last_error())
|
|
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
|
|
free_result(myResultPtr)
|
|
trace "received return from initiate", len = myResultPtr.output.len
|
|
|
|
return ok(NegentropyPayload(bytes))
|
|
|
|
method serverReconcile*(
|
|
self: NegentropyWithSubRange, query: NegentropyPayload
|
|
): Result[NegentropyPayload, string] =
|
|
## Server response to a negentropy payload.
|
|
## Always return an answer.
|
|
|
|
let queryBuf = toBuffer(seq[byte](query))
|
|
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr)
|
|
if ret < 0:
|
|
error "raw_reconcile failed with code ", code = ret
|
|
return err($get_last_error())
|
|
trace "received return from raw_reconcile", len = myResultPtr.output.len
|
|
|
|
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
|
|
trace "outputBytes len", len = outputBytes.len
|
|
free_result(myResultPtr)
|
|
|
|
return ok(NegentropyPayload(outputBytes))
|
|
|
|
method clientReconcile*(
|
|
self: NegentropyWithSubRange,
|
|
query: NegentropyPayload,
|
|
haves: var seq[WakuMessageHash],
|
|
needs: var seq[WakuMessageHash],
|
|
): Result[Option[NegentropyPayload], string] =
|
|
## Client response to a negentropy payload.
|
|
## May return an answer, if not the sync session done.
|
|
|
|
let cQuery = toBuffer(seq[byte](query))
|
|
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
myResult.have_ids_len = 0
|
|
myResult.need_ids_len = 0
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr)
|
|
if ret < 0:
|
|
error "raw_reconcile failed with code ", code = ret
|
|
return err($get_last_error())
|
|
|
|
let output = bufferToBytes(addr myResult.output)
|
|
|
|
var
|
|
have_hashes: seq[Buffer]
|
|
need_hashes: seq[Buffer]
|
|
|
|
if myResult.have_ids_len > 0:
|
|
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
|
|
if myResult.need_ids_len > 0:
|
|
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
|
|
|
|
trace "have and need hashes ",
|
|
have_count = have_hashes.len, need_count = need_hashes.len
|
|
|
|
for i in 0 .. have_hashes.len - 1:
|
|
var hash = toWakuMessageHash(have_hashes[i])
|
|
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
|
|
haves.add(hash)
|
|
|
|
for i in 0 .. need_hashes.len - 1:
|
|
var hash = toWakuMessageHash(need_hashes[i])
|
|
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
|
|
needs.add(hash)
|
|
|
|
trace "return ", output = output, len = output.len
|
|
|
|
free_result(myResultPtr)
|
|
|
|
if output.len < 1:
|
|
return ok(none(NegentropyPayload))
|
|
|
|
return ok(some(NegentropyPayload(output)))
|
|
|
|
method delete*(self: NegentropyWithStorage) =
|
|
self.inner.free()
|
|
|
|
method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] =
|
|
## Client inititate a sync session with a server by sending a payload
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_initiate(myResultPtr)
|
|
if ret < 0 or myResultPtr == nil:
|
|
error "negentropy initiate failed with code ", code = ret
|
|
return err($get_last_error())
|
|
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
|
|
free_result(myResultPtr)
|
|
trace "received return from initiate", len = myResultPtr.output.len
|
|
|
|
return ok(NegentropyPayload(bytes))
|
|
|
|
method serverReconcile*(
|
|
self: NegentropyWithStorage, query: NegentropyPayload
|
|
): Result[NegentropyPayload, string] =
|
|
## Server response to a negentropy payload.
|
|
## Always return an answer.
|
|
|
|
let queryBuf = toBuffer(seq[byte](query))
|
|
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr)
|
|
if ret < 0:
|
|
error "raw_reconcile failed with code ", code = ret
|
|
return err($get_last_error())
|
|
trace "received return from raw_reconcile", len = myResultPtr.output.len
|
|
|
|
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
|
|
trace "outputBytes len", len = outputBytes.len
|
|
free_result(myResultPtr)
|
|
|
|
return ok(NegentropyPayload(outputBytes))
|
|
|
|
method clientReconcile*(
|
|
self: NegentropyWithStorage,
|
|
query: NegentropyPayload,
|
|
haves: var seq[WakuMessageHash],
|
|
needs: var seq[WakuMessageHash],
|
|
): Result[Option[NegentropyPayload], string] =
|
|
## Client response to a negentropy payload.
|
|
## May return an answer, if not the sync session done.
|
|
|
|
let cQuery = toBuffer(seq[byte](query))
|
|
|
|
var myResult {.noinit.}: BindingResult = BindingResult()
|
|
myResult.have_ids_len = 0
|
|
myResult.need_ids_len = 0
|
|
var myResultPtr = addr myResult
|
|
|
|
let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr)
|
|
if ret < 0:
|
|
error "raw_reconcile failed with code ", code = ret
|
|
return err($get_last_error())
|
|
|
|
let output = bufferToBytes(addr myResult.output)
|
|
|
|
var
|
|
have_hashes: seq[Buffer]
|
|
need_hashes: seq[Buffer]
|
|
|
|
if myResult.have_ids_len > 0:
|
|
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
|
|
if myResult.need_ids_len > 0:
|
|
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
|
|
|
|
trace "have and need hashes ",
|
|
have_count = have_hashes.len, need_count = need_hashes.len
|
|
|
|
for i in 0 .. have_hashes.len - 1:
|
|
var hash = toWakuMessageHash(have_hashes[i])
|
|
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
|
|
haves.add(hash)
|
|
|
|
for i in 0 .. need_hashes.len - 1:
|
|
var hash = toWakuMessageHash(need_hashes[i])
|
|
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
|
|
needs.add(hash)
|
|
|
|
trace "return ", output = output, len = output.len
|
|
|
|
free_result(myResultPtr)
|
|
|
|
if output.len < 1:
|
|
return ok(none(NegentropyPayload))
|
|
|
|
return ok(some(NegentropyPayload(output)))
|