feat: message transfer mechanism & tests (#2688)

This commit is contained in:
Simon-Pierre Vivier 2024-05-14 09:00:29 -04:00 committed by SionoiS
parent a89c8820f9
commit 0ffa957b34
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
19 changed files with 1046 additions and 431 deletions

View File

@ -37,10 +37,10 @@ endif
##########
## Main ##
##########
.PHONY: all test update clean negentropy
.PHONY: all test update clean
# default target, because it's the first one that doesn't start with '.'
all: | negentropy wakunode2 example2 chat2 chat2bridge libwaku
all: | wakunode2 example2 chat2 chat2bridge libwaku
test: | testcommon testwaku
@ -51,7 +51,7 @@ update: | update-common
rm -rf waku.nims && \
$(MAKE) waku.nims $(HANDLE_OUTPUT)
clean: | negentropy-clean
clean:
rm -rf build
# must be included after the default target
@ -84,8 +84,6 @@ endif
endif
## end of Heaptracker options
## Pass libnegentropy to linker.
NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so
##################
## Dependencies ##
@ -412,9 +410,22 @@ release-notes:
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101
######################
### NEGENTROPY ###
######################
.PHONY: negentropy
## Pass libnegentropy to linker.
NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so
all: | negentropy
clean: | negentropy-clean
negentropy:
$(MAKE) -C vendor/negentropy/cpp && \
cp vendor/negentropy/cpp/libnegentropy.so ./
negentropy-clean:
$(MAKE) -C vendor/negentropy/cpp clean && \
rm libnegentropy.so
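For context, the Nim bindings in waku_sync/raw_bindings.nim consume this shared library through importc pragmas against the wrapper header, which is why the --passL flag above is needed at link time. A minimal sketch of the pattern (the standalone declaration below is illustrative, not the exact binding):

# Sketch: binding a symbol exported by libnegentropy.so from Nim.
# Assumes negentropy_wrapper.h is on the compiler's include path and that
# the Makefile passed --passL:./libnegentropy.so to the linker.
const NegentropyHeader = "negentropy_wrapper.h" # assumed include path

type NegentropyStorage = distinct pointer

proc storage_init(
  db_path: cstring, name: cstring
): NegentropyStorage {.header: NegentropyHeader, importc: "storage_new".}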

View File

@ -0,0 +1,194 @@
{.used.}
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto
import
../../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]
suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let timeOrigin = now()
let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
syncInterval = 1.hours,
relayJitter = 0.seconds,
enablePruning = false,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncInterval = 2.milliseconds,
relayJitter = 0.seconds,
enablePruning = false,
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
# messages are retrieved when mounting Waku sync,
# but based on the sync interval, so this is needed for the client only
for msg in messages:
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await allFutures(server.start(), client.start())
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)
asyncTeardown:
# prevent premature channel shutdown
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])
server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncInterval = 1.hours,
enablePruning = false,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncInterval = 10.milliseconds,
relayJitter = 0.seconds,
enablePruning = true,
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
await allFutures(server.start(), client.start())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "pruning":
for _ in 0 ..< 4:
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
server.wakuSync.ingessMessage(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == 10
server.wakuSync.storageSize() == 40

View File

@ -5,14 +5,23 @@ import std/options, chronos, chronicles, libp2p/crypto/crypto
import ../../../waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore
proc newTestWakuSync*(
switch: Switch, handler: SyncCallback
switch: Switch,
transfer: Option[TransferCallback] = none(TransferCallback),
prune: Option[PruneCallback] = none(PruneCallback),
interval: Duration = DefaultSyncInterval,
): Future[WakuSync] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuSync.new(
peerManager = peerManager, relayJitter = 0.seconds, syncCB = some(handler)
)
assert proto != nil
let peerManager = PeerManager.new(switch)
let res = await WakuSync.new(
peerManager = peerManager,
relayJitter = 0.seconds,
syncInterval = interval,
pruning = false,
wakuArchive = nil,
wakuStoreClient = nil,
)
let proto = res.get()
proto.start()
switch.mount(proto)

View File

@ -1,4 +1,3 @@
{.used.}
import
./test_protocol
import ./test_protocol, ./test_bindings

BIN
tests/waku_sync/test_bindings Executable file

Binary file not shown.

View File

@ -0,0 +1,141 @@
{.used.}
import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random
import
../../waku/
[node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings],
../testlib/[wakucore],
./sync_utils
random.randomize()
#TODO clean this up
suite "Bindings":
var storage {.threadvar.}: NegentropyStorage
var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)]
setup:
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
storage = storageRes.get()
messages = @[]
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
messages.add((hash, msg))
teardown:
storage.delete()
test "storage insert":
check:
storage.len() == 0
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
test "storage erase":
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
var delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isOk()
check:
storage.len() == 0
delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isErr()
check:
storage.len() == 0
test "subrange":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
let subrangeRes = NegentropySubRangeStorage.new(storage)
assert subrangeRes.isOk(), subrangeRes.error
let subrange = subrangeRes.get()
check:
subrange.len() == 10
#[ test "storage memory size":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
for (hash, msg) in messages:
let delRes = storage.erase(msg.timestamp, hash)
assert delRes.isOk(), $delRes.error
check:
storage.len() == 0
#TODO validate that the occupied memory didn't grow. ]#
test "reconcile server differences":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let clientNegentropyRes = Negentropy.new(storage, 0)
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
let serverStorage = storageRes.get()
for (hash, msg) in messages:
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
# the extra msg
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let serverNegentropyRes = Negentropy.new(serverStorage, 0)
assert clientNegentropyRes.isOk(), $clientNegentropyRes.error
assert serverNegentropyRes.isOk(), $serverNegentropyRes.error
let clientNegentropy = clientNegentropyRes.get()
let serverNegentropy = serverNegentropyRes.get()
let initRes = clientNegentropy.initiate()
assert initRes.isOk(), $initRes.error
let init = initRes.get()
let reconRes = serverNegentropy.serverReconcile(init)
assert reconRes.isOk(), $reconRes.error
let srecon = reconRes.get()
var
haves: seq[WakuMessageHash]
needs: seq[WakuMessageHash]
let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs)
assert creconRes.isOk(), $creconRes.error
let reconOpt = creconRes.get()
check:
reconOpt.isNone()
needs[0] == hash

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, times],
std/[options, sets],
testutils/unittests,
chronos,
chronicles,
@ -9,18 +9,15 @@ import
stew/byteutils,
std/random
from std/os import sleep
import
../../waku/[
common/paging,
node/peer_manager,
waku_core,
waku_core/message/digest,
waku_sync,
waku_sync/raw_bindings,
],
../testlib/[common, wakucore, testasync],
../testlib/[wakucore, testasync],
./sync_utils
random.randomize()
@ -29,12 +26,10 @@ suite "Waku Sync":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
var protoHandler {.threadvar.}: SyncCallback
var server {.threadvar.}: WakuSync
var client {.threadvar.}: WakuSync
var serverPeerInfo {.threadvar.}: RemotePeerInfo
var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo]
asyncSetup:
serverSwitch = newTestSwitch()
@ -42,28 +37,23 @@ suite "Waku Sync":
await allFutures(serverSwitch.start(), clientSwitch.start())
protoHandler = proc(
hashes: seq[WakuMessageHash], peer: RemotePeerInfo
) {.async: (raises: []), closure, gcsafe.} =
debug "Received needHashes from peer:", len = hashes.len
for hash in hashes:
debug "Hash received from peer:", hash = hash.to0xHex()
server = await newTestWakuSync(serverSwitch)
client = await newTestWakuSync(clientSwitch)
server = await newTestWakuSync(serverSwitch, handler = protoHandler)
client = await newTestWakuSync(clientSwitch, handler = protoHandler)
serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(server.stop(), client.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
suite "Protocol":
asyncTest "sync 2 nodes both empty":
var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
let hashes = await client.sync(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value.len == 0
hashes.value[0].len == 0
asyncTest "sync 2 nodes empty client full server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
@ -75,13 +65,13 @@ suite "Waku Sync":
server.ingessMessage(DefaultPubsubTopic, msg3)
var hashes = await client.sync(serverPeerInfo)
await sleepAsync(1) # to ensure graceful shutdown
require (hashes.isOk())
assert hashes.isOk(), hashes.error
check:
hashes.value.len == 3
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value
hashes.value[0].len == 3
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0]
asyncTest "sync 2 nodes full client empty server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
@ -93,9 +83,9 @@ suite "Waku Sync":
client.ingessMessage(DefaultPubsubTopic, msg3)
var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
assert hashes.isOk(), hashes.error
check:
hashes.value.len == 0
hashes.value[0].len == 0
asyncTest "sync 2 nodes different hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
@ -105,40 +95,30 @@ suite "Waku Sync":
client.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg2)
var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
var syncRes = await client.sync(serverPeerInfo)
check:
hashes.value.len == 1
hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
syncRes.isOk()
var hashes = syncRes.get()
check:
hashes[0].len == 1
hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
# Assuming the message is fetched from the peer
client.ingessMessage(DefaultPubsubTopic, msg2)
sleep(1000)
hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 0
#[ asyncTest "sync 2 nodes duplicate hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
syncRes = await client.sync(serverPeerInfo)
server.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg1)
client.ingessMessage(DefaultPubsubTopic, msg1)
server.ingessMessage(DefaultPubsubTopic, msg2)
check:
syncRes.isOk()
hashes = syncRes.get()
var hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 1
#hashes.value[0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
# Assuming the message is fetched from the peer
client.ingessMessage(DefaultPubsubTopic, msg2)
sleep(1000)
hashes = await client.sync(serverPeerInfo)
require (hashes.isOk())
check:
hashes.value.len == 0 ]#
hashes[0].len == 0
asyncTest "sync 2 nodes same hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
@ -151,13 +131,12 @@ suite "Waku Sync":
let hashes = await client.sync(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value.len == 0
hashes.value[0].len == 0
asyncTest "sync 2 nodes 100K msgs":
var i = 0
let msgCount = 100000
var diffIndex = rand(msgCount)
debug "diffIndex is ", diffIndex = diffIndex
var diffMsg: WakuMessage
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
@ -166,18 +145,20 @@ suite "Waku Sync":
else:
diffMsg = msg
server.ingessMessage(DefaultPubsubTopic, msg)
i = i + 1
i += 1
let hashes = await client.sync(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value.len == 1
hashes.value[0] == computeMessageHash(DefaultPubsubTopic, diffMsg)
hashes.value[0].len == 1
hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg)
asyncTest "sync 2 nodes 100K msgs 10K diffs":
var i = 0
let msgCount = 100000
var diffCount = 10000
var diffMsgHashes: seq[WakuMessageHash]
var randIndexes: seq[int]
while i < diffCount:
@ -185,38 +166,39 @@ suite "Waku Sync":
if randInt in randIndexes:
continue
randIndexes.add(randInt)
i = i + 1
i += 1
i = 0
var tmpDiffCnt = diffCount
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if tmpDiffCnt > 0 and i in randIndexes:
#info "not ingessing in client", i=i
diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg))
tmpDiffCnt = tmpDiffCnt - 1
else:
client.ingessMessage(DefaultPubsubTopic, msg)
server.ingessMessage(DefaultPubsubTopic, msg)
i = i + 1
i += 1
let hashes = await client.sync(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value.len == diffCount
#TODO: Check if all diffHashes are there in needHashes
hashes.value[0].len == diffCount
toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes)
asyncTest "sync 3 nodes 2 client 1 server":
## Setup
let client2Switch = newTestSwitch()
await client2Switch.start()
let client2 = await newTestWakuSync(client2Switch, handler = protoHandler)
let client2 = await newTestWakuSync(client2Switch)
let msgCount = 10000
var i = 0
while i < msgCount:
i = i + 1
i += 1
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i mod 2 == 0:
client2.ingessMessage(DefaultPubsubTopic, msg)
@ -235,10 +217,8 @@ suite "Waku Sync":
assert hashes2.isOk(), $hashes2.error
check:
hashes1.value.len == int(msgCount / 2)
hashes2.value.len == int(msgCount / 2)
#TODO: Check if all diffHashes are there in needHashes
hashes1.value[0].len == int(msgCount / 2)
hashes2.value[0].len == int(msgCount / 2)
await client2.stop()
await client2Switch.stop()
@ -259,10 +239,10 @@ suite "Waku Sync":
)
let
client2 = await newTestWakuSync(client2Switch, handler = protoHandler)
client3 = await newTestWakuSync(client3Switch, handler = protoHandler)
client4 = await newTestWakuSync(client4Switch, handler = protoHandler)
client5 = await newTestWakuSync(client5Switch, handler = protoHandler)
client2 = await newTestWakuSync(client2Switch)
client3 = await newTestWakuSync(client3Switch)
client4 = await newTestWakuSync(client4Switch)
client5 = await newTestWakuSync(client5Switch)
let msgCount = 100000
var i = 0
@ -280,60 +260,55 @@ suite "Waku Sync":
if i < msgCount - 10000:
client5.ingessMessage(DefaultPubsubTopic, msg)
server.ingessMessage(DefaultPubsubTopic, msg)
i = i + 1
#info "client2 storage size", size = client2.storageSize()
i += 1
var timeBefore = cpuTime()
var timeBefore = getNowInNanosecondTime()
let hashes1 = await client.sync(serverPeerInfo)
var timeAfter = cpuTime()
var timeAfter = getNowInNanosecondTime()
var syncTime = (timeAfter - timeBefore)
info "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime
debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime
assert hashes1.isOk(), $hashes1.error
check:
hashes1.value.len == 1
#TODO: Check if all diffHashes are there in needHashes
hashes1.value[0].len == 1
timeBefore = cpuTime()
timeBefore = getNowInNanosecondTime()
let hashes2 = await client2.sync(serverPeerInfo)
timeAfter = cpuTime()
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
info "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime
debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime
assert hashes2.isOk(), $hashes2.error
check:
hashes2.value.len == 10
#TODO: Check if all diffHashes are there in needHashes
hashes2.value[0].len == 10
timeBefore = cpuTime()
timeBefore = getNowInNanosecondTime()
let hashes3 = await client3.sync(serverPeerInfo)
timeAfter = cpuTime()
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
info "sync time in seconds", msgsTotal = msgCount, diff = 100, syncTime = syncTime
debug "sync time in seconds",
msgsTotal = msgCount, diff = 100, syncTime = syncTime
assert hashes3.isOk(), $hashes3.error
check:
hashes3.value.len == 100
#TODO: Check if all diffHashes are there in needHashes
hashes3.value[0].len == 100
timeBefore = cpuTime()
timeBefore = getNowInNanosecondTime()
let hashes4 = await client4.sync(serverPeerInfo)
timeAfter = cpuTime()
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
info "sync time in seconds",
debug "sync time in seconds",
msgsTotal = msgCount, diff = 1000, syncTime = syncTime
assert hashes4.isOk(), $hashes4.error
check:
hashes4.value.len == 1000
#TODO: Check if all diffHashes are there in needHashes
hashes4.value[0].len == 1000
timeBefore = cpuTime()
timeBefore = getNowInNanosecondTime()
let hashes5 = await client5.sync(serverPeerInfo)
timeAfter = cpuTime()
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
info "sync time in seconds",
debug "sync time in seconds",
msgsTotal = msgCount, diff = 10000, syncTime = syncTime
assert hashes5.isOk(), $hashes5.error
check:
hashes5.value.len == 10000
#TODO: Check if all diffHashes are there in needHashes
hashes5.value[0].len == 10000
await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop())
await allFutures(
@ -351,9 +326,9 @@ suite "Waku Sync":
await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start())
let node1PeerInfo = node1Switch.peerInfo.toRemotePeerInfo()
let node2PeerInfo = node2Switch.peerInfo.toRemotePeerInfo()
let node3PeerInfo = node3Switch.peerInfo.toRemotePeerInfo()
let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo())
let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo())
let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo())
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
@ -362,17 +337,10 @@ suite "Waku Sync":
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
let protoHandler: SyncCallback = proc(
hashes: seq[WakuMessageHash], peer: RemotePeerInfo
) {.async: (raises: []), closure, gcsafe.} =
debug "Received needHashes from peer:", len = hashes.len
for hash in hashes:
debug "Hash received from peer:", hash = hash.to0xHex()
let
node1 = await newTestWakuSync(node1Switch, handler = protoHandler)
node2 = await newTestWakuSync(node2Switch, handler = protoHandler)
node3 = await newTestWakuSync(node3Switch, handler = protoHandler)
node1 = await newTestWakuSync(node1Switch)
node2 = await newTestWakuSync(node2Switch)
node3 = await newTestWakuSync(node3Switch)
node1.ingessMessage(DefaultPubsubTopic, msg1)
node2.ingessMessage(DefaultPubsubTopic, msg1)
@ -394,74 +362,13 @@ suite "Waku Sync":
assert hashes3.isOk(), hashes3.error
check:
hashes1.get().len == 1
hashes2.get().len == 1
hashes3.get().len == 1
hashes1.get()[0].len == 1
hashes2.get()[0].len == 1
hashes3.get()[0].len == 1
hashes1.get()[0] == hash2
hashes2.get()[0] == hash3
hashes3.get()[0] == hash1
hashes1.get()[0][0] == hash2
hashes2.get()[0][0] == hash3
hashes3.get()[0][0] == hash1
await allFutures(node1.stop(), node2.stop(), node3.stop())
await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop())
suite "Bindings":
asyncTest "test c integration":
let s1Res = Storage.new()
let s1 = s1Res.value
assert s1Res.isOk(), $s1Res.error
let s2Res = Storage.new()
let s2 = s2Res.value
assert s2Res.isOk(), $s2Res.error
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msgHash: WakuMessageHash =
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1)
check:
s1.insert(msg1.timestamp, msgHash).isOk()
s2.insert(msg1.timestamp, msgHash).isOk()
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msgHash2: WakuMessageHash =
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
check:
s2.insert(msg2.timestamp, msgHash2).isOk()
let subrange1Res = SubRange.new(s1, 0, uint64.high)
assert subrange1Res.isOk(), $subrange1Res.error
let subrange1 = subrange1Res.value
let subrange2Res = SubRange.new(s2, 0, uint64.high)
assert subrange2Res.isOk(), $subrange2Res.error
let subrange2 = subrange2Res.value
let ng1Res = NegentropySubRange.new(subrange1, 10000)
assert ng1Res.isOk(), $ng1Res.error
let ng1 = ng1Res.value
let ng2Res = NegentropySubRange.new(subrange2, 10000)
assert ng2Res.isOk(), $ng2Res.error
let ng2 = ng2Res.value
let ng1_q1 = ng1.initiate()
check:
ng1_q1.isOk()
let ng2_q1 = ng2.serverReconcile(ng1_q1.get())
check:
ng2_q1.isOk()
var
haveHashes: seq[WakuMessageHash]
needHashes: seq[WakuMessageHash]
let ng1_q2 = ng1.clientReconcile(ng2_q1.get(), haveHashes, needHashes)
check:
needHashes.len() == 1
haveHashes.len() == 0
ng1_q2.isOk()
needHashes[0] == msgHash2
check:
s1.erase(msg1.timestamp, msgHash).isOk()

vendor/negentropy vendored

@ -1 +1 @@
Subproject commit 028dbf34706fe490d75ce0f6dfc32ded35c10d4e
Subproject commit 311a21a22bdb6d80e5c4ba5e3d2f550e0062b2cb

View File

@ -355,6 +355,34 @@ type WakuNodeConf* = object
name: "store-max-num-db-connections"
.}: int
## Sync config
storeSync* {.
desc: "Enable store sync protocol: true|false",
defaultValue: false,
name: "store-sync"
.}: bool
storeSyncInterval* {.
desc: "Interval between store sync attempts. In seconds.",
defaultValue: 3600, # 1 hour
name: "store-sync-interval"
.}: int64
storeSyncRelayJitter* {.
hidden,
desc: "Time offset to account for message propagation jitter. In seconds.",
defaultValue: 20,
name: "store-sync-relay-jitter"
.}: int64
storeSyncMaxPayloadSize* {.
hidden,
desc:
"Max size in bytes of the inner negentropy payload. Cannot be less than 5K, 0 is unlimited.",
defaultValue: 0,
name: "store-sync-max-payload-size"
.}: int64
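For example, with the options above a node enables sync with hourly attempts via: wakunode2 --store-sync=true --store-sync-interval=3600 (flag names as declared here; the binary name assumes a standard nwaku build).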
## Filter config
filter* {.
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"

View File

@ -275,6 +275,16 @@ proc setupProtocols(
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)
if conf.storeSync:
(
await node.mountWakuSync(
int(conf.storeSyncMaxPayloadSize),
conf.storeSyncInterval.seconds(),
conf.storeSyncRelayJitter.seconds(),
)
).isOkOr:
return err("failed to mount waku sync protocol: " & $error)
# NOTE Must be mounted after relay
if conf.lightpush:
try:

View File

@ -196,59 +196,37 @@ proc connectToNodes*(
## Waku Sync
proc mountWakuSync*(node: WakuNode): Result[void, string] =
proc mountWakuSync*(
node: WakuNode,
maxFrameSize: int = DefaultMaxFrameSize,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
enablePruning: bool = true, # For testing purposes
): Future[Result[void, string]] {.async.} =
if not node.wakuSync.isNil():
return err("Waku sync already mounted, skipping")
return err("already mounted")
let prune: PruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure, gcsafe.} =
let archiveCursor =
if cursor.isSome():
some(ArchiveCursor(hash: cursor.get()))
else:
none(ArchiveCursor)
let query = ArchiveQuery(
cursor: archiveCursor,
startTime: some(pruneStart),
endTime: some(pruneStop),
pageSize: 100,
node.wakuSync = (
await WakuSync.new(
peerManager = node.peerManager,
maxFrameSize = maxFrameSize,
syncInterval = syncInterval,
relayJitter = relayJitter,
pruning = enablePruning,
wakuArchive = node.wakuArchive,
wakuStoreClient = node.wakuStoreClient,
)
).valueOr:
return err("initialization failed: " & error)
let catchable = catch:
await node.wakuArchive.findMessages(query)
let res =
if catchable.isErr():
return err(catchable.error.msg)
else:
catchable.get()
let response = res.valueOr:
return err($error)
let elements = collect(newSeq):
for (hash, msg) in response.hashes.zip(response.messages):
(hash, msg.timestamp)
let cursor =
if response.cursor.isNone():
none(WakuMessageHash)
else:
some(response.cursor.get().hash)
return ok((elements, cursor))
#TODO add sync callback and options
node.wakuSync = WakuSync.new(peerManager = node.peerManager, pruneCB = some(prune))
let catchRes = catch:
let catchable = catch:
node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
if catchRes.isErr():
return err(catchRes.error.msg)
if catchable.isErr():
return err("switch mounting failed: " & catchable.error.msg)
if node.started:
node.wakuSync.start()
return ok()
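For reference, mounting sync on an already-built node then looks like the end-to-end test setup earlier in this diff; a minimal sketch with assumed parameter values:

# Sketch: mount Waku Sync with explicit parameters (values illustrative,
# mirroring the test setup; enablePruning defaults to true).
let mountRes = await node.mountWakuSync(
  maxFrameSize = 0, # 0 = unlimited, per the store-sync-max-payload-size doc
  syncInterval = 1.hours,
  relayJitter = 0.seconds,
)
assert mountRes.isOk(), mountRes.error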
@ -1324,6 +1302,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()
if not node.wakuSync.isNil():
node.wakuSync.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(

View File

@ -99,7 +99,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
tuplesToWakuPeers(peers, lightpushPeers)
if not node.wakuSync.isNil():
# Map WakuStore peers to WakuPeers and add to return list
# Map WakuSync peers to WakuPeers and add to return list
let syncPeers = node.peerManager.peerStore.peers(WakuSyncCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),

View File

@ -37,7 +37,8 @@ template nanosecondTime*(collector: Gauge, body: untyped) =
else:
body
proc timestampInSeconds*(time: Timestamp): Timestamp =
# Not used yet
#[ proc timestampInSeconds*(time: Timestamp): Timestamp =
let timeStr = $time
var timestamp: Timestamp = time
@ -47,4 +48,4 @@ proc timestampInSeconds*(time: Timestamp): Timestamp =
timestamp = Timestamp(time div Timestamp(1_000_000))
elif timeStr.len() > 10:
timestamp = Timestamp(time div Timestamp(1000))
return timestamp
return timestamp ]#

View File

@ -51,7 +51,7 @@ proc sendStoreRequest(
return ok(res)
proc query*(
self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo
self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId
): Future[StoreQueryResult] {.async, gcsafe.} =
if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

View File

@ -3,25 +3,25 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/[options], chronos
import std/[options], chronos, libp2p/peerId
import ../waku_core
const
DefaultSyncInterval*: Duration = 1.hours
DefaultPruneInterval*: Duration = 30.minutes
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
DefaultFrameSize* = 153600
DefaultMaxFrameSize* = 1048576 # 1 MiB
DefaultGossipSubJitter*: Duration = 20.seconds
type
SyncCallback* =
proc(hashes: seq[WakuMessageHash], syncPeer: RemotePeerInfo) {.async: (raises: []).}
TransferCallback* = proc(
hashes: seq[WakuMessageHash], peerId: PeerId
): Future[Result[void, string]] {.async: (raises: []), closure.}
PruneCallback* = proc(
startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash)
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []).}
] {.async: (raises: []), closure.}
SyncPayload* = object
rangeStart*: Option[uint64]

View File

@ -4,8 +4,8 @@ else:
{.push raises: [].}
import
std/[options],
stew/results,
std/[options, sugar, sequtils],
stew/[results, byteutils],
chronicles,
chronos,
metrics,
@ -17,42 +17,47 @@ import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_archive,
../waku_store/[client, common],
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common,
./session,
./storage_manager
./session
logScope:
topics = "waku sync"
type WakuSync* = ref object of LPProtocol
storage: Storage # Negentropy protocol storage
storage: NegentropyStorage
peerManager: PeerManager
maxFrameSize: int
syncInterval: timer.Duration # Time between each synchronisation attempt
relayJitter: Duration # Time delay until all messages are mostly received network-wide
syncCallBack: Option[SyncCallBack] # Callback with the result of the synchronisation
transferCallBack: Option[TransferCallback] # Callback for message transfer.
pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
pruneStart: Timestamp # Last pruning start timestamp
pruneInterval: Duration # Time between each pruning attempt
pruneStart: Timestamp # Last pruning start timestamp
pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
periodicSyncFut: Future[void]
periodicPruneFut: Future[void]
proc storageSize*(self: WakuSync): int =
self.storage.len
proc ingessMessage*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
return
#TODO: Do we need to check if this message has already been ingested?
# What if a message is received via both gossip and sync?
# Two entries might be inserted into storage, which is inefficient.
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
trace "inserting message into storage ",
hash = msgHash.toHex(), timestamp = msg.timestamp
trace "inserting message into waku sync storage ",
msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp
if self.storage.insert(msg.timestamp, msgHash).isErr():
debug "failed to insert message ", hash = msgHash.toHex()
error "failed to insert message ", msg_hash = msgHash.to0xHex()
proc calculateRange(jitter: Duration, syncRange: Duration = 1.hours): (int64, int64) =
var now = getNowInNanosecondTime()
@ -111,40 +116,30 @@ proc request(
return ok(hashes)
proc sync*(
self: WakuSync
self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async, gcsafe.} =
let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")
let peer =
if peerInfo.isSome():
peerInfo.get()
else:
let peer: RemotePeerInfo = self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")
peer
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
return err("Sync request error: " & error)
return ok((hashes, peer))
proc sync*(
self: WakuSync, peer: RemotePeerInfo
): Future[Result[seq[WakuMessageHash], string]] {.async, gcsafe.} =
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
return err("Sync request error: " & error)
return ok(hashes)
proc handleLoop(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
@ -184,14 +179,16 @@ proc initProtocolHandler(self: WakuSync) =
let hashes = (await self.handleLoop(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
error = $error
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
#TODO send error code and desc to client
return
#TODO handle the hashes that the server needs from the client
if hashes.len > 0 and self.transferCallBack.isSome():
let callback = self.transferCallBack.get()
(await callback(hashes, conn.peerId)).isOkOr:
error "transfer callback failed", error = $error
debug "sync session ended gracefully",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
@ -199,20 +196,141 @@ proc initProtocolHandler(self: WakuSync) =
self.handler = handle
self.codec = WakuSyncCodec
proc initPruningHandler(self: WakuSync, wakuArchive: WakuArchive) =
if wakuArchive.isNil():
return
self.pruneCallBack = some(
proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
let archiveCursor =
if cursor.isSome():
some(ArchiveCursor(hash: cursor.get()))
else:
none(ArchiveCursor)
let query = ArchiveQuery(
includeData: true,
cursor: archiveCursor,
startTime: some(pruneStart),
endTime: some(pruneStop),
pageSize: 100,
)
let catchable = catch:
await wakuArchive.findMessages(query)
if catchable.isErr():
return err("archive error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("archive error: " & $error)
let elements = collect(newSeq):
for (hash, msg) in response.hashes.zip(response.messages):
(hash, msg.timestamp)
let cursor =
if response.cursor.isNone():
none(WakuMessageHash)
else:
some(response.cursor.get().hash)
return ok((elements, cursor))
)
proc initTransferHandler(
self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
) =
if wakuArchive.isNil() or wakuStoreClient.isNil():
return
self.transferCallBack = some(
proc(
hashes: seq[WakuMessageHash], peerId: PeerId
): Future[Result[void, string]] {.async: (raises: []), closure.} =
var query = StoreQueryRequest()
query.includeData = true
query.messageHashes = hashes
query.paginationLimit = some(uint64(100))
while true:
let catchable = catch:
await wakuStoreClient.query(query, peerId)
if catchable.isErr():
return err("store client error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("store client error: " & $error)
query.paginationCursor = response.paginationCursor
for kv in response.messages:
let handleRes = catch:
await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())
if handleRes.isErr():
error "message transfer failed", error = handleRes.error.msg
# Messages can be synced next time since they are not added to storage yet.
continue
self.ingessMessage(kv.pubsubTopic.get(), kv.message.get())
if query.paginationCursor.isNone():
break
return ok()
)
proc initFillStorage(
self: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
if wakuArchive.isNil():
return ok()
let endTime = getNowInNanosecondTime()
let startTime = endTime - self.syncInterval.nanos
var query = ArchiveQuery(
includeData: true,
cursor: none(ArchiveCursor),
startTime: some(startTime),
endTime: some(endTime),
pageSize: 100,
)
while true:
let response = (await wakuArchive.findMessages(query)).valueOr:
return err($error)
for (topic, msg) in response.topics.zip(response.messages):
self.ingessMessage(topic, msg)
if response.cursor.isNone():
break
query.cursor = response.cursor
return ok()
proc new*(
T: type WakuSync,
peerManager: PeerManager,
maxFrameSize: int = DefaultFrameSize,
maxFrameSize: int = DefaultMaxFrameSize,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
# Default gossipsub jitter in the network.
syncCB: Option[SyncCallback] = none(SyncCallback),
pruneInterval: Duration = DefaultPruneInterval,
pruneCB: Option[PruneCallBack] = none(PruneCallback),
): T =
let storage = Storage.new().valueOr:
debug "storage creation failed"
return
pruning: bool,
wakuArchive: WakuArchive,
wakuStoreClient: WakuStoreClient,
): Future[Result[T, string]] {.async.} =
let storage = NegentropyStorage.new().valueOr:
return err("negentropy storage creation failed")
let sync = WakuSync(
storage: storage,
@ -220,73 +338,132 @@ proc new*(
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
relayJitter: relayJitter,
syncCallBack: syncCB,
pruneCallBack: pruneCB,
pruneStart: getNowInNanosecondTime(),
pruneInterval: pruneInterval,
pruneOffset: syncInterval div 2,
)
sync.initProtocolHandler()
if pruning:
sync.initPruningHandler(wakuArchive)
sync.initTransferHandler(wakuArchive, wakuStoreClient)
let res = await sync.initFillStorage(wakuArchive)
if res.isErr():
return err("initial storage filling error: " & res.error)
info "WakuSync protocol initialized"
return sync
return ok(sync)
proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
debug "periodic sync initialized", interval = $self.syncInterval
proc periodicSync(self: WakuSync) {.async.} =
while true:
await sleepAsync(self.syncInterval)
let (hashes, peer) = (await self.sync()).valueOr:
debug "periodic sync failed", error = error
continue
debug "periodic sync started"
let callback = self.syncCallBack.valueOr:
continue
var
hashes: seq[WakuMessageHash]
peer: RemotePeerInfo
tries = 3
await callback(hashes, peer)
while true:
let res = (await self.sync()).valueOr:
error "sync failed", error = $error
if tries > 0:
tries -= 1
await sleepAsync(30.seconds)
continue
else:
break
proc periodicPrune(self: WakuSync) {.async.} =
let callback = self.pruneCallBack.valueOr:
return
hashes = res[0]
peer = res[1]
break
if hashes.len > 0:
tries = 3
while true:
(await callback(hashes, peer.peerId)).isOkOr:
error "transfer callback failed", error = $error
if tries > 0:
tries -= 1
await sleepAsync(30.seconds)
continue
else:
break
break
debug "periodic sync done", hashSynced = hashes.len
proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
debug "periodic prune initialized", interval = $self.syncInterval
await sleepAsync(self.syncInterval)
var pruneStop = getNowInNanosecondTime()
while true:
await sleepAsync(self.pruneInterval)
await sleepAsync(self.syncInterval)
let pruneStop = getNowInNanosecondTime()
debug "periodic prune started",
startTime = self.pruneStart - self.pruneOffset.nanos,
endTime = pruneStop,
storageSize = self.storage.len
var (elements, cursor) =
(newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))
var tries = 3
while true:
(elements, cursor) = (await callback(self.pruneStart, pruneStop, cursor)).valueOr:
debug "storage pruning failed", error = $error
break
(elements, cursor) = (
await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
).valueOr:
error "pruning callback failed", error = $error
if tries > 0:
tries -= 1
await sleepAsync(30.seconds)
continue
else:
break
if elements.len == 0:
break
for (hash, timestamp) in elements:
self.storage.erase(timestamp, hash).isOkOr:
trace "element pruning failed", time = timestamp, hash = hash, error = error
error "storage erase failed",
timestamp = timestamp, msg_hash = hash, error = $error
continue
if cursor.isNone():
break
self.pruneStart = pruneStop
pruneStop = getNowInNanosecondTime()
debug "periodic prune done", storageSize = self.storage.len
proc start*(self: WakuSync) =
self.started = true
self.pruneStart = getNowInNanosecondTime()
if self.syncCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicSyncFut = self.periodicSync()
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())
if self.pruneCallBack.isSome() and self.pruneInterval > ZeroDuration:
self.periodicPruneFut = self.periodicPrune()
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())
info "WakuSync protocol started"
proc stopWait*(self: WakuSync) {.async.} =
await self.periodicSyncFut.cancelAndWait()
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicSyncFut.cancelAndWait()
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicPruneFut.cancelAndWait()
info "WakuSync protocol stopped"

View File

@ -19,11 +19,11 @@ const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h"
logScope:
topics = "waku sync"
type Buffer* = object
type Buffer = object
len*: uint64
`ptr`*: ptr uint8
type BindingResult* = object
type BindingResult = object
output: Buffer
have_ids_len: uint
need_ids_len: uint
@ -40,7 +40,7 @@ proc toWakuMessageHash(buffer: Buffer): WakuMessageHash =
return hash
proc toBuffer*(x: openArray[byte]): Buffer =
proc toBuffer(x: openArray[byte]): Buffer =
## converts the input to a Buffer object
## the Buffer object is used to communicate data with the negentropy lib
var temp = @x
@ -48,7 +48,7 @@ proc toBuffer*(x: openArray[byte]): Buffer =
let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len))
return output
proc BufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
var bufLen: uint64
if isNone(len):
bufLen = buffer.len
@ -70,63 +70,65 @@ proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] =
### Storage ###
type Storage* = distinct pointer
type NegentropyStorage* = distinct pointer
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27
proc storage_init(
db_path: cstring, name: cstring
): Storage {.header: NEGENTROPY_HEADER, importc: "storage_new".}
): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41
proc raw_insert(
storage: Storage, timestamp: uint64, id: ptr Buffer
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43
proc raw_erase(
storage: Storage, timestamp: uint64, id: ptr Buffer
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29
proc free(storage: Storage) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}
proc free(
storage: NegentropyStorage
) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size*(
storage: Storage
proc size(
storage: NegentropyStorage
): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".}
### Negentropy ###
type Negentropy* = distinct pointer
type RawNegentropy* = distinct pointer
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33
proc constructNegentropy(
storage: Storage, frameSizeLimit: uint64
): Negentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}
storage: NegentropyStorage, frameSizeLimit: uint64
): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37
proc raw_initiate(
negentropy: Negentropy, r: ptr BindingResult
negentropy: RawNegentropy, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39
proc raw_setInitiator(
negentropy: Negentropy
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45
proc raw_reconcile(
negentropy: Negentropy, query: ptr Buffer, r: ptr BindingResult
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51
proc raw_reconcile_with_ids(
negentropy: Negentropy, query: ptr Buffer, r: ptr BindingResult
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35
proc free(
negentropy: Negentropy
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53
@ -136,57 +138,61 @@ proc free_result(
### SubRange ###
type SubRange* = distinct pointer
type NegentropySubRangeStorage* = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57
proc subrange_init(
storage: Storage, startTimestamp: uint64, endTimestamp: uint64
): SubRange {.header: NEGENTROPY_HEADER, importc: "subrange_new".}
storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64
): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59
proc free(subrange: SubRange) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}
proc free(
subrange: NegentropySubRangeStorage
) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size*(
subrange: SubRange
proc size(
subrange: NegentropySubRangeStorage
): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".}
### Negentropy with SubRange ###
### Negentropy with NegentropySubRangeStorage ###
type NegentropySubRange* = distinct pointer
type RawNegentropySubRange = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61
proc constructNegentropyWithSubRange(
subrange: SubRange, frameSizeLimit: uint64
): NegentropySubRange {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new".}
subrange: NegentropySubRangeStorage, frameSizeLimit: uint64
): RawNegentropySubRange {.
header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new"
.}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65
proc raw_initiate_subrange(
negentropy: NegentropySubRange, r: ptr BindingResult
negentropy: RawNegentropySubRange, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67
proc raw_reconcile_subrange(
negentropy: NegentropySubRange, query: ptr Buffer, r: ptr BindingResult
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69
proc raw_reconcile_with_ids_subrange(
negentropy: NegentropySubRange, query: ptr Buffer, r: ptr BindingResult
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63
proc free(
negentropy: NegentropySubRange
negentropy: RawNegentropySubRange
) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".}
### Wrappings ###
type NegentropyPayload* = distinct seq[byte]
### Storage ###
proc `==`*(a: Storage, b: pointer): bool {.borrow.}
proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.}
proc new*(T: type Storage): Result[T, string] =
proc new*(T: type NegentropyStorage): Result[T, string] =
#TODO db name and path
let storage = storage_init("", "")
@ -195,13 +201,15 @@ proc new*(T: type Storage): Result[T, string] =
return err("storage initialization failed") ]#
return ok(storage)
proc delete*(storage: Storage) =
free(storage)
proc delete*(storage: NegentropyStorage) =
storage.free()
proc erase*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] =
let cString = toBuffer(hash)
let res = raw_erase(storage, uint64(id), cString.unsafeAddr)
proc erase*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_erase(storage, uint64(id), bufPtr)
#TODO error handling once we move to lmdb
@ -210,7 +218,9 @@ proc erase*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, st
else:
return err("erase error")
proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, string] =
proc insert*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_insert(storage, uint64(id), bufPtr)
@ -222,39 +232,107 @@ proc insert*(storage: Storage, id: int64, hash: WakuMessageHash): Result[void, s
else:
return err("insert error")
proc `==`*(a: NegentropySubRange, b: pointer): bool {.borrow.}
proc len*(storage: NegentropyStorage): int =
int(storage.size)
### SubRange ###
proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.}
proc new*(
T: type NegentropySubrange, subrange: SubRange, frameSizeLimit: int
T: type NegentropySubRangeStorage,
storage: NegentropyStorage,
startTime: uint64 = uint64.low,
endTime: uint64 = uint64.high,
): Result[T, string] =
let negentropy = constructNegentropyWithSubRange(subrange, uint64(frameSizeLimit))
if negentropy == nil:
return err("negentropy initialization failed due to lower framesize")
return ok(negentropy)
let subrange = subrange_init(storage, startTime, endTime)
proc delete*(negentropy: NegentropySubRange) =
free(negentropy)
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
return ok(subrange)
proc initiate*(negentropy: NegentropySubrange): Result[NegentropyPayload, string] =
proc delete*(subrange: NegentropySubRangeStorage) =
subrange.free()
proc len*(subrange: NegentropySubRangeStorage): int =
int(subrange.size)
### Interface ###
type
Negentropy* = ref object of RootObj
NegentropyWithSubRange = ref object of Negentropy
inner: RawNegentropySubRange
NegentropyWithStorage = ref object of Negentropy
inner: RawNegentropy
NegentropyPayload* = distinct seq[byte]
method delete*(self: Negentropy) {.base.} =
discard
method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} =
discard
method serverReconcile*(
self: Negentropy, query: NegentropyPayload
): Result[NegentropyPayload, string] {.base.} =
discard
method clientReconcile*(
self: Negentropy,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] {.base.} =
discard
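Before the concrete implementations below, the intended round trip through this interface mirrors tests/waku_sync/test_bindings.nim earlier in this diff; roughly (error handling elided, storages assumed populated):

# Sketch: one client/server reconciliation exchange over the interface.
let clientNeg = Negentropy.new(clientStorage, 0).get()
let serverNeg = Negentropy.new(serverStorage, 0).get()

let init = clientNeg.initiate().get() # client opens the session
let reply = serverNeg.serverReconcile(init).get() # server always answers

var haves, needs: seq[WakuMessageHash]
let next = clientNeg.clientReconcile(reply, haves, needs).get()
# next.isNone() means the session is done; needs lists the missing hashes.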
### Impl. ###
proc new*(
T: type Negentropy,
storage: NegentropyStorage | NegentropySubRangeStorage,
frameSizeLimit: int,
): Result[T, string] =
if storage is NegentropyStorage:
let raw_negentropy =
constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit))
let negentropy = NegentropyWithStorage(inner: raw_negentropy)
return ok(negentropy)
elif storage is NegentropySubRangeStorage:
let raw_negentropy = constructNegentropyWithSubRange(
NegentropySubRangeStorage(storage), uint64(frameSizeLimit)
)
let negentropy = NegentropyWithSubRange(inner: raw_negentropy)
return ok(negentropy)
method delete*(self: NegentropyWithSubRange) =
self.inner.free()
method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] =
## Client initiates a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = raw_initiate_subrange(negentropy, myResultPtr)
let ret = self.inner.raw_initiate_subrange(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err("negentropy already initiated!")
let bytes: seq[byte] = BufferToBytes(addr(myResultPtr.output))
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
free_result(myResultPtr)
return ok(NegentropyPayload(bytes))
proc setInitiator*(negentropy: Negentropy) =
raw_setInitiator(negentropy)
proc serverReconcile*(
negentropy: NegentropySubrange, query: NegentropyPayload
method serverReconcile*(
self: NegentropyWithSubRange, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
  ## Always returns an answer.
@ -264,23 +342,23 @@ proc serverReconcile*(
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = raw_reconcile_subrange(negentropy, queryBufPtr, myResultPtr)
let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($myResultPtr.error)
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = BufferToBytes(addr(myResultPtr.output))
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
free_result(myResultPtr)
return ok(NegentropyPayload(outputBytes))
proc clientReconcile*(
negentropy: NegentropySubrange,
method clientReconcile*(
self: NegentropyWithSubRange,
query: NegentropyPayload,
haveIds: var seq[WakuMessageHash],
needIds: var seq[WakuMessageHash],
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
  ## May return an answer; if none, the sync session is done.
@ -292,12 +370,12 @@ proc clientReconcile*(
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = raw_reconcile_with_ids_subrange(negentropy, cQuery.unsafeAddr, myResultPtr)
let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($myResultPtr.error)
let output = BufferToBytes(addr myResult.output)
let output = bufferToBytes(addr myResult.output)
var
have_hashes: seq[Buffer]
@ -313,13 +391,13 @@ proc clientReconcile*(
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, hash = hash.to0xHex()
haveIds.add(hash)
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
haves.add(hash)
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, hash = hash.to0xHex()
needIds.add(hash)
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
needs.add(hash)
trace "return ", output = output, len = output.len
@ -330,20 +408,97 @@ proc clientReconcile*(
return ok(some(NegentropyPayload(output)))
### Subrange specific methods
method delete*(self: NegentropyWithStorage) =
self.inner.free()
proc new*(
T: type SubRange,
storage: Storage,
startTime: uint64 = uint64.low,
endTime: uint64 = uint64.high,
): Result[T, string] =
let subrange = subrange_init(storage, startTime, endTime)
method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] =
  ## Client initiates a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
return ok(subrange)
let ret = self.inner.raw_initiate(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err("negentropy already initiated!")
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
free_result(myResultPtr)
proc delete*(subrange: SubRange) =
free(subrange)
return ok(NegentropyPayload(bytes))
method serverReconcile*(
self: NegentropyWithStorage, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
  ## Always returns an answer.
let queryBuf = toBuffer(seq[byte](query))
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($myResultPtr.error)
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
free_result(myResultPtr)
return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
self: NegentropyWithStorage,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
  ## May return an answer; if none, the sync session is done.
let cQuery = toBuffer(seq[byte](query))
var myResult {.noinit.}: BindingResult = BindingResult()
myResult.have_ids_len = 0
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($myResultPtr.error)
let output = bufferToBytes(addr myResult.output)
var
have_hashes: seq[Buffer]
need_hashes: seq[Buffer]
if myResult.have_ids_len > 0:
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
if myResult.need_ids_len > 0:
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
trace "have and need hashes ",
have_count = have_hashes.len, need_count = need_hashes.len
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
haves.add(hash)
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
needs.add(hash)
trace "return ", output = output, len = output.len
free_result(myResultPtr)
if output.len < 1:
return ok(none(NegentropyPayload))
return ok(some(NegentropyPayload(output)))
View File
@ -24,42 +24,42 @@ type ServerSync* = object
type Reconciled*[T] = object
sync: T
negentropy: NegentropySubRange
negentropy: Negentropy
connection: Connection
frameSize: int
payload*: SyncPayload
type Sent*[T] = object
sync: T
negentropy: NegentropySubRange
negentropy: Negentropy
connection: Connection
frameSize: int
type Received*[T] = object
sync: T
negentropy: NegentropySubRange
negentropy: Negentropy
connection: Connection
frameSize: int
payload*: SyncPayload
type Completed*[T] = object
sync: T
negentropy: NegentropySubRange
negentropy: Negentropy
connection: Connection
haveHashes: seq[WakuMessageHash]
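# Assumed session flow (a sketch: only clientInitialize/serverInitialize
# appear in this diff, the other transition names are placeholders): each
# state is only produced by the previous one, so a session cannot skip steps.
#
#   Reconciled[ClientSync]  --(send payload)-->  Sent[ClientSync]
#   Sent[ClientSync]        --(read reply)---->  Received[ClientSync]
#   Received[ClientSync]    --(reconcile)----->  Reconciled or Completed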
### State Transition ###
proc clientInitialize*(
store: Storage,
store: NegentropyStorage,
conn: Connection,
frameSize = DefaultFrameSize,
frameSize = DefaultMaxFrameSize,
start = int64.low,
`end` = int64.high,
): Result[Reconciled[ClientSync], string] =
let subrange = ?SubRange.new(store, uint64(start), uint64(`end`))
let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`))
let negentropy = ?NegentropySubrange.new(subrange, frameSize)
let negentropy = ?Negentropy.new(subrange, frameSize)
let negentropyPayload = ?negentropy.initiate()
@ -78,15 +78,15 @@ proc clientInitialize*(
)
proc serverInitialize*(
store: Storage,
store: NegentropyStorage,
conn: Connection,
frameSize = DefaultFrameSize,
frameSize = DefaultMaxFrameSize,
start = int64.low,
`end` = int64.high,
): Result[Sent[ServerSync], string] =
let subrange = ?SubRange.new(store, uint64(start), uint64(`end`))
let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`))
let negentropy = ?NegentropySubrange.new(subrange, frameSize)
let negentropy = ?Negentropy.new(subrange, frameSize)
let sync = ServerSync()
View File
@ -1,4 +1,5 @@
when (NimMajor, NimMinor) < (1, 4):
# Not used yet.
#[ when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
@ -72,3 +73,4 @@ proc retrieveStorage*(
self.storages[dateTime] = storage
return ok(some(storage))
]#