feat: Nwaku Sync (#2403)

* feat: Waku Sync Protocol

* feat: state machine (#2656)

* feat: pruning storage mechanism (#2673)

* feat: message transfer mechanism & tests (#2688)

* update docker files

* added ENR field for sync & misc. fixes

* adding new sync range param & fixes

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
Simon-Pierre Vivier 2024-08-13 07:27:34 -04:00 committed by GitHub
parent 54b5222222
commit 2cc86c51da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 2334 additions and 8 deletions

5
.gitmodules vendored
View File

@ -169,3 +169,8 @@
url = https://github.com/nim-lang/db_connector.git
ignore = untracked
branch = master
[submodule "vendor/negentropy"]
ignore = untracked
path = vendor/negentropy
url = https://github.com/waku-org/negentropy.git
branch = master

View File

@ -46,6 +46,9 @@ RUN apk add --no-cache libgcc pcre-dev libpq-dev
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
# Fix for 'Error loading shared library libnegentropy.so: No such file or directory'
ADD ./libnegentropy.so ./
# Copy to separate location to accommodate different MAKE_TARGET values
COPY --from=nim-build /app/build/$MAKE_TARGET /usr/local/bin/

View File

@ -411,3 +411,22 @@ release-notes:
sed -E 's@#([0-9]+)@[#\1](https://github.com/waku-org/nwaku/issues/\1)@g'
# I could not get the tool to replace issue ids with links, so using sed for now,
# asked here: https://github.com/bvieira/sv4git/discussions/101
######################
### NEGENTROPY ###
######################
.PHONY: negentropy
## Pass libnegentropy to linker.
NIM_PARAMS := $(NIM_PARAMS) --passL:./libnegentropy.so
deps: | negentropy
clean: | negentropy-clean
negentropy:
$(MAKE) -C vendor/negentropy/cpp && \
cp vendor/negentropy/cpp/libnegentropy.so ./
negentropy-clean:
$(MAKE) -C vendor/negentropy/cpp clean && \
rm libnegentropy.so

View File

@ -19,6 +19,9 @@ RUN apt-get update &&\
# Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
# Fix for 'Error loading shared library libnegentropy.so: No such file or directory'
ADD ./libnegentropy.so ./
# Copy to separate location to accommodate different MAKE_TARGET values
ADD ./build/$MAKE_TARGET /usr/local/bin/

View File

@ -46,9 +46,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
flags = CapabilitiesBitfield.init(relay = true)
var enrBuilder = EnrBuilder.init(nodeKey)

View File

@ -44,9 +44,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
flags = CapabilitiesBitfield.init(relay = true)
var enrBuilder = EnrBuilder.init(nodeKey)

View File

@ -0,0 +1,188 @@
{.used.}
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto
import
../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]
suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let timeOrigin = now()
let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
# messages are retrieved when mounting Waku sync
# but based on interval so this is needed for client only
for msg in messages:
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await allFutures(server.start(), client.start())
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)
asyncTeardown:
# prevent premature channel shutdown
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncRange = 1.hours,
syncInterval = 5.minutes,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncRange = 10.milliseconds,
syncInterval = 10.milliseconds,
relayJitter = 0.seconds,
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
await allFutures(server.start(), client.start())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "pruning":
for _ in 0 ..< 4:
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == 10
server.wakuSync.storageSize() == 40

View File

@ -0,0 +1,37 @@
{.used.}
import std/options, chronos, chronicles, libp2p/crypto/crypto
import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore
proc newTestWakuSync*(
switch: Switch,
transfer: Option[TransferCallback] = none(TransferCallback),
prune: Option[PruneCallback] = none(PruneCallback),
interval: Duration = DefaultSyncInterval,
): Future[WakuSync] {.async.} =
## Test helper: builds, starts, and mounts a WakuSync instance on `switch`.
## NOTE(review): the `transfer` and `prune` parameters are currently unused —
## a stub prune callback and no transfer callback are always passed below;
## confirm whether callers expect them to be honored.
let peerManager = PeerManager.new(switch)
# Stub prune callback that reports "nothing to prune" so tests exercising
# the sync loop do not need a real archive behind them.
let fakePruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
return ok((@[], none(WakuMessageHash)))
let res = await WakuSync.new(
peerManager = peerManager,
relayJitter = 0.seconds,
syncInterval = interval,
wakuArchive = nil,
wakuStoreClient = nil,
pruneCallback = some(fakePruneCallback),
transferCallback = none(TransferCallback),
)
# Assumes construction succeeds; `get()` raises on error in tests.
let proto = res.get()
proto.start()
switch.mount(proto)
return proto

View File

@ -0,0 +1,3 @@
{.used.}
import ./test_protocol, ./test_bindings

View File

@ -0,0 +1,141 @@
{.used.}
import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random
import
../../waku/
[node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings],
../testlib/[wakucore],
./sync_utils
random.randomize()
#TODO clean this up
suite "Bindings":
var storage {.threadvar.}: NegentropyStorage
var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)]
setup:
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
storage = storageRes.get()
messages = @[]
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
messages.add((hash, msg))
teardown:
storage.delete()
test "storage insert":
check:
storage.len() == 0
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
test "storage erase":
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
var delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isOk()
check:
storage.len() == 0
delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isErr()
check:
storage.len() == 0
test "subrange":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
let subrangeRes = NegentropySubRangeStorage.new(storage)
assert subrangeRes.isOk(), subrangeRes.error
let subrange = subrangeRes.get()
check:
subrange.len() == 10
#[ test "storage memory size":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
for (hash, msg) in messages:
let delRes = storage.erase(msg.timestamp, hash)
assert delRes.isOk(), $delRes.error
check:
storage.len() == 0
#TODO validate that the occupied memory didn't grow. ]#
test "reconcile server differences":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let clientNegentropyRes = Negentropy.new(storage, 0)
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
let serverStorage = storageRes.get()
for (hash, msg) in messages:
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
# the extra msg
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let serverNegentropyRes = Negentropy.new(serverStorage, 0)
assert clientNegentropyRes.isOk(), $clientNegentropyRes.error
assert serverNegentropyRes.isOk(), $serverNegentropyRes.error
let clientNegentropy = clientNegentropyRes.get()
let serverNegentropy = serverNegentropyRes.get()
let initRes = clientNegentropy.initiate()
assert initRes.isOk(), $initRes.error
let init = initRes.get()
let reconRes = serverNegentropy.serverReconcile(init)
assert reconRes.isOk(), $reconRes.error
let srecon = reconRes.get()
var
haves: seq[WakuMessageHash]
needs: seq[WakuMessageHash]
let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs)
assert creconRes.isOk(), $creconRes.error
let reconOpt = creconRes.get()
check:
reconOpt.isNone()
needs[0] == hash

View File

@ -0,0 +1,374 @@
{.used.}
import
std/[options, sets],
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto,
stew/byteutils,
std/random
import
../../waku/[
node/peer_manager,
waku_core,
waku_core/message/digest,
waku_sync,
waku_sync/raw_bindings,
],
../testlib/[wakucore, testasync],
./sync_utils
random.randomize()
suite "Waku Sync":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
var server {.threadvar.}: WakuSync
var client {.threadvar.}: WakuSync
var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo]
asyncSetup:
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
server = await newTestWakuSync(serverSwitch)
client = await newTestWakuSync(clientSwitch)
serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(server.stop(), client.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
suite "Protocol":
asyncTest "sync 2 nodes both empty":
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes empty client full server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
server.messageIngress(DefaultPubsubTopic, msg3)
var hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 3
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0]
asyncTest "sync 2 nodes full client empty server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
client.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg2)
client.messageIngress(DefaultPubsubTopic, msg3)
var hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes different hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
var syncRes = await client.storeSynchronization(serverPeerInfo)
check:
syncRes.isOk()
var hashes = syncRes.get()
check:
hashes[0].len == 1
hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
#Assuming message is fetched from peer
client.messageIngress(DefaultPubsubTopic, msg2)
syncRes = await client.storeSynchronization(serverPeerInfo)
check:
syncRes.isOk()
hashes = syncRes.get()
check:
hashes[0].len == 0
asyncTest "sync 2 nodes same hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
client.messageIngress(DefaultPubsubTopic, msg2)
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes 100K msgs":
var i = 0
let msgCount = 100000
var diffIndex = rand(msgCount)
var diffMsg: WakuMessage
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i != diffIndex:
client.messageIngress(DefaultPubsubTopic, msg)
else:
diffMsg = msg
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == 1
hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg)
asyncTest "sync 2 nodes 100K msgs 10K diffs":
var i = 0
let msgCount = 100000
var diffCount = 10000
var diffMsgHashes: seq[WakuMessageHash]
var randIndexes: seq[int]
while i < diffCount:
let randInt = rand(msgCount)
if randInt in randIndexes:
continue
randIndexes.add(randInt)
i += 1
i = 0
var tmpDiffCnt = diffCount
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if tmpDiffCnt > 0 and i in randIndexes:
diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg))
tmpDiffCnt = tmpDiffCnt - 1
else:
client.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == diffCount
toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes)
asyncTest "sync 3 nodes 2 client 1 server":
## Setup
let client2Switch = newTestSwitch()
await client2Switch.start()
let client2 = await newTestWakuSync(client2Switch)
let msgCount = 10000
var i = 0
while i < msgCount:
i += 1
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i mod 2 == 0:
client2.messageIngress(DefaultPubsubTopic, msg)
else:
client.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
let fut1 = client.storeSynchronization(serverPeerInfo)
let fut2 = client2.storeSynchronization(serverPeerInfo)
waitFor allFutures(fut1, fut2)
let hashes1 = fut1.read()
let hashes2 = fut2.read()
assert hashes1.isOk(), $hashes1.error
assert hashes2.isOk(), $hashes2.error
check:
hashes1.value[0].len == int(msgCount / 2)
hashes2.value[0].len == int(msgCount / 2)
await client2.stop()
await client2Switch.stop()
asyncTest "sync 6 nodes varying sync diffs":
## Setup
let
client2Switch = newTestSwitch()
client3Switch = newTestSwitch()
client4Switch = newTestSwitch()
client5Switch = newTestSwitch()
await allFutures(
client2Switch.start(),
client3Switch.start(),
client4Switch.start(),
client5Switch.start(),
)
let
client2 = await newTestWakuSync(client2Switch)
client3 = await newTestWakuSync(client3Switch)
client4 = await newTestWakuSync(client4Switch)
client5 = await newTestWakuSync(client5Switch)
let msgCount = 100000
var i = 0
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i < msgCount - 1:
client.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 10:
client2.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 100:
client3.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 1000:
client4.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 10000:
client5.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
var timeBefore = getNowInNanosecondTime()
let hashes1 = await client.storeSynchronization(serverPeerInfo)
var timeAfter = getNowInNanosecondTime()
var syncTime = (timeAfter - timeBefore)
debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime
assert hashes1.isOk(), $hashes1.error
check:
hashes1.value[0].len == 1
timeBefore = getNowInNanosecondTime()
let hashes2 = await client2.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime
assert hashes2.isOk(), $hashes2.error
check:
hashes2.value[0].len == 10
timeBefore = getNowInNanosecondTime()
let hashes3 = await client3.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 100, syncTime = syncTime
assert hashes3.isOk(), $hashes3.error
check:
hashes3.value[0].len == 100
timeBefore = getNowInNanosecondTime()
let hashes4 = await client4.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 1000, syncTime = syncTime
assert hashes4.isOk(), $hashes4.error
check:
hashes4.value[0].len == 1000
timeBefore = getNowInNanosecondTime()
let hashes5 = await client5.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 10000, syncTime = syncTime
assert hashes5.isOk(), $hashes5.error
check:
hashes5.value[0].len == 10000
await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop())
await allFutures(
client2Switch.stop(),
client3Switch.stop(),
client4Switch.stop(),
client5Switch.stop(),
)
asyncTest "sync 3 nodes cyclic":
let
node1Switch = newTestSwitch()
node2Switch = newTestSwitch()
node3Switch = newTestSwitch()
await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start())
let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo())
let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo())
let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo())
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
let
node1 = await newTestWakuSync(node1Switch)
node2 = await newTestWakuSync(node2Switch)
node3 = await newTestWakuSync(node3Switch)
node1.messageIngress(DefaultPubsubTopic, msg1)
node2.messageIngress(DefaultPubsubTopic, msg1)
node2.messageIngress(DefaultPubsubTopic, msg2)
node3.messageIngress(DefaultPubsubTopic, msg3)
let f1 = node1.storeSynchronization(node2PeerInfo)
let f2 = node2.storeSynchronization(node3PeerInfo)
let f3 = node3.storeSynchronization(node1PeerInfo)
waitFor allFutures(f1, f2, f3)
let hashes1 = f1.read()
let hashes2 = f2.read()
let hashes3 = f3.read()
assert hashes1.isOk(), hashes1.error
assert hashes2.isOk(), hashes2.error
assert hashes3.isOk(), hashes3.error
check:
hashes1.get()[0].len == 1
hashes2.get()[0].len == 1
hashes3.get()[0].len == 1
hashes1.get()[0][0] == hash2
hashes2.get()[0][0] == hash3
hashes3.get()[0][0] == hash1
await allFutures(node1.stop(), node2.stop(), node3.stop())
await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop())

1
vendor/negentropy vendored Submodule

@ -0,0 +1 @@
Subproject commit 311a21a22bdb6d80e5c4ba5e3d2f550e0062b2cb

View File

@ -385,6 +385,40 @@ type WakuNodeConf* = object
name: "store-resume"
.}: bool
## Sync config
storeSync* {.
desc: "Enable store sync protocol: true|false",
defaultValue: false,
name: "store-sync"
.}: bool
storeSyncInterval* {.
desc: "Interval between store sync attempts. In seconds.",
defaultValue: 3600, # 1 hours
name: "store-sync-interval"
.}: int64
storeSyncRange* {.
desc: "Amount of time to sync. In seconds.",
defaultValue: 300, # 5 minutes
name: "store-sync-range"
.}: int64
storeSyncRelayJitter* {.
hidden,
desc: "Time offset to account for message propagation jitter. In seconds.",
defaultValue: 20,
name: "store-sync-relay-jitter"
.}: int64
storeSyncMaxPayloadSize* {.
hidden,
desc:
"Max size in bytes of the inner negentropy payload. Cannot be less than 5K, 0 is unlimited.",
defaultValue: 0,
name: "store-sync-max-payload-size"
.}: int64
## Filter config
filter* {.
desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"

View File

@ -142,6 +142,7 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul
filter = conf.filter,
store = conf.store,
relay = conf.relay,
sync = conf.storeSync,
)
# Resolve and use DNS domain IP

View File

@ -295,6 +295,17 @@ proc setupProtocols(
if conf.store and conf.storeResume:
node.setupStoreResume()
if conf.storeSync:
(
await node.mountWakuSync(
int(conf.storeSyncMaxPayloadSize),
conf.storeSyncRange.seconds(),
conf.storeSyncInterval.seconds(),
conf.storeSyncRelayJitter.seconds(),
)
).isOkOr:
return err("failed to mount waku sync protocol: " & $error)
# NOTE Must be mounted after relay
if conf.lightpush:
try:

View File

@ -39,6 +39,7 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_sync,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
@ -104,6 +105,7 @@ type
wakuPeerExchange*: WakuPeerExchange
wakuMetadata*: WakuMetadata
wakuSharding*: Sharding
wakuSync*: WakuSync
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
@ -194,6 +196,42 @@ proc connectToNodes*(
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source = source)
## Waku Sync
proc mountWakuSync*(
node: WakuNode,
maxFrameSize: int = DefaultMaxFrameSize,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
): Future[Result[void, string]] {.async.} =
if not node.wakuSync.isNil():
return err("already mounted")
node.wakuSync = (
await WakuSync.new(
peerManager = node.peerManager,
maxFrameSize = maxFrameSize,
syncRange = syncRange,
syncInterval = syncInterval,
relayJitter = relayJitter,
wakuArchive = node.wakuArchive,
wakuStoreClient = node.wakuStoreClient,
)
).valueOr:
return err("initialization failed: " & error)
let catchable = catch:
node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
if catchable.isErr():
return err("switch mounting failed: " & catchable.error.msg)
if node.started:
node.wakuSync.start()
return ok()
## Waku Metadata
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
@ -258,12 +296,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuArchive.handleMessage(topic, msg)
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async.} =
if node.wakuSync.isNil():
return
node.wakuSync.messageIngress(topic, msg)
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
discard node.wakuRelay.subscribe(topic, defaultHandler)
@ -1286,6 +1331,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()
if not node.wakuSync.isNil():
node.wakuSync.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(

View File

@ -16,6 +16,7 @@ import
../../../waku_relay,
../../../waku_peer_exchange,
../../../waku_node,
../../../waku_sync,
../../../node/peer_manager,
../responses,
../serdes,
@ -101,6 +102,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
tuplesToWakuPeers(peers, pxPeers)
if not node.wakuSync.isNil():
# Map WakuSync peers to WakuPeers and add to return list
let syncPeers = node.peerManager.peerStore.peers(WakuSyncCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuSyncCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, syncPeers)
let resp = RestApiResponse.jsonResponse(peers, status = Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error = resp.error

View File

@ -33,3 +33,16 @@ template nanosecondTime*(collector: Gauge, body: untyped) =
metrics.set(collector, nowInUnixFloat() - start)
else:
body
# Unused yet. Kept for future use in Waku Sync.
#[ proc timestampInSeconds*(time: Timestamp): Timestamp =
let timeStr = $time
var timestamp: Timestamp = time
if timeStr.len() > 16:
timestamp = Timestamp(time div Timestamp(1_000_000_000))
elif timeStr.len() < 16 and timeStr.len() > 13:
timestamp = Timestamp(time div Timestamp(1_000_000))
elif timeStr.len() > 10:
timestamp = Timestamp(time div Timestamp(1000))
return timestamp ]#

View File

@ -18,8 +18,11 @@ type
Store = 1
Filter = 2
Lightpush = 3
Sync = 4
func init*(T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool): T =
func init*(
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false
): T =
## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/)
var bitfield: uint8
if relay:
@ -30,6 +33,8 @@ func init*(T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool):
bitfield.setBit(2)
if lightpush:
bitfield.setBit(3)
if sync:
bitfield.setBit(4)
CapabilitiesBitfield(bitfield)
func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T =

View File

@ -48,7 +48,7 @@ proc sendStoreRequest(
return ok(res)
proc query*(
self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo
self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId
): Future[StoreQueryResult] {.async, gcsafe.} =
if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

5
waku/waku_sync.nim Normal file
View File

@ -0,0 +1,5 @@
{.push raises: [].}
import ./waku_sync/protocol, ./waku_sync/common
export common, protocol

57
waku/waku_sync/codec.nim Normal file
View File

@ -0,0 +1,57 @@
{.push raises: [].}
import std/options, stew/arrayops
import ../common/protobuf, ../waku_core, ./common
proc encode*(req: SyncPayload): ProtoBuffer =
## Serializes a `SyncPayload` into protobuf wire format.
## Field numbers used: 31/32 = sync range start/end, 33 = frame size,
## 1 = raw negentropy payload, 20 = repeated message hashes.
## Optional/empty fields are omitted from the output entirely.
var pb = initProtoBuffer()
# Sync range is written as two separate fields; both bounds come from
# the same Option, so they are always present together.
if req.syncRange.isSome():
pb.write3(31, req.syncRange.get()[0])
pb.write3(32, req.syncRange.get()[1])
if req.frameSize.isSome():
pb.write3(33, req.frameSize.get())
# Negentropy payload: only encoded when non-empty, mirroring decode's
# treatment of a missing field as `@[]`.
if req.negentropy.len > 0:
pb.write3(1, req.negentropy)
if req.hashes.len > 0:
for hash in req.hashes:
pb.write3(20, hash)
return pb
proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] =
## Deserializes a protobuf `buffer` into a `SyncPayload`.
## Inverse of `encode`: fields 31/32 = sync range bounds, 33 = frame size,
## 1 = negentropy payload, 20 = repeated message hashes.
## Missing optional fields decode to `none(...)` / empty sequences.
var req = SyncPayload()
let pb = initProtoBuffer(buffer)
var rangeStart: uint64
var rangeEnd: uint64
# The range is only accepted when BOTH bounds are present; a lone bound
# is treated as no range at all.
if ?pb.getField(31, rangeStart) and ?pb.getField(32, rangeEnd):
req.syncRange = some((rangeStart, rangeEnd))
else:
req.syncRange = none((uint64, uint64))
var frame: uint64
if ?pb.getField(33, frame):
req.frameSize = some(frame)
else:
req.frameSize = none(uint64)
var negentropy: seq[byte]
if ?pb.getField(1, negentropy):
req.negentropy = negentropy
else:
req.negentropy = @[]
# Repeated hash field: each element is converted from raw bytes to a
# WakuMessageHash via `fromBytes`.
var buffer: seq[seq[byte]]
if not ?pb.getRepeatedField(20, buffer):
req.hashes = @[]
else:
req.hashes = newSeqOfCap[WakuMessageHash](buffer.len)
for buf in buffer:
let msg: WakuMessageHash = fromBytes(buf)
req.hashes.add(msg)
return ok(req)

32
waku/waku_sync/common.nim Normal file
View File

@ -0,0 +1,32 @@
{.push raises: [].}
import std/[options], chronos, libp2p/peerId
import ../waku_core
const
  DefaultSyncInterval*: Duration = 5.minutes # time between periodic sync attempts
  DefaultSyncRange*: Duration = 1.hours # how far in the past messages are synced
  RetryDelay*: Duration = 30.seconds # wait between retries of a failed sync stage
  WakuSyncCodec* = "/vac/waku/sync/1.0.0" # libp2p protocol id
  DefaultMaxFrameSize* = 1048576 # 1 MiB
  DefaultGossipSubJitter*: Duration = 20.seconds # relay propagation slack ignored when syncing

type
  # Fetches the messages for `hashes` from `peerId` and stores them locally.
  TransferCallback* = proc(
    hashes: seq[WakuMessageHash], peerId: PeerId
  ): Future[Result[void, string]] {.async: (raises: []), closure.}

  # Returns (hash, timestamp) pairs within [startTime, endTime] plus a paging
  # cursor; a `none` cursor in the result signals the last page.
  PruneCallback* = proc(
    startTime: Timestamp, endTime: Timestamp, cursor = none(WakuMessageHash)
  ): Future[
    Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
  ] {.async: (raises: []), closure.}

  # Wire-level payload of one sync round; field tags are defined in codec.nim.
  SyncPayload* = object
    syncRange*: Option[(uint64, uint64)]
    frameSize*: Option[uint64]
    negentropy*: seq[byte] # negentropy protocol payload
    hashes*: seq[WakuMessageHash]

520
waku/waku_sync/protocol.nim Normal file
View File

@ -0,0 +1,520 @@
{.push raises: [].}
import
std/[options, sugar, sequtils],
stew/byteutils,
results,
chronicles,
chronos,
metrics,
libp2p/utility,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_archive,
../waku_store/[client, common],
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common,
./session
logScope:
topics = "waku sync"
type WakuSync* = ref object of LPProtocol
  storage: NegentropyStorage # Negentropy storage of (timestamp, hash) entries
  maxFrameSize: int # Negentropy param to limit the size of payloads
  peerManager: PeerManager # Used for selecting/dialing sync peers
  syncInterval: timer.Duration # Time between each synchronisation attempt
  syncRange: timer.Duration # Amount of time in the past to sync
  relayJitter: Duration # Amount of time since the present to ignore when syncing
  transferCallBack: Option[TransferCallback] # Callback for message transfers.
  pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
  pruneStart: Timestamp # Last pruning start timestamp
  pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
  periodicSyncFut: Future[void] # Background loop started by start()
  periodicPruneFut: Future[void] # Background loop started by start()
proc storageSize*(self: WakuSync): int =
  ## Number of entries currently held in the negentropy storage.
  self.storage.len
proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
  ## Inserts a message into the sync storage, keyed by its computed hash and
  ## timestamp. Ephemeral messages are not synchronized and are skipped.
  if msg.ephemeral:
    return

  let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)

  trace "inserting message into waku sync storage ",
    msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp

  self.storage.insert(msg.timestamp, msgHash).isOkOr:
    error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error
proc messageIngress*(
    self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage
) =
  ## Inserts a message with a precomputed hash into the sync storage.
  ## Ephemeral messages are not synchronized and are skipped.
  if msg.ephemeral:
    return

  trace "inserting message into waku sync storage ",
    msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp

  # Consistent with the other overload: use isOkOr and log the storage
  # error detail, which the original isErr() branch dropped.
  self.storage.insert(msg.timestamp, msgHash).isOkOr:
    error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error
proc calculateRange(
    jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): (int64, int64) =
  ## Computes the (start, end) timestamps, in nanoseconds, of a sync session.
  ## The end is "now" minus `jitter` (message jitter inherent to the Relay
  ## protocol); the start is `syncRange` before that end.
  let syncEnd = getNowInNanosecondTime() - jitter.nanos
  let syncStart = syncEnd - syncRange.nanos

  return (syncStart, syncEnd)
proc request(
    self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
  ## Drives the client side of a negentropy sync session over `conn`.
  ## On success returns the hashes of messages this node is missing.
  let (syncStart, syncEnd) = calculateRange(self.relayJitter)

  let initialized =
    ?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)

  debug "sync session initialized",
    client = self.peerManager.switch.peerInfo.peerId,
    server = conn.peerId,
    frameSize = self.maxFrameSize,
    timeStart = syncStart,
    timeEnd = syncEnd

  var hashes: seq[WakuMessageHash]
  var reconciled = initialized

  # Type-state loop: Reconciled -> Sent -> Received -> (Reconciled | Completed)
  while true:
    let sent = ?await reconciled.send()

    trace "sync payload sent",
      client = self.peerManager.switch.peerInfo.peerId,
      server = conn.peerId,
      payload = reconciled.payload

    let received = ?await sent.listenBack()

    trace "sync payload received",
      client = self.peerManager.switch.peerInfo.peerId,
      server = conn.peerId,
      payload = received.payload

    # clientReconcile yields Result[Reconciled, Completed]: the "error"
    # branch signals graceful completion of the session, not a failure.
    reconciled = (?received.clientReconcile(hashes)).valueOr:
      let completed = error # Result[Reconciled, Completed]

      ?await completed.clientTerminate()

      debug "sync session ended gracefully",
        client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId

      return ok(hashes)

    continue
proc storeSynchronization*(
    self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} =
  ## Runs one sync session against `peerInfo`, or against a peer selected by
  ## the peer manager when none is given. Returns the hashes this node is
  ## missing together with the peer they can be fetched from.
  let peer = peerInfo.valueOr:
    self.peerManager.selectPeer(WakuSyncCodec).valueOr:
      return err("No suitable peer found for sync")

  let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)

  let conn: Connection = connOpt.valueOr:
    return err("Cannot establish sync connection")

  let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
    error "sync session ended",
      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error

    return err("Sync request error: " & error)

  return ok((hashes, peer))
proc handleSyncSession(
    self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
  ## Drives the server side of a negentropy sync session over `conn`.
  ## Returns the hashes the client has that this node may need to transfer.
  let (syncStart, syncEnd) = calculateRange(self.relayJitter)

  let initialized =
    ?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)

  var sent = initialized

  # Type-state loop: Sent -> Received -> (Reconciled | Completed)
  while true:
    let received = ?await sent.listenBack()

    trace "sync payload received",
      server = self.peerManager.switch.peerInfo.peerId,
      client = conn.peerId,
      payload = received.payload

    # The "error" branch of serverReconcile signals session completion.
    let reconciled = (?received.serverReconcile()).valueOr:
      let completed = error # Result[Reconciled, Completed]

      let hashes = await completed.serverTerminate()

      return ok(hashes)

    sent = ?await reconciled.send()

    trace "sync payload sent",
      server = self.peerManager.switch.peerInfo.peerId,
      client = conn.peerId,
      payload = reconciled.payload

    continue
proc initProtocolHandler(self: WakuSync) =
  ## Installs the libp2p stream handler for the Waku Sync codec.
  proc handle(conn: Connection, proto: string) {.async, closure.} =
    debug "sync session requested",
      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId

    let hashes = (await self.handleSyncSession(conn)).valueOr:
      debug "sync session ended",
        server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error

      #TODO send error code and desc to client
      return

    # Fetch the messages the client reported having, if a transfer callback is set.
    if hashes.len > 0 and self.transferCallBack.isSome():
      let callback = self.transferCallBack.get()

      (await callback(hashes, conn.peerId)).isOkOr:
        error "transfer callback failed", error = $error

    debug "sync session ended gracefully",
      server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId

  self.handler = handle
  self.codec = WakuSyncCodec
proc createPruneCallback(
    self: WakuSync, wakuArchive: WakuArchive
): Result[PruneCallBack, string] =
  ## Builds the default prune callback: pages through the archive and returns
  ## the (hash, timestamp) pairs in the requested window plus a paging cursor.
  if wakuArchive.isNil():
    return err ("waku archive unavailable")

  let callback: PruneCallback = proc(
      pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
  ): Future[
      Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
  ] {.async: (raises: []), closure.} =
    # NOTE(review): this re-wrap looks like a no-op if ArchiveCursor is an
    # alias of WakuMessageHash — confirm before simplifying.
    let archiveCursor =
      if cursor.isSome():
        some(cursor.get())
      else:
        none(ArchiveCursor)

    let query = ArchiveQuery(
      includeData: true,
      cursor: archiveCursor,
      startTime: some(pruneStart),
      endTime: some(pruneStop),
      pageSize: 100,
    )

    # findMessages is not declared raises:[]; catch so this closure can
    # honor its {.async: (raises: []).} contract.
    let catchable = catch:
      await wakuArchive.findMessages(query)

    if catchable.isErr():
      return err("archive error: " & catchable.error.msg)

    let res = catchable.get()
    let response = res.valueOr:
      return err("archive error: " & $error)

    let elements = collect(newSeq):
      for (hash, msg) in response.hashes.zip(response.messages):
        (hash, msg.timestamp)

    let cursor = response.cursor

    return ok((elements, cursor))

  return ok(callback)
proc createTransferCallback(
    self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
): Result[TransferCallback, string] =
  ## Builds the default transfer callback: fetches the given hashes from a
  ## peer via the store client, then feeds each message into the archive and
  ## into the sync storage.
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  if wakuStoreClient.isNil():
    return err("waku store client unavailable")

  let callback: TransferCallback = proc(
      hashes: seq[WakuMessageHash], peerId: PeerId
  ): Future[Result[void, string]] {.async: (raises: []), closure.} =
    var query = StoreQueryRequest()
    query.includeData = true
    query.messageHashes = hashes
    query.paginationLimit = some(uint64(100))

    # Page through the store responses until the pagination cursor runs out.
    while true:
      let catchable = catch:
        await wakuStoreClient.query(query, peerId)

      if catchable.isErr():
        return err("store client error: " & catchable.error.msg)

      let res = catchable.get()
      let response = res.valueOr:
        return err("store client error: " & $error)

      query.paginationCursor = response.paginationCursor

      for kv in response.messages:
        let handleRes = catch:
          await wakuArchive.handleMessage(kv.pubsubTopic.get(), kv.message.get())

        if handleRes.isErr():
          error "message transfer failed", error = handleRes.error.msg
          # Messages can be synced next time since they are not added to storage yet.
          continue

        # NOTE(review): the `.get()` calls assume the store response always
        # carries pubsubTopic and message when includeData is set — confirm,
        # since a miss here would raise outside the catch above.
        self.messageIngress(kv.pubsubTopic.get(), kv.messageHash, kv.message.get())

      if query.paginationCursor.isNone():
        break

    return ok()

  return ok(callback)
proc initFillStorage(
    self: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
  ## Seeds the sync storage from the archive with the messages of the last
  ## `syncRange` window, paging through the archive 100 messages at a time.
  if wakuArchive.isNil():
    return err("waku archive unavailable")

  let endTime = getNowInNanosecondTime()
  let startTime = endTime - self.syncRange.nanos # fixed typo: was `starTime`

  var query = ArchiveQuery(
    includeData: true,
    cursor: none(ArchiveCursor),
    startTime: some(startTime),
    endTime: some(endTime),
    pageSize: 100,
  )

  while true:
    let response = (await wakuArchive.findMessages(query)).valueOr:
      return err($error)

    for i in 0 ..< response.hashes.len:
      let hash = response.hashes[i]
      let topic = response.topics[i]
      let msg = response.messages[i]

      self.messageIngress(topic, hash, msg)

    if response.cursor.isNone():
      break

    query.cursor = response.cursor

  return ok()
proc new*(
    T: type WakuSync,
    peerManager: PeerManager,
    maxFrameSize: int = DefaultMaxFrameSize,
    syncRange: timer.Duration = DefaultSyncRange,
    syncInterval: timer.Duration = DefaultSyncInterval,
    relayJitter: Duration = DefaultGossipSubJitter,
    wakuArchive: WakuArchive,
    wakuStoreClient: WakuStoreClient,
    pruneCallback: Option[PruneCallback] = none(PruneCallback),
    transferCallback: Option[TransferCallback] = none(TransferCallback),
): Future[Result[T, string]] {.async.} =
  ## Creates a WakuSync instance: wires the protocol handler, installs the
  ## provided (or default archive/store-backed) prune and transfer callbacks,
  ## and pre-fills the sync storage from the archive.
  let storage = NegentropyStorage.new().valueOr:
    return err("negentropy storage creation failed")

  var sync = WakuSync(
    storage: storage,
    peerManager: peerManager,
    maxFrameSize: maxFrameSize,
    syncInterval: syncInterval,
    syncRange: syncRange,
    relayJitter: relayJitter,
    pruneOffset: syncInterval div 100, # prune slightly more than strictly needed
  )

  sync.initProtocolHandler()

  sync.pruneCallBack = pruneCallback

  # Fall back to the archive-backed prune callback when none was provided.
  if sync.pruneCallBack.isNone():
    let res = sync.createPruneCallback(wakuArchive)

    if res.isErr():
      error "pruning callback creation error", error = res.error
    else:
      sync.pruneCallBack = some(res.get())

  sync.transferCallBack = transferCallback

  # Fall back to the store-client-backed transfer callback when none was provided.
  if sync.transferCallBack.isNone():
    let res = sync.createTransferCallback(wakuArchive, wakuStoreClient)

    if res.isErr():
      error "transfer callback creation error", error = res.error
    else:
      sync.transferCallBack = some(res.get())

  # Non-fatal: on failure, sync simply starts from an empty window.
  let res = await sync.initFillStorage(wakuArchive)
  if res.isErr():
    warn "will not sync messages before this point in time", error = res.error

  info "WakuSync protocol initialized"

  return ok(sync)
proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
  ## Background loop: every `syncInterval`, run one sync session and transfer
  ## the missing messages. Each stage is retried up to 3 extra times with
  ## `RetryDelay` between attempts.
  debug "periodic sync initialized", interval = $self.syncInterval

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic sync started"

    var
      hashes: seq[WakuMessageHash]
      peer: RemotePeerInfo
      tries = 3

    while true:
      let res = (await self.storeSynchronization()).valueOr:
        # we either try again or log an error and break
        if tries > 0:
          tries -= 1
          await sleepAsync(RetryDelay)
          continue
        else:
          error "sync failed", error = $error
          break

      hashes = res[0]
      peer = res[1]
      break

    # `hashes` stays empty when all sync attempts failed, skipping transfer.
    if hashes.len > 0:
      tries = 3
      while true:
        (await callback(hashes, peer.peerId)).isOkOr:
          # we either try again or log an error and break
          if tries > 0:
            tries -= 1
            await sleepAsync(RetryDelay)
            continue
          else:
            error "transfer callback failed", error = $error
            break

        break

    debug "periodic sync done", hashSynced = hashes.len

    continue
proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
  ## Background loop: every `syncInterval`, remove from the sync storage all
  ## entries older than `syncRange`, paging through the callback's cursor.
  debug "periodic prune initialized", interval = $self.syncInterval

  # Default T minus 60m
  self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos

  await sleepAsync(self.syncInterval)

  # Default T minus 55m
  var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos

  while true: # infinite loop
    await sleepAsync(self.syncInterval)

    debug "periodic prune started",
      startTime = self.pruneStart - self.pruneOffset.nanos,
      endTime = pruneStop,
      storageSize = self.storage.len

    var (elements, cursor) =
      (newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))

    var tries = 3
    while true:
      # Fetch one page of (hash, timestamp) pairs; start a bit earlier than
      # pruneStart (pruneOffset) to prune slightly more than necessary.
      (elements, cursor) = (
        await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
      ).valueOr:
        # we either try again or log an error and break
        if tries > 0:
          tries -= 1
          await sleepAsync(RetryDelay)
          continue
        else:
          error "pruning callback failed", error = $error
          break

      if elements.len == 0:
        # no elements to remove, stop
        break

      for (hash, timestamp) in elements:
        self.storage.erase(timestamp, hash).isOkOr:
          error "storage erase failed",
            timestamp = timestamp, msg_hash = hash.to0xHex(), error = $error
          continue

      if cursor.isNone():
        # no more pages, stop
        break

    # NOTE(review): pruneStart advances even when the callback failed after
    # all retries, which appears to skip that window permanently — confirm.
    self.pruneStart = pruneStop
    pruneStop = getNowInNanosecondTime() - self.syncRange.nanos

    debug "periodic prune done", storageSize = self.storage.len

    continue
proc start*(self: WakuSync) =
  ## Starts the periodic sync and prune loops, when the matching callback is
  ## set and the sync interval is non-zero.
  self.started = true

  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
    self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())

  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
    self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())

  info "WakuSync protocol started"
proc stopWait*(self: WakuSync) {.async.} =
  ## Cancels the periodic loops and waits for them to finish.
  ## NOTE(review): mirrors the conditions in start(); assumes start() ran
  ## first, otherwise the futures are nil — confirm cancelAndWait tolerates that.
  if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
    await self.periodicSyncFut.cancelAndWait()

  if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
    await self.periodicPruneFut.cancelAndWait()

  info "WakuSync protocol stopped"

View File

@ -0,0 +1,501 @@
{.push raises: [].}
from os import DirSep
import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results
import ../waku_core/message
const negentropyPath =
currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep &
"vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep
{.link: negentropyPath & "libnegentropy.so".}
const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h"
logScope:
topics = "waku sync"
# Non-owning (length, pointer) view used to pass raw bytes across the C FFI.
type Buffer = object
  len*: uint64
  `ptr`*: ptr uint8

# Out-parameter struct filled by the negentropy C wrapper.
type BindingResult = object
  output: Buffer # payload to send to the remote peer
  have_ids_len: uint
  need_ids_len: uint
  have_ids: ptr Buffer # C array of "have" hashes, length have_ids_len
  need_ids: ptr Buffer # C array of "need" hashes, length need_ids_len
  error: cstring
proc toWakuMessageHash(buffer: Buffer): WakuMessageHash =
  ## Copies a 32-byte FFI buffer into a WakuMessageHash.
  ## NOTE(review): `assert` is stripped in danger builds — confirm callers
  ## always pass 32-byte buffers, or consider doAssert.
  assert buffer.len == 32

  var hash: WakuMessageHash

  copyMem(hash[0].addr, buffer.ptr, 32)

  return hash
proc toBuffer(x: openArray[byte]): Buffer =
  ## Wraps the input in a non-owning Buffer used to pass data to the
  ## negentropy library. The Buffer borrows `x`'s memory and must not
  ## outlive it.
  # Fix: removed the dead `var temp = @x` copy (the pointer was taken from
  # `x`, not `temp`) and the stale comment referring to the rln lib.
  let baseAddr = cast[pointer](x)
  let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(x.len))
  return output
proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
  ## Copies bytes out of an FFI buffer into a fresh seq.
  ## When `len` is none, the buffer's own length is used.
  let bufLen = if len.isSome(): len.get() else: buffer.len

  if bufLen == 0:
    return @[]

  trace "length of buffer is", len = bufLen

  # Fix: use a mutable seq and `addr` instead of writing through
  # `unsafeAddr` of an immutable `let`, which subverted immutability.
  var bytes = newSeq[byte](bufLen)
  copyMem(bytes[0].addr, buffer.ptr, bufLen)
  return bytes
proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] =
  ## Copies `buffLen` Buffers from a C array into a Nim seq.
  # Fix: `0 .. buffLen - 1` underflows for buffLen == 0 since buffLen is
  # unsigned; `0 ..< buffLen` is correctly empty in that case.
  let uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr)
  var buffers = newSeq[Buffer](buffLen)
  for i in 0 ..< buffLen:
    buffers[i] = uncheckedArr[i]
  return buffers
### Storage ###

# Opaque handle to a C-side negentropy storage instance.
type NegentropyStorage* = distinct pointer

# Creates a new storage instance.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27
proc storage_init(
  db_path: cstring, name: cstring
): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".}

# Inserts a (timestamp, id) entry; returns success as a bool.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41
proc raw_insert(
  storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".}

# Removes a (timestamp, id) entry; returns success as a bool.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43
proc raw_erase(
  storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".}

# Frees the storage instance.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29
proc free(
  storage: NegentropyStorage
) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}

# Number of entries in the storage.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
  storage: NegentropyStorage
): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".}

### Negentropy ###

# Opaque handle to a C-side negentropy engine backed by a full storage.
type RawNegentropy* = distinct pointer

# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33
proc constructNegentropy(
  storage: NegentropyStorage, frameSizeLimit: uint64
): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}

# Produces the initial client payload into `r`.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37
proc raw_initiate(
  negentropy: RawNegentropy, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".}

# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39
proc raw_setInitiator(
  negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".}

# Server-side reconciliation: always produces a response payload.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45
proc raw_reconcile(
  negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile".}

# Client-side reconciliation: also fills have/need id arrays in `r`.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51
proc raw_reconcile_with_ids(
  negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".}

# Frees the engine instance.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35
proc free(
  negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".}

# Releases the buffers held inside a BindingResult.
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53
proc free_result(
  r: ptr BindingResult
) {.header: NEGENTROPY_HEADER, importc: "free_result".}

### SubRange ###

# Opaque handle to a time-bounded view over a NegentropyStorage.
type NegentropySubRangeStorage* = distinct pointer

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57
proc subrange_init(
  storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64
): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".}

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59
proc free(
  subrange: NegentropySubRangeStorage
) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}

# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
  subrange: NegentropySubRangeStorage
): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".}

### Negentropy with NegentropySubRangeStorage ###

# Opaque handle to a C-side negentropy engine backed by a subrange view.
type RawNegentropySubRange = distinct pointer

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61
proc constructNegentropyWithSubRange(
  subrange: NegentropySubRangeStorage, frameSizeLimit: uint64
): RawNegentropySubRange {.
  header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new"
.}

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65
proc raw_initiate_subrange(
  negentropy: RawNegentropySubRange, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".}

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67
proc raw_reconcile_subrange(
  negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".}

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69
proc raw_reconcile_with_ids_subrange(
  negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".}

# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63
proc free(
  negentropy: RawNegentropySubRange
) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".}
### Wrappings ###

### Storage ###

proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.}

proc new*(T: type NegentropyStorage): Result[T, string] =
  ## Creates an in-memory negentropy storage.
  #TODO db name and path
  let storage = storage_init("", "")

  #[ TODO: Uncomment once we move to lmdb
  if storage == nil:
    return err("storage initialization failed") ]#

  return ok(storage)
proc delete*(storage: NegentropyStorage) =
  ## Frees the underlying C storage object.
  storage.free()
proc erase*(
    storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
  ## Removes a (timestamp, hash) entry from the storage.
  var hashBuf = toBuffer(hash)

  #TODO error handling once we move to lmdb
  if raw_erase(storage, uint64(id), addr(hashBuf)):
    return ok()

  return err("erase error")
proc insert*(
    storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
  ## Adds a (timestamp, hash) entry to the storage.
  var hashBuf = toBuffer(hash)

  #TODO error handling once we move to lmdb
  if raw_insert(storage, uint64(id), addr(hashBuf)):
    return ok()

  return err("insert error")
proc len*(storage: NegentropyStorage): int =
  ## Number of entries in the storage.
  int(storage.size)
### SubRange ###

proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.}

proc new*(
    T: type NegentropySubRangeStorage,
    storage: NegentropyStorage,
    startTime: uint64 = uint64.low,
    endTime: uint64 = uint64.high,
): Result[T, string] =
  ## Creates a view over `storage` restricted to [startTime, endTime].
  let subrange = subrange_init(storage, startTime, endTime)

  #[ TODO: Uncomment once we move to lmdb
  if storage == nil:
    return err("storage initialization failed") ]#

  return ok(subrange)

proc delete*(subrange: NegentropySubRangeStorage) =
  ## Frees the underlying C subrange object.
  subrange.free()

proc len*(subrange: NegentropySubRangeStorage): int =
  ## Number of entries visible through the subrange.
  int(subrange.size)
### Interface ###

type
  # Abstract sync engine; concrete impls wrap a raw C negentropy object.
  Negentropy* = ref object of RootObj
  NegentropyWithSubRange = ref object of Negentropy
    inner: RawNegentropySubRange
  NegentropyWithStorage = ref object of Negentropy
    inner: RawNegentropy
  # Opaque wire payload produced and consumed by negentropy rounds.
  NegentropyPayload* = distinct seq[byte]

# Base methods are abstract placeholders: `discard` leaves the result
# default-initialized. NOTE(review): calling them on a bare Negentropy
# returns an empty Result rather than an error — confirm this is intended.
method delete*(self: Negentropy) {.base, gcsafe.} =
  discard

method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} =
  discard

method serverReconcile*(
    self: Negentropy, query: NegentropyPayload
): Result[NegentropyPayload, string] {.base.} =
  discard

method clientReconcile*(
    self: Negentropy,
    query: NegentropyPayload,
    haves: var seq[WakuMessageHash],
    needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] {.base.} =
  discard
### Impl. ###

proc new*(
    T: type Negentropy,
    storage: NegentropyStorage | NegentropySubRangeStorage,
    frameSizeLimit: int,
): Result[T, string] =
  ## Constructs the Negentropy implementation matching the storage type.
  # Fix: use compile-time `when` instead of runtime `if` — the branch is
  # decided by the static type of `storage`, so the non-matching branch
  # should not be compiled into each instantiation. This also makes the
  # dispatch exhaustive over the typeclass, with no fall-through path.
  when storage is NegentropyStorage:
    let rawNegentropy = constructNegentropy(storage, uint64(frameSizeLimit))

    let negentropy = NegentropyWithStorage(inner: rawNegentropy)

    return ok(negentropy)
  elif storage is NegentropySubRangeStorage:
    let rawNegentropy =
      constructNegentropyWithSubRange(storage, uint64(frameSizeLimit))

    let negentropy = NegentropyWithSubRange(inner: rawNegentropy)

    return ok(negentropy)
method delete*(self: NegentropyWithSubRange) =
  ## Releases the underlying C negentropy object.
  self.inner.free()

method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] =
  ## Client initiates a sync session with a server by sending a payload.
  var myResult {.noinit.}: BindingResult = BindingResult()
  var myResultPtr = addr myResult

  let ret = self.inner.raw_initiate_subrange(myResultPtr)
  if ret < 0 or myResultPtr == nil:
    error "negentropy initiate failed with code ", code = ret
    return err("negentropy already initiated!")

  let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))

  # Fix: log before free_result(); the original read `output.len` after the
  # result had been freed, reporting a potentially invalidated value.
  trace "received return from initiate", len = myResultPtr.output.len

  free_result(myResultPtr)

  return ok(NegentropyPayload(bytes))
method serverReconcile*(
    self: NegentropyWithSubRange, query: NegentropyPayload
): Result[NegentropyPayload, string] =
  ## Server response to a negentropy payload.
  ## Always return an answer.
  let queryBuf = toBuffer(seq[byte](query))
  var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
  var myResult {.noinit.}: BindingResult = BindingResult()
  var myResultPtr = addr myResult

  let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr)
  if ret < 0:
    error "raw_reconcile failed with code ", code = ret
    return err($myResultPtr.error)
  trace "received return from raw_reconcile", len = myResultPtr.output.len

  # Copy the output out of the C-side result before releasing it below.
  let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
  trace "outputBytes len", len = outputBytes.len

  free_result(myResultPtr)

  return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
    self: NegentropyWithSubRange,
    query: NegentropyPayload,
    haves: var seq[WakuMessageHash],
    needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
  ## Client response to a negentropy payload.
  ## May return an answer, if not the sync session done.
  ## Appends the reconciled ids to `haves` and `needs`.
  let cQuery = toBuffer(seq[byte](query))

  var myResult {.noinit.}: BindingResult = BindingResult()
  myResult.have_ids_len = 0
  myResult.need_ids_len = 0
  var myResultPtr = addr myResult

  let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr)
  if ret < 0:
    error "raw_reconcile failed with code ", code = ret
    return err($myResultPtr.error)

  let output = bufferToBytes(addr myResult.output)

  var
    have_hashes: seq[Buffer]
    need_hashes: seq[Buffer]

  if myResult.have_ids_len > 0:
    have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
  if myResult.need_ids_len > 0:
    need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)

  trace "have and need hashes ",
    have_count = have_hashes.len, need_count = need_hashes.len

  # `0 .. len - 1` over an int is empty when len == 0, so these are safe.
  for i in 0 .. have_hashes.len - 1:
    var hash = toWakuMessageHash(have_hashes[i])
    trace "have hashes ", index = i, msg_hash = hash.to0xHex()
    haves.add(hash)

  for i in 0 .. need_hashes.len - 1:
    var hash = toWakuMessageHash(need_hashes[i])
    trace "need hashes ", index = i, msg_hash = hash.to0xHex()
    needs.add(hash)

  trace "return ", output = output, len = output.len

  # Hashes and output were copied above; release the C-side result now.
  free_result(myResultPtr)

  # An empty output means the reconciliation round-trip is complete.
  if output.len < 1:
    return ok(none(NegentropyPayload))

  return ok(some(NegentropyPayload(output)))
method delete*(self: NegentropyWithStorage) =
  ## Releases the underlying C negentropy object.
  self.inner.free()

method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] =
  ## Client initiates a sync session with a server by sending a payload.
  var myResult {.noinit.}: BindingResult = BindingResult()
  var myResultPtr = addr myResult

  let ret = self.inner.raw_initiate(myResultPtr)
  if ret < 0 or myResultPtr == nil:
    error "negentropy initiate failed with code ", code = ret
    return err("negentropy already initiated!")

  let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))

  # Fix: log before free_result(); the original read `output.len` after the
  # result had been freed, reporting a potentially invalidated value.
  trace "received return from initiate", len = myResultPtr.output.len

  free_result(myResultPtr)

  return ok(NegentropyPayload(bytes))
method serverReconcile*(
    self: NegentropyWithStorage, query: NegentropyPayload
): Result[NegentropyPayload, string] =
  ## Server response to a negentropy payload.
  ## Always return an answer.
  let queryBuf = toBuffer(seq[byte](query))
  var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
  var myResult {.noinit.}: BindingResult = BindingResult()
  var myResultPtr = addr myResult

  let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr)
  if ret < 0:
    error "raw_reconcile failed with code ", code = ret
    return err($myResultPtr.error)
  trace "received return from raw_reconcile", len = myResultPtr.output.len

  # Copy the output out of the C-side result before releasing it below.
  let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
  trace "outputBytes len", len = outputBytes.len

  free_result(myResultPtr)

  return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
    self: NegentropyWithStorage,
    query: NegentropyPayload,
    haves: var seq[WakuMessageHash],
    needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
  ## Client response to a negentropy payload.
  ## May return an answer, if not the sync session done.
  ## Appends the reconciled ids to `haves` and `needs`.
  let cQuery = toBuffer(seq[byte](query))

  var myResult {.noinit.}: BindingResult = BindingResult()
  myResult.have_ids_len = 0
  myResult.need_ids_len = 0
  var myResultPtr = addr myResult

  let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr)
  if ret < 0:
    error "raw_reconcile failed with code ", code = ret
    return err($myResultPtr.error)

  let output = bufferToBytes(addr myResult.output)

  var
    have_hashes: seq[Buffer]
    need_hashes: seq[Buffer]

  if myResult.have_ids_len > 0:
    have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
  if myResult.need_ids_len > 0:
    need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)

  trace "have and need hashes ",
    have_count = have_hashes.len, need_count = need_hashes.len

  # `0 .. len - 1` over an int is empty when len == 0, so these are safe.
  for i in 0 .. have_hashes.len - 1:
    var hash = toWakuMessageHash(have_hashes[i])
    trace "have hashes ", index = i, msg_hash = hash.to0xHex()
    haves.add(hash)

  for i in 0 .. need_hashes.len - 1:
    var hash = toWakuMessageHash(need_hashes[i])
    trace "need hashes ", index = i, msg_hash = hash.to0xHex()
    needs.add(hash)

  trace "return ", output = output, len = output.len

  # Hashes and output were copied above; release the C-side result now.
  free_result(myResultPtr)

  # An empty output means the reconciliation round-trip is complete.
  if output.len < 1:
    return ok(none(NegentropyPayload))

  return ok(some(NegentropyPayload(output)))

240
waku/waku_sync/session.nim Normal file
View File

@ -0,0 +1,240 @@
{.push raises: [].}
import std/options, results, chronos, libp2p/stream/connection
import
../common/nimchronos,
../common/protobuf,
../waku_core,
./raw_bindings,
./common,
./codec
#TODO add states for protocol negotiation

### Type State ###
# Each state type below encodes one step of the sync session; the transition
# procs consume one state and produce the next, making invalid sequences
# unrepresentable.

# Client-side bookkeeping: hashes we have that the server may need.
type ClientSync* = object
  haveHashes: seq[WakuMessageHash]

# The server side needs no extra bookkeeping.
type ServerSync* = object

# T is either ClientSync or ServerSync

# Payload reconciled locally, ready to be sent.
type Reconciled*[T] = object
  sync: T
  negentropy: Negentropy
  connection: Connection
  frameSize: int
  payload*: SyncPayload

# Payload sent; awaiting the peer's answer.
type Sent*[T] = object
  sync: T
  negentropy: Negentropy
  connection: Connection
  frameSize: int

# Peer payload received; awaiting local reconciliation.
type Received*[T] = object
  sync: T
  negentropy: Negentropy
  connection: Connection
  frameSize: int
  payload*: SyncPayload

# Session finished; carries the final hash set.
type Completed*[T] = object
  sync: T
  negentropy: Negentropy
  connection: Connection
  haveHashes: seq[WakuMessageHash]
### State Transition ###
proc clientInitialize*(
    store: NegentropyStorage,
    conn: Connection,
    frameSize = DefaultMaxFrameSize,
    start = int64.low,
    `end` = int64.high,
): Result[Reconciled[ClientSync], string] =
  ## Sets up the client side of a sync session: builds a subrange view of the
  ## store over [start, end], creates a negentropy engine, and produces the
  ## initial payload, yielding the first Reconciled state.
  let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`))
  let engine = ?Negentropy.new(subrange, frameSize)
  let initialPayload = ?engine.initiate()

  return ok(
    Reconciled[ClientSync](
      sync: ClientSync(),
      negentropy: engine,
      connection: conn,
      frameSize: frameSize,
      payload: SyncPayload(negentropy: seq[byte](initialPayload)),
    )
  )
proc serverInitialize*(
    store: NegentropyStorage,
    conn: Connection,
    frameSize = DefaultMaxFrameSize,
    syncStart = int64.low,
    syncEnd = int64.high,
): Result[Sent[ServerSync], string] =
  ## Sets up the server side of a sync session: builds a subrange view of the
  ## store over [syncStart, syncEnd] and a negentropy engine, then starts in
  ## the Sent state, waiting for the client's first payload.
  let subrange =
    ?NegentropySubRangeStorage.new(store, uint64(syncStart), uint64(syncEnd))
  let engine = ?Negentropy.new(subrange, frameSize)

  let state = Sent[ServerSync](
    sync: ServerSync(), negentropy: engine, connection: conn, frameSize: frameSize
  )
  return ok(state)
proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} =
  ## Writes the pending payload (length-prefixed) to the connection and
  ## moves the session from Reconciled to Sent.
  let writeAttempt = catch:
    await self.connection.writeLP(self.payload.encode().buffer)

  if writeAttempt.isErr():
    return err("send connection write error: " & writeAttempt.error.msg)

  let next = Sent[T](
    sync: self.sync,
    negentropy: self.negentropy,
    connection: self.connection,
    frameSize: self.frameSize,
  )
  return ok(next)
proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} =
  ## Reads the peer's next length-prefixed message from the connection and
  ## decodes it into a SyncPayload, moving the session from Sent to Received.
  let readAttempt = catch:
    await self.connection.readLp(-1)

  if readAttempt.isErr():
    return err("listenBack connection read error: " & readAttempt.error.msg)

  let rawBytes: seq[byte] = readAttempt.get()

  let decoded = SyncPayload.decode(rawBytes)
  if decoded.isErr():
    # Bind to the concrete error type before stringifying.
    let pbError: ProtobufError = decoded.error
    return err("listenBack decoding error: " & $pbError)

  return ok(
    Received[T](
      sync: self.sync,
      negentropy: self.negentropy,
      connection: self.connection,
      frameSize: self.frameSize,
      payload: decoded.get(),
    )
  )
# Aliasing for readability
# ok(...) carries a Reconciled session (the exchange continues),
# err(...) carries a Completed session (the exchange is over).
type ContinueOrCompleted[T] = Result[Reconciled[T], Completed[T]]
type Continue[T] = Reconciled[T]
proc clientReconcile*(
    self: Received[ClientSync], needHashes: var seq[WakuMessageHash]
): Result[ContinueOrCompleted[ClientSync], string] =
  ## Feeds the server's payload to negentropy. Hashes the client has are
  ## accumulated into the session state; hashes it needs are appended to
  ## `needHashes`. No further payload means the exchange is Completed,
  ## otherwise the session continues with the next payload to send.
  var localHaves = self.sync.haveHashes

  let maybeResponse =
    ?self.negentropy.clientReconcile(
      NegentropyPayload(self.payload.negentropy), localHaves, needHashes
    )

  let updatedSync = ClientSync(haveHashes: localHaves)

  if maybeResponse.isNone():
    let finished = Completed[ClientSync](
      sync: updatedSync, negentropy: self.negentropy, connection: self.connection
    )
    return ok(ContinueOrCompleted[ClientSync].err(finished))

  let next = Continue[ClientSync](
    sync: updatedSync,
    negentropy: self.negentropy,
    connection: self.connection,
    frameSize: self.frameSize,
    payload: SyncPayload(negentropy: seq[byte](maybeResponse.get())),
  )
  return ok(ContinueOrCompleted[ClientSync].ok(next))
proc serverReconcile*(
    self: Received[ServerSync]
): Result[ContinueOrCompleted[ServerSync], string] =
  ## Processes the client's payload on the server side. An empty negentropy
  ## payload signals the client is done: the session is Completed and the
  ## hashes the client sent are kept. Otherwise negentropy computes the
  ## next payload and the exchange continues.
  if self.payload.negentropy.len == 0:
    let finished = Completed[ServerSync](
      sync: self.sync,
      negentropy: self.negentropy,
      connection: self.connection,
      haveHashes: self.payload.hashes,
    )
    return ok(ContinueOrCompleted[ServerSync].err(finished))

  let reply =
    ?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy))

  let next = Continue[ServerSync](
    sync: self.sync,
    negentropy: self.negentropy,
    connection: self.connection,
    frameSize: self.frameSize,
    payload: SyncPayload(negentropy: seq[byte](reply)),
  )
  return ok(ContinueOrCompleted[ServerSync].ok(next))
proc clientTerminate*(
    self: Completed[ClientSync]
): Future[Result[void, string]] {.async.} =
  ## Ends the client side of the session: sends the accumulated "have"
  ## hashes to the server, then frees the native negentropy object.
  let payload = SyncPayload(hashes: self.sync.haveHashes)

  let writeRes = catch:
    await self.connection.writeLp(payload.encode().buffer)

  # Free the native negentropy object on ALL paths. The original code
  # returned before delete() on write failure, leaking the underlying
  # resource since the Completed session is consumed here either way.
  self.negentropy.delete()

  if writeRes.isErr():
    return err("clientTerminate connection write error: " & writeRes.error.msg)

  return ok()
proc serverTerminate*(
    self: Completed[ServerSync]
): Future[seq[WakuMessageHash]] {.async.} =
  ## Ends the server side of the session: frees the native negentropy
  ## object and returns the hashes the client reported having.
  self.negentropy.delete()
  return self.haveHashes

View File

@ -0,0 +1,76 @@
# Unused yet. Kept for future use.
#[ when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[times, tables, options], chronicles, chronos, stew/results
import ./raw_bindings, ../waku_core/time
logScope:
topics = "waku sync"
type WakuSyncStorageManager* = ref object
storages: OrderedTable[string, Storage]
# Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH
maxHours: int64
proc new*(
T: type WakuSyncStorageManager,
hoursToStore: times.Duration = initDuration(minutes = 120),
): T =
return WakuSyncStorageManager(maxHours: hoursToStore.inHours)
proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] =
if self.storages.len() == 0:
return ok(none(Storage))
var storageToFetch: Storage
#is there a more effective way to fetch last element?
for k, storage in self.storages:
storageToFetch = storage
return ok(some(storageToFetch))
proc deleteOldestStorage*(self: WakuSyncStorageManager) =
var storageToDelete: Storage
var time: string
#is there a more effective way to fetch first element?
for k, storage in self.storages:
storageToDelete = storage
time = k
break
if self.storages.pop(time, storageToDelete):
delete(storageToDelete)
proc retrieveStorage*(
self: WakuSyncStorageManager, time: Timestamp
): Result[Option[Storage], string] =
var timestamp: Timestamp
if time == 0:
timestamp = timestampInSeconds(getNowInNanosecondTime())
debug "timestamp not provided, using now to fetch storage", timestamp = timestamp
else:
timestamp = timestampInSeconds(time)
let tsTime = times.fromUnix(timestamp)
let dateTime = times.format(tsTime, "yyyyMMddHH", utc())
var storage: Storage = self.storages.getOrDefault(dateTime)
if storage == nil:
#create a new storage
# TODO: May need synchronization??
# Limit number of storages to configured duration
let hours = self.storages.len()
if hours == self.maxHours:
#Need to delete oldest storage at this point, but what if that is being synced?
self.deleteOldestStorage()
info "number of storages reached, deleting the oldest"
info "creating a new storage for ", time = dateTime
storage = Storage.new().valueOr:
error "storage creation failed"
return err(error)
self.storages[dateTime] = storage
return ok(some(storage))
]#