mirror of
synced 2025-03-01 07:20:54 +00:00
514 lines
17 KiB
514 lines
17 KiB
{.push raises: [].}
from os import DirSep
import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results
import ../waku_core/message
const negentropyPath =
currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep &
"vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep
const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h"
topics = "waku sync"
type Buffer = object
len*: uint64
`ptr`*: ptr uint8
type BindingResult = object
output: Buffer
have_ids_len: uint
need_ids_len: uint
have_ids: ptr Buffer
need_ids: ptr Buffer
error: cstring
proc toWakuMessageHash(buffer: Buffer): WakuMessageHash =
assert buffer.len == 32
var hash: WakuMessageHash
copyMem(hash[0].addr, buffer.ptr, 32)
return hash
proc toBuffer(x: openArray[byte]): Buffer =
## converts the input to a Buffer object
## the Buffer object is used to communicate data with the rln lib
var temp = @x
let baseAddr = cast[pointer](x)
let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len))
return output
proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
var bufLen: uint64
if isNone(len):
bufLen = buffer.len
bufLen = len.get()
if bufLen == 0:
return @[]
trace "length of buffer is", len = bufLen
let bytes = newSeq[byte](bufLen)
copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen)
return bytes
proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] =
var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr)
var mySequence = newSeq[Buffer](buffLen)
for i in 0 .. buffLen - 1:
mySequence[i] = uncheckedArr[i]
return mySequence
### Storage ###
type NegentropyStorage* = distinct pointer
proc get_last_error(): cstring {.header: NEGENTROPY_HEADER, importc: "get_last_error".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27
proc storage_init(
db_path: cstring, name: cstring
): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41
proc raw_insert(
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43
proc raw_erase(
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29
proc free(
storage: NegentropyStorage
) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
storage: NegentropyStorage
): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".}
### Negentropy ###
type RawNegentropy* = distinct pointer
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33
proc constructNegentropy(
storage: NegentropyStorage, frameSizeLimit: uint64
): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37
proc raw_initiate(
negentropy: RawNegentropy, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39
proc raw_setInitiator(
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45
proc raw_reconcile(
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51
proc raw_reconcile_with_ids(
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35
proc free(
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53
proc free_result(
r: ptr BindingResult
) {.header: NEGENTROPY_HEADER, importc: "free_result".}
### SubRange ###
type NegentropySubRangeStorage* = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57
proc subrange_init(
storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64
): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59
proc free(
subrange: NegentropySubRangeStorage
) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
subrange: NegentropySubRangeStorage
): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".}
### Negentropy with NegentropySubRangeStorage ###
type RawNegentropySubRange = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61
proc constructNegentropyWithSubRange(
subrange: NegentropySubRangeStorage, frameSizeLimit: uint64
): RawNegentropySubRange {.
header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new"
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65
proc raw_initiate_subrange(
negentropy: RawNegentropySubRange, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67
proc raw_reconcile_subrange(
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69
proc raw_reconcile_with_ids_subrange(
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63
proc free(
negentropy: RawNegentropySubRange
) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".}
### Wrappings ###
### Storage ###
proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.}
proc new*(T: type NegentropyStorage): Result[T, string] =
#TODO db name and path
let storage = storage_init("", "")
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
if storage == nil:
return err($get_last_error())
return ok(storage)
proc delete*(storage: NegentropyStorage) =
proc erase*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_erase(storage, uint64(id), bufPtr)
#TODO error handling once we move to lmdb
if res:
return ok()
return err($get_last_error())
proc insert*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_insert(storage, uint64(id), bufPtr)
#TODO error handling once we move to lmdb
if res:
return ok()
return err($get_last_error())
proc len*(storage: NegentropyStorage): int =
### SubRange ###
proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.}
proc new*(
T: type NegentropySubRangeStorage,
storage: NegentropyStorage,
startTime: uint64 = uint64.low,
endTime: uint64 = uint64.high,
): Result[T, string] =
let subrange = subrange_init(storage, startTime, endTime)
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
if subrange == nil:
return err($get_last_error())
return ok(subrange)
proc delete*(subrange: NegentropySubRangeStorage) =
proc len*(subrange: NegentropySubRangeStorage): int =
### Interface ###
Negentropy* = ref object of RootObj
NegentropyWithSubRange = ref object of Negentropy
inner: RawNegentropySubRange
NegentropyWithStorage = ref object of Negentropy
inner: RawNegentropy
NegentropyPayload* = distinct seq[byte]
method delete*(self: Negentropy) {.base, gcsafe.} =
method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} =
method serverReconcile*(
self: Negentropy, query: NegentropyPayload
): Result[NegentropyPayload, string] {.base.} =
method clientReconcile*(
self: Negentropy,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] {.base.} =
### Impl. ###
proc new*(
T: type Negentropy,
storage: NegentropyStorage | NegentropySubRangeStorage,
frameSizeLimit: int,
): Result[T, string] =
if storage is NegentropyStorage:
let raw_negentropy =
constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit))
if cast[pointer](raw_negentropy) == nil:
return err($get_last_error())
let negentropy = NegentropyWithStorage(inner: raw_negentropy)
return ok(negentropy)
elif storage is NegentropySubRangeStorage:
let raw_negentropy = constructNegentropyWithSubRange(
NegentropySubRangeStorage(storage), uint64(frameSizeLimit)
if cast[pointer](raw_negentropy) == nil:
return err($get_last_error())
let negentropy = NegentropyWithSubRange(inner: raw_negentropy)
return ok(negentropy)
method delete*(self: NegentropyWithSubRange) =
method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] =
## Client inititate a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_initiate_subrange(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err($get_last_error())
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
return ok(NegentropyPayload(bytes))
method serverReconcile*(
self: NegentropyWithSubRange, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
## Always return an answer.
let queryBuf = toBuffer(seq[byte](query))
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
self: NegentropyWithSubRange,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
## May return an answer, if not the sync session done.
let cQuery = toBuffer(seq[byte](query))
var myResult {.noinit.}: BindingResult = BindingResult()
myResult.have_ids_len = 0
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
let output = bufferToBytes(addr myResult.output)
have_hashes: seq[Buffer]
need_hashes: seq[Buffer]
if myResult.have_ids_len > 0:
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
if myResult.need_ids_len > 0:
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
trace "have and need hashes ",
have_count = have_hashes.len, need_count = need_hashes.len
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
trace "return ", output = output, len = output.len
if output.len < 1:
return ok(none(NegentropyPayload))
return ok(some(NegentropyPayload(output)))
method delete*(self: NegentropyWithStorage) =
method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] =
## Client inititate a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_initiate(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err($get_last_error())
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
return ok(NegentropyPayload(bytes))
method serverReconcile*(
self: NegentropyWithStorage, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
## Always return an answer.
let queryBuf = toBuffer(seq[byte](query))
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
self: NegentropyWithStorage,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
## May return an answer, if not the sync session done.
let cQuery = toBuffer(seq[byte](query))
var myResult {.noinit.}: BindingResult = BindingResult()
myResult.have_ids_len = 0
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
let output = bufferToBytes(addr myResult.output)
have_hashes: seq[Buffer]
need_hashes: seq[Buffer]
if myResult.have_ids_len > 0:
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
if myResult.need_ids_len > 0:
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
trace "have and need hashes ",
have_count = have_hashes.len, need_count = need_hashes.len
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
trace "return ", output = output, len = output.len
if output.len < 1:
return ok(none(NegentropyPayload))
return ok(some(NegentropyPayload(output)))