feat: remove Waku Sync 1.0 & Negentropy (#3185)

Simon-Pierre Vivier 2024-11-29 09:09:41 -05:00 committed by GitHub
parent a7264d68c1
commit 2ab9c3d363
16 changed files with 0 additions and 2086 deletions

.gitmodules (vendored), 5 deletions

@@ -169,11 +169,6 @@
url = https://github.com/nim-lang/db_connector.git
ignore = untracked
branch = master
[submodule "vendor/negentropy"]
ignore = untracked
path = vendor/negentropy
url = https://github.com/waku-org/negentropy.git
branch = master
[submodule "vendor/nph"]
ignore = untracked
branch = master


@@ -178,35 +178,6 @@ clean-librln:
# Extend clean target
clean: | clean-librln
######################
### NEGENTROPY ###
######################
.PHONY: negentropy
LIBNEGENTROPY_BUILDDIR := $(CURDIR)/vendor/negentropy/cpp
LIBNEGENTROPY_FILE := libnegentropy.a
deps: | negentropy
clean: | negentropy-clean
$(LIBNEGENTROPY_FILE):
$(MAKE) -C $(LIBNEGENTROPY_BUILDDIR) && \
cp $(LIBNEGENTROPY_BUILDDIR)/${LIBNEGENTROPY_FILE} ${LIBNEGENTROPY_FILE}
negentropy: | $(LIBNEGENTROPY_FILE)
## Pass libnegentropy and its deps to the linker.
$(eval LIBNEGENTROPY_PATH := $(shell if [ -f "$(LIBNEGENTROPY_FILE)" ]; then echo "$(LIBNEGENTROPY_FILE)"; else echo "./$(LIBNEGENTROPY_FILE)"; fi))
$(eval NIM_PARAMS += --passL:$(LIBNEGENTROPY_PATH) --passL:-lcrypto --passL:-lssl --passL:-lstdc++)
ifeq ($(detected_OS),Darwin)
$(eval NIM_PARAMS += --passL:-L/opt/homebrew/lib/)
endif
negentropy-clean:
$(MAKE) -C $(LIBNEGENTROPY_BUILDDIR) clean && \
rm ${LIBNEGENTROPY_FILE}
#################
## Waku Common ##
#################
@@ -480,7 +451,6 @@ cwaku_example: | build libwaku
./examples/cbindings/base64.c \
-lwaku -Lbuild/ \
-pthread -ldl -lm \
-lnegentropy -Lvendor/negentropy/cpp/ \
-lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \
-lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \
vendor/nim-libbacktrace/libbacktrace_wrapper.o \
@@ -493,7 +463,6 @@ cppwaku_example: | build libwaku
./examples/cpp/base64.cpp \
-lwaku -Lbuild/ \
-pthread -ldl -lm \
-lnegentropy -Lvendor/negentropy/cpp/ \
-lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \
-lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \
vendor/nim-libbacktrace/libbacktrace_wrapper.o \


@@ -1,37 +0,0 @@
{.used.}
import std/options, chronos, chronicles, libp2p/crypto/crypto
import waku/[node/peer_manager, waku_core, waku_sync], ../testlib/wakucore
proc newTestWakuSync*(
switch: Switch,
transfer: Option[TransferCallback] = none(TransferCallback),
prune: Option[PruneCallback] = none(PruneCallback),
interval: Duration = DefaultSyncInterval,
): Future[WakuSync] {.async.} =
let peerManager = PeerManager.new(switch)
let fakePruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
return ok((@[], none(WakuMessageHash)))
let res = await WakuSync.new(
peerManager = peerManager,
relayJitter = 0.seconds,
syncInterval = interval,
wakuArchive = nil,
wakuStoreClient = nil,
pruneCallback = some(fakePruneCallback),
transferCallback = none(TransferCallback),
)
let proto = res.get()
proto.start()
switch.mount(proto)
return proto
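# Illustrative sketch of how a suite would use the helper above
# (hypothetical test body; newTestSwitch comes from ../testlib/wakucore
# and this file's imports are assumed):
proc exampleSyncPair() {.async.} =
  let
    serverSwitch = newTestSwitch()
    clientSwitch = newTestSwitch()
  await allFutures(serverSwitch.start(), clientSwitch.start())
  let server = await newTestWakuSync(serverSwitch)
  let client = await newTestWakuSync(clientSwitch)
  # Ask the server for whatever hashes the client is missing.
  let res =
    await client.storeSynchronization(some(serverSwitch.peerInfo.toRemotePeerInfo()))
  assert res.isOk(), $res.error
  await allFutures(server.stop(), client.stop())
  await allFutures(serverSwitch.stop(), clientSwitch.stop())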


@@ -1,3 +0,0 @@
{.used.}
import ./test_protocol, ./test_bindings


@@ -1,141 +0,0 @@
{.used.}
import std/[options], testutils/unittests, chronos, libp2p/crypto/crypto, std/random
import
../../waku/
[node/peer_manager, waku_core, waku_core/message/digest, waku_sync/raw_bindings],
../testlib/[wakucore],
./sync_utils
random.randomize()
#TODO clean this up
suite "Bindings":
var storage {.threadvar.}: NegentropyStorage
var messages {.threadvar.}: seq[(WakuMessageHash, WakuMessage)]
setup:
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
storage = storageRes.get()
messages = @[]
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
messages.add((hash, msg))
teardown:
storage.delete()
test "storage insert":
check:
storage.len() == 0
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
test "storage erase":
let insRes = storage.insert(messages[0][1].timestamp, messages[0][0])
assert insRes.isOk(), $insRes.error
check:
storage.len() == 1
var delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isOk()
check:
storage.len() == 0
delRes = storage.erase(messages[0][1].timestamp, messages[0][0])
assert delRes.isErr()
check:
storage.len() == 0
test "subrange":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
let subrangeRes = NegentropySubRangeStorage.new(storage)
assert subrangeRes.isOk(), subrangeRes.error
let subrange = subrangeRes.get()
check:
subrange.len() == 10
#[ test "storage memory size":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
check:
storage.len() == 10
for (hash, msg) in messages:
let delRes = storage.erase(msg.timestamp, hash)
assert delRes.isOk(), $delRes.error
check:
storage.len() == 0
#TODO validate that the occupied memory didn't grow. ]#
test "reconcile server differences":
for (hash, msg) in messages:
let insRes = storage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let clientNegentropyRes = Negentropy.new(storage, 0)
let storageRes = NegentropyStorage.new()
assert storageRes.isOk(), $storageRes.error
let serverStorage = storageRes.get()
for (hash, msg) in messages:
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
# the extra msg
let msg = fakeWakuMessage()
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let insRes = serverStorage.insert(msg.timestamp, hash)
assert insRes.isOk(), $insRes.error
let serverNegentropyRes = Negentropy.new(serverStorage, 0)
assert clientNegentropyRes.isOk(), $clientNegentropyRes.error
assert serverNegentropyRes.isOk(), $serverNegentropyRes.error
let clientNegentropy = clientNegentropyRes.get()
let serverNegentropy = serverNegentropyRes.get()
let initRes = clientNegentropy.initiate()
assert initRes.isOk(), $initRes.error
let init = initRes.get()
let reconRes = serverNegentropy.serverReconcile(init)
assert reconRes.isOk(), $reconRes.error
let srecon = reconRes.get()
var
haves: seq[WakuMessageHash]
needs: seq[WakuMessageHash]
let creconRes = clientNegentropy.clientReconcile(srecon, haves, needs)
assert creconRes.isOk(), $creconRes.error
let reconOpt = creconRes.get()
check:
reconOpt.isNone()
needs[0] == hash


@@ -1,374 +0,0 @@
{.used.}
import
std/[options, sets],
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto,
stew/byteutils,
std/random
import
../../waku/[
node/peer_manager,
waku_core,
waku_core/message/digest,
waku_sync,
waku_sync/raw_bindings,
],
../testlib/[wakucore, testasync],
./sync_utils
random.randomize()
suite "Waku Sync":
var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
var server {.threadvar.}: WakuSync
var client {.threadvar.}: WakuSync
var serverPeerInfo {.threadvar.}: Option[RemotePeerInfo]
asyncSetup:
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
server = await newTestWakuSync(serverSwitch)
client = await newTestWakuSync(clientSwitch)
serverPeerInfo = some(serverSwitch.peerInfo.toRemotePeerInfo())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(server.stop(), client.stop())
await allFutures(serverSwitch.stop(), clientSwitch.stop())
suite "Protocol":
asyncTest "sync 2 nodes both empty":
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes empty client full server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
server.messageIngress(DefaultPubsubTopic, msg3)
var hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 3
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) in hashes.value[0]
computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) in hashes.value[0]
asyncTest "sync 2 nodes full client empty server":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
client.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg2)
client.messageIngress(DefaultPubsubTopic, msg3)
var hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes different hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
var syncRes = await client.storeSynchronization(serverPeerInfo)
check:
syncRes.isOk()
var hashes = syncRes.get()
check:
hashes[0].len == 1
hashes[0][0] == computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2)
# Assuming the message is fetched from the peer
client.messageIngress(DefaultPubsubTopic, msg2)
syncRes = await client.storeSynchronization(serverPeerInfo)
check:
syncRes.isOk()
hashes = syncRes.get()
check:
hashes[0].len == 0
asyncTest "sync 2 nodes same hashes":
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
server.messageIngress(DefaultPubsubTopic, msg1)
client.messageIngress(DefaultPubsubTopic, msg1)
server.messageIngress(DefaultPubsubTopic, msg2)
client.messageIngress(DefaultPubsubTopic, msg2)
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == 0
asyncTest "sync 2 nodes 100K msgs":
var i = 0
let msgCount = 100000
var diffIndex = rand(msgCount)
var diffMsg: WakuMessage
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i != diffIndex:
client.messageIngress(DefaultPubsubTopic, msg)
else:
diffMsg = msg
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == 1
hashes.value[0][0] == computeMessageHash(DefaultPubsubTopic, diffMsg)
asyncTest "sync 2 nodes 100K msgs 10K diffs":
var i = 0
let msgCount = 100000
var diffCount = 10000
var diffMsgHashes: seq[WakuMessageHash]
var randIndexes: seq[int]
while i < diffCount:
let randInt = rand(msgCount)
if randInt in randIndexes:
continue
randIndexes.add(randInt)
i += 1
i = 0
var tmpDiffCnt = diffCount
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if tmpDiffCnt > 0 and i in randIndexes:
diffMsgHashes.add(computeMessageHash(DefaultPubsubTopic, msg))
tmpDiffCnt = tmpDiffCnt - 1
else:
client.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
let hashes = await client.storeSynchronization(serverPeerInfo)
assert hashes.isOk(), $hashes.error
check:
hashes.value[0].len == diffCount
toHashSet(hashes.value[0]) == toHashSet(diffMsgHashes)
asyncTest "sync 3 nodes 2 client 1 server":
## Setup
let client2Switch = newTestSwitch()
await client2Switch.start()
let client2 = await newTestWakuSync(client2Switch)
let msgCount = 10000
var i = 0
while i < msgCount:
i += 1
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i mod 2 == 0:
client2.messageIngress(DefaultPubsubTopic, msg)
else:
client.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
let fut1 = client.storeSynchronization(serverPeerInfo)
let fut2 = client2.storeSynchronization(serverPeerInfo)
waitFor allFutures(fut1, fut2)
let hashes1 = fut1.read()
let hashes2 = fut2.read()
assert hashes1.isOk(), $hashes1.error
assert hashes2.isOk(), $hashes2.error
check:
hashes1.value[0].len == int(msgCount / 2)
hashes2.value[0].len == int(msgCount / 2)
await client2.stop()
await client2Switch.stop()
asyncTest "sync 6 nodes varying sync diffs":
## Setup
let
client2Switch = newTestSwitch()
client3Switch = newTestSwitch()
client4Switch = newTestSwitch()
client5Switch = newTestSwitch()
await allFutures(
client2Switch.start(),
client3Switch.start(),
client4Switch.start(),
client5Switch.start(),
)
let
client2 = await newTestWakuSync(client2Switch)
client3 = await newTestWakuSync(client3Switch)
client4 = await newTestWakuSync(client4Switch)
client5 = await newTestWakuSync(client5Switch)
let msgCount = 100000
var i = 0
while i < msgCount:
let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
if i < msgCount - 1:
client.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 10:
client2.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 100:
client3.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 1000:
client4.messageIngress(DefaultPubsubTopic, msg)
if i < msgCount - 10000:
client5.messageIngress(DefaultPubsubTopic, msg)
server.messageIngress(DefaultPubsubTopic, msg)
i += 1
var timeBefore = getNowInNanosecondTime()
let hashes1 = await client.storeSynchronization(serverPeerInfo)
var timeAfter = getNowInNanosecondTime()
var syncTime = (timeAfter - timeBefore)
debug "sync time in seconds", msgsTotal = msgCount, diff = 1, syncTime = syncTime
assert hashes1.isOk(), $hashes1.error
check:
hashes1.value[0].len == 1
timeBefore = getNowInNanosecondTime()
let hashes2 = await client2.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds", msgsTotal = msgCount, diff = 10, syncTime = syncTime
assert hashes2.isOk(), $hashes2.error
check:
hashes2.value[0].len == 10
timeBefore = getNowInNanosecondTime()
let hashes3 = await client3.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 100, syncTime = syncTime
assert hashes3.isOk(), $hashes3.error
check:
hashes3.value[0].len == 100
timeBefore = getNowInNanosecondTime()
let hashes4 = await client4.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 1000, syncTime = syncTime
assert hashes4.isOk(), $hashes4.error
check:
hashes4.value[0].len == 1000
timeBefore = getNowInNanosecondTime()
let hashes5 = await client5.storeSynchronization(serverPeerInfo)
timeAfter = getNowInNanosecondTime()
syncTime = (timeAfter - timeBefore)
debug "sync time in seconds",
msgsTotal = msgCount, diff = 10000, syncTime = syncTime
assert hashes5.isOk(), $hashes5.error
check:
hashes5.value[0].len == 10000
await allFutures(client2.stop(), client3.stop(), client4.stop(), client5.stop())
await allFutures(
client2Switch.stop(),
client3Switch.stop(),
client4Switch.stop(),
client5Switch.stop(),
)
asyncTest "sync 3 nodes cyclic":
let
node1Switch = newTestSwitch()
node2Switch = newTestSwitch()
node3Switch = newTestSwitch()
await allFutures(node1Switch.start(), node2Switch.start(), node3Switch.start())
let node1PeerInfo = some(node1Switch.peerInfo.toRemotePeerInfo())
let node2PeerInfo = some(node2Switch.peerInfo.toRemotePeerInfo())
let node3PeerInfo = some(node3Switch.peerInfo.toRemotePeerInfo())
let msg1 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash1 = computeMessageHash(DefaultPubsubTopic, msg1)
let msg2 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash2 = computeMessageHash(DefaultPubsubTopic, msg2)
let msg3 = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash3 = computeMessageHash(DefaultPubsubTopic, msg3)
let
node1 = await newTestWakuSync(node1Switch)
node2 = await newTestWakuSync(node2Switch)
node3 = await newTestWakuSync(node3Switch)
node1.messageIngress(DefaultPubsubTopic, msg1)
node2.messageIngress(DefaultPubsubTopic, msg1)
node2.messageIngress(DefaultPubsubTopic, msg2)
node3.messageIngress(DefaultPubsubTopic, msg3)
let f1 = node1.storeSynchronization(node2PeerInfo)
let f2 = node2.storeSynchronization(node3PeerInfo)
let f3 = node3.storeSynchronization(node1PeerInfo)
waitFor allFutures(f1, f2, f3)
let hashes1 = f1.read()
let hashes2 = f2.read()
let hashes3 = f3.read()
assert hashes1.isOk(), hashes1.error
assert hashes2.isOk(), hashes2.error
assert hashes3.isOk(), hashes3.error
check:
hashes1.get()[0].len == 1
hashes2.get()[0].len == 1
hashes3.get()[0].len == 1
hashes1.get()[0][0] == hash2
hashes2.get()[0][0] == hash3
hashes3.get()[0][0] == hash1
await allFutures(node1.stop(), node2.stop(), node3.stop())
await allFutures(node1Switch.stop(), node2Switch.stop(), node3Switch.stop())

vendor/negentropy (vendored submodule), 1 deletion

@@ -1 +0,0 @@
Subproject commit 449b304d67e9401bd96451cb349eb6b17b713f67


@@ -313,17 +313,6 @@ proc setupProtocols(
if conf.store and conf.storeResume:
node.setupStoreResume()
if conf.storeSync:
(
await node.mountWakuSync(
int(conf.storeSyncMaxPayloadSize),
conf.storeSyncRange.seconds(),
conf.storeSyncInterval.seconds(),
conf.storeSyncRelayJitter.seconds(),
)
).isOkOr:
return err("failed to mount waku sync protocol: " & $error)
# NOTE Must be mounted after relay
if conf.lightpush:
try:


@@ -39,7 +39,6 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_sync,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
@@ -108,7 +107,6 @@ type
wakuPeerExchange*: WakuPeerExchange
wakuMetadata*: WakuMetadata
wakuSharding*: Sharding
wakuSync*: WakuSync
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
@@ -177,42 +175,6 @@ proc connectToNodes*(
proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
await peer_manager.disconnectNode(node.peerManager, remotePeer)
## Waku Sync
proc mountWakuSync*(
node: WakuNode,
maxFrameSize: int = DefaultMaxFrameSize,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
): Future[Result[void, string]] {.async.} =
if not node.wakuSync.isNil():
return err("already mounted")
node.wakuSync = (
await WakuSync.new(
peerManager = node.peerManager,
maxFrameSize = maxFrameSize,
syncRange = syncRange,
syncInterval = syncInterval,
relayJitter = relayJitter,
wakuArchive = node.wakuArchive,
wakuStoreClient = node.wakuStoreClient,
)
).valueOr:
return err("initialization failed: " & error)
let catchable = catch:
node.switch.mount(node.wakuSync, protocolMatcher(WakuSyncCodec))
if catchable.isErr():
return err("switch mounting failed: " & catchable.error.msg)
if node.started:
node.wakuSync.start()
return ok()
## Waku Metadata
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
@@ -269,19 +231,12 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuArchive.handleMessage(topic, msg)
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async.} =
if node.wakuSync.isNil():
return
node.wakuSync.messageIngress(topic, msg)
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
discard node.wakuRelay.subscribe(topic, defaultHandler)
@@ -1349,9 +1304,6 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()
if not node.wakuSync.isNil():
node.wakuSync.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(


@@ -16,7 +16,6 @@ import
../../../waku_relay,
../../../waku_peer_exchange,
../../../waku_node,
../../../waku_sync,
../../../node/peer_manager,
../responses,
../serdes,
@@ -106,18 +105,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
tuplesToWakuPeers(peers, pxPeers)
if not node.wakuSync.isNil():
# Map WakuSync peers to WakuPeers and add to return list
let syncPeers = node.peerManager.wakuPeerStore.peers(WakuSyncCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuSyncCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, syncPeers)
let resp = RestApiResponse.jsonResponse(peers, status = Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error = resp.error


@@ -1,5 +0,0 @@
{.push raises: [].}
import ./waku_sync/protocol, ./waku_sync/common
export common, protocol


@@ -1,57 +0,0 @@
{.push raises: [].}
import std/options, stew/arrayops
import ../common/protobuf, ../waku_core, ./common
proc encode*(req: SyncPayload): ProtoBuffer =
var pb = initProtoBuffer()
if req.syncRange.isSome():
pb.write3(31, req.syncRange.get()[0])
pb.write3(32, req.syncRange.get()[1])
if req.frameSize.isSome():
pb.write3(33, req.frameSize.get())
if req.negentropy.len > 0:
pb.write3(1, req.negentropy)
if req.hashes.len > 0:
for hash in req.hashes:
pb.write3(20, hash)
return pb
proc decode*(T: type SyncPayload, buffer: seq[byte]): ProtobufResult[T] =
var req = SyncPayload()
let pb = initProtoBuffer(buffer)
var rangeStart: uint64
var rangeEnd: uint64
if ?pb.getField(31, rangeStart) and ?pb.getField(32, rangeEnd):
req.syncRange = some((rangeStart, rangeEnd))
else:
req.syncRange = none((uint64, uint64))
var frame: uint64
if ?pb.getField(33, frame):
req.frameSize = some(frame)
else:
req.frameSize = none(uint64)
var negentropy: seq[byte]
if ?pb.getField(1, negentropy):
req.negentropy = negentropy
else:
req.negentropy = @[]
var buffer: seq[seq[byte]]
if not ?pb.getRepeatedField(20, buffer):
req.hashes = @[]
else:
req.hashes = newSeqOfCap[WakuMessageHash](buffer.len)
for buf in buffer:
let msg: WakuMessageHash = fromBytes(buf)
req.hashes.add(msg)
return ok(req)
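# Round-trip sketch for this codec (hypothetical values; SyncPayload is
# defined in ./common and this file's imports are assumed):
proc codecRoundTripExample() =
  var payload = SyncPayload()
  payload.syncRange = some((uint64(1000), uint64(2000)))
  payload.frameSize = some(uint64(153600))
  payload.negentropy = @[byte 0x61] # arbitrary single-byte payload
  let encoded = payload.encode().buffer
  let decoded = SyncPayload.decode(encoded).expect("valid payload")
  assert decoded.syncRange == some((uint64(1000), uint64(2000)))
  assert decoded.frameSize == some(uint64(153600))
  assert decoded.hashes.len == 0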


@ -1,531 +0,0 @@
{.push raises: [].}
import
std/[options, sugar, sequtils],
stew/byteutils,
results,
chronicles,
chronos,
metrics,
libp2p/utility,
libp2p/protocols/protocol,
libp2p/stream/connection,
libp2p/crypto/crypto,
eth/p2p/discoveryv5/enr
import
../common/nimchronos,
../common/enr,
../waku_core,
../waku_archive,
../waku_store/[client, common],
../waku_enr,
../node/peer_manager/peer_manager,
./raw_bindings,
./common,
./session
logScope:
topics = "waku sync"
type WakuSync* = ref object of LPProtocol
storage: NegentropyStorage
maxFrameSize: int # Negentropy param to limit the size of payloads
peerManager: PeerManager
syncInterval: timer.Duration # Time between each synchronisation attempt
syncRange: timer.Duration # Amount of time in the past to sync
relayJitter: Duration # Amount of time since the present to ignore when syncing
transferCallBack: Option[TransferCallback] # Callback for message transfers.
pruneCallBack: Option[PruneCallBack] # Callback with the result of the archive query
pruneStart: Timestamp # Last pruning start timestamp
pruneOffset: timer.Duration # Offset to prune a bit more than necessary.
periodicSyncFut: Future[void]
periodicPruneFut: Future[void]
proc storageSize*(self: WakuSync): int =
self.storage.len
proc messageIngress*(self: WakuSync, pubsubTopic: PubsubTopic, msg: WakuMessage) =
if msg.ephemeral:
return
let msgHash: WakuMessageHash = computeMessageHash(pubsubTopic, msg)
trace "inserting message into waku sync storage ",
msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp
self.storage.insert(msg.timestamp, msgHash).isOkOr:
error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = $error
proc messageIngress*(
self: WakuSync, pubsubTopic: PubsubTopic, msgHash: WakuMessageHash, msg: WakuMessage
) =
if msg.ephemeral:
return
trace "inserting message into waku sync storage ",
msg_hash = msgHash.to0xHex(), timestamp = msg.timestamp
self.storage.insert(msg.timestamp, msgHash).isOkOr:
error "failed to insert message ", msg_hash = msgHash.to0xHex(), error = error
proc calculateRange(
jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): (int64, int64) =
## Calculates the start and end time of a sync session
var now = getNowInNanosecondTime()
# Because of message jitter inherent to Relay protocol
now -= jitter.nanos
let syncRange = syncRange.nanos
let syncStart = now - syncRange
let syncEnd = now
return (syncStart, syncEnd)
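# Quick check of the window arithmetic above (a sketch; `nanos` on Duration
# comes from ../common/nimchronos, as used elsewhere in this file):
proc calculateRangeExample() =
  let (syncStart, syncEnd) = calculateRange(jitter = 20.seconds, syncRange = 1.hours)
  # The window spans exactly `syncRange` and ends `jitter` in the past.
  assert syncEnd - syncStart == 1.hours.nanos
  assert syncEnd <= getNowInNanosecondTime() - 20.seconds.nanos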
proc request(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (syncStart, syncEnd) = calculateRange(self.relayJitter)
let initialized =
?clientInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)
debug "sync session initialized",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
frameSize = self.maxFrameSize,
timeStart = syncStart,
timeEnd = syncEnd
var hashes: seq[WakuMessageHash]
var reconciled = initialized
while true:
let sent = ?await reconciled.send()
trace "sync payload sent",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = reconciled.payload
let received = ?await sent.listenBack()
trace "sync payload received",
client = self.peerManager.switch.peerInfo.peerId,
server = conn.peerId,
payload = received.payload
reconciled = (?received.clientReconcile(hashes)).valueOr:
let completed = error # Result[Reconciled, Completed]
?await completed.clientTerminate()
debug "sync session ended gracefully",
client = self.peerManager.switch.peerInfo.peerId, server = conn.peerId
trace "hashes to sync",
client = self.peerManager.switch.peerInfo.peerId, msg_hashes = $hashes
return ok(hashes)
continue
proc storeSynchronization*(
self: WakuSync, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo)
): Future[Result[(seq[WakuMessageHash], RemotePeerInfo), string]] {.async.} =
let peer = peerInfo.valueOr:
self.peerManager.selectPeer(WakuSyncCodec).valueOr:
return err("No suitable peer found for sync")
let connOpt = await self.peerManager.dialPeer(peer, WakuSyncCodec)
let conn: Connection = connOpt.valueOr:
return err("Cannot establish sync connection")
let hashes: seq[WakuMessageHash] = (await self.request(conn)).valueOr:
error "sync session ended",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
return err("Sync request error: " & error)
return ok((hashes, peer))
proc handleSyncSession(
self: WakuSync, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
let (syncStart, syncEnd) = calculateRange(self.relayJitter)
let initialized =
?serverInitialize(self.storage, conn, self.maxFrameSize, syncStart, syncEnd)
var sent = initialized
while true:
let received = ?await sent.listenBack()
trace "sync payload received",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = received.payload
let reconciled = (?received.serverReconcile()).valueOr:
let completed = error # Result[Reconciled, Completed]
let hashes = await completed.serverTerminate()
trace "hashes to sync",
server = self.peerManager.switch.peerInfo.peerId, msg_hashes = $hashes
return ok(hashes)
sent = ?await reconciled.send()
trace "sync payload sent",
server = self.peerManager.switch.peerInfo.peerId,
client = conn.peerId,
payload = reconciled.payload
continue
proc initProtocolHandler(self: WakuSync) =
proc handle(conn: Connection, proto: string) {.async, closure.} =
debug "sync session requested",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
let hashes = (await self.handleSyncSession(conn)).valueOr:
debug "sync session ended",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId, error
#TODO send error code and desc to client
return
if hashes.len > 0 and self.transferCallBack.isSome():
let callback = self.transferCallBack.get()
(await callback(hashes, conn.peerId)).isOkOr:
error "transfer callback failed", error = $error
debug "sync session ended gracefully",
server = self.peerManager.switch.peerInfo.peerId, client = conn.peerId
self.handler = handle
self.codec = WakuSyncCodec
proc createPruneCallback(
self: WakuSync, wakuArchive: WakuArchive
): Result[PruneCallBack, string] =
if wakuArchive.isNil():
return err("waku archive unavailable")
let callback: PruneCallback = proc(
pruneStart: Timestamp, pruneStop: Timestamp, cursor: Option[WakuMessageHash]
): Future[
Result[(seq[(WakuMessageHash, Timestamp)], Option[WakuMessageHash]), string]
] {.async: (raises: []), closure.} =
let archiveCursor =
if cursor.isSome():
some(cursor.get())
else:
none(ArchiveCursor)
let query = ArchiveQuery(
includeData: true,
cursor: archiveCursor,
startTime: some(pruneStart),
endTime: some(pruneStop),
pageSize: 100,
)
let catchable = catch:
await wakuArchive.findMessages(query)
if catchable.isErr():
return err("archive error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("archive error: " & $error)
let elements = collect(newSeq):
for (hash, msg) in response.hashes.zip(response.messages):
(hash, msg.timestamp)
let cursor = response.cursor
return ok((elements, cursor))
return ok(callback)
proc createTransferCallback(
self: WakuSync, wakuArchive: WakuArchive, wakuStoreClient: WakuStoreClient
): Result[TransferCallback, string] =
if wakuArchive.isNil():
return err("waku archive unavailable")
if wakuStoreClient.isNil():
return err("waku store client unavailable")
let callback: TransferCallback = proc(
hashes: seq[WakuMessageHash], peerId: PeerId
): Future[Result[void, string]] {.async: (raises: []), closure.} =
var query = StoreQueryRequest()
query.includeData = true
query.messageHashes = hashes
query.paginationLimit = some(uint64(100))
while true:
let catchable = catch:
await wakuStoreClient.query(query, peerId)
if catchable.isErr():
return err("store client error: " & catchable.error.msg)
let res = catchable.get()
let response = res.valueOr:
return err("store client error: " & $error)
query.paginationCursor = response.paginationCursor
for kv in response.messages:
let handleRes = catch:
await wakuArchive.syncMessageIngress(
kv.messageHash, kv.pubsubTopic.get(), kv.message.get()
)
if handleRes.isErr():
error "message transfer failed", error = handleRes.error.msg
# Messages can be synced next time since they are not added to storage yet.
continue
self.messageIngress(kv.pubsubTopic.get(), kv.messageHash, kv.message.get())
if query.paginationCursor.isNone():
break
return ok()
return ok(callback)
proc initFillStorage(
self: WakuSync, wakuArchive: WakuArchive
): Future[Result[void, string]] {.async.} =
if wakuArchive.isNil():
return err("waku archive unavailable")
let endTime = getNowInNanosecondTime()
let startTime = endTime - self.syncRange.nanos
var query = ArchiveQuery(
includeData: true,
cursor: none(ArchiveCursor),
startTime: some(startTime),
endTime: some(endTime),
pageSize: 100,
)
while true:
let response = (await wakuArchive.findMessages(query)).valueOr:
return err($error)
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]
let topic = response.topics[i]
let msg = response.messages[i]
self.messageIngress(topic, hash, msg)
if response.cursor.isNone():
break
query.cursor = response.cursor
return ok()
proc new*(
T: type WakuSync,
peerManager: PeerManager,
maxFrameSize: int = DefaultMaxFrameSize,
syncRange: timer.Duration = DefaultSyncRange,
syncInterval: timer.Duration = DefaultSyncInterval,
relayJitter: Duration = DefaultGossipSubJitter,
wakuArchive: WakuArchive,
wakuStoreClient: WakuStoreClient,
pruneCallback: Option[PruneCallback] = none(PruneCallback),
transferCallback: Option[TransferCallback] = none(TransferCallback),
): Future[Result[T, string]] {.async.} =
let storage = NegentropyStorage.new().valueOr:
return err("negentropy storage creation failed")
var sync = WakuSync(
storage: storage,
peerManager: peerManager,
maxFrameSize: maxFrameSize,
syncInterval: syncInterval,
syncRange: syncRange,
relayJitter: relayJitter,
pruneOffset: syncInterval div 10, # 10% offset
)
sync.initProtocolHandler()
sync.pruneCallBack = pruneCallback
if sync.pruneCallBack.isNone():
let res = sync.createPruneCallback(wakuArchive)
if res.isErr():
error "pruning callback creation error", error = res.error
else:
sync.pruneCallBack = some(res.get())
sync.transferCallBack = transferCallback
if sync.transferCallBack.isNone():
let res = sync.createTransferCallback(wakuArchive, wakuStoreClient)
if res.isErr():
error "transfer callback creation error", error = res.error
else:
sync.transferCallBack = some(res.get())
let res = await sync.initFillStorage(wakuArchive)
if res.isErr():
warn "will not sync messages before this point in time", error = res.error
info "WakuSync protocol initialized"
return ok(sync)
proc periodicSync(self: WakuSync, callback: TransferCallback) {.async.} =
debug "periodic sync initialized", interval = $self.syncInterval
# to stagger the intervals
await sleepAsync((self.syncInterval div 2))
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic sync started"
var
hashes: seq[WakuMessageHash]
peer: RemotePeerInfo
tries = 3
while true:
let res = (await self.storeSynchronization()).valueOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "sync failed", error = $error
break
hashes = res[0]
peer = res[1]
break
if hashes.len > 0:
tries = 3
while true:
(await callback(hashes, peer.peerId)).isOkOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "transfer callback failed", error = $error
break
break
debug "periodic sync done", hashSynced = hashes.len
continue
proc periodicPrune(self: WakuSync, callback: PruneCallback) {.async.} =
debug "periodic prune initialized", interval = $self.syncInterval
# Default T minus 60m
self.pruneStart = getNowInNanosecondTime() - self.syncRange.nanos
await sleepAsync(self.syncInterval)
# Default T minus 55m
var pruneStop = getNowInNanosecondTime() - self.syncRange.nanos
while true: # infinite loop
await sleepAsync(self.syncInterval)
debug "periodic prune started",
startTime = self.pruneStart - self.pruneOffset.nanos,
endTime = pruneStop,
storageSize = self.storage.len
var (elements, cursor) =
(newSeq[(WakuMessageHash, Timestamp)](0), none(WakuMessageHash))
var tries = 3
while true:
(elements, cursor) = (
await callback(self.pruneStart - self.pruneOffset.nanos, pruneStop, cursor)
).valueOr:
# we either try again or log an error and break
if tries > 0:
tries -= 1
await sleepAsync(RetryDelay)
continue
else:
error "pruning callback failed", error = $error
break
if elements.len == 0:
# no elements to remove, stop
break
for (hash, timestamp) in elements:
self.storage.erase(timestamp, hash).isOkOr:
error "storage erase failed",
timestamp = timestamp, msg_hash = hash.to0xHex(), error = $error
continue
if cursor.isNone():
# no more pages, stop
break
self.pruneStart = pruneStop
pruneStop = getNowInNanosecondTime() - self.syncRange.nanos
debug "periodic prune done", storageSize = self.storage.len
continue
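# Worked example of the schedule above, assuming the defaults implied by the
# comments (syncInterval = 5 min, syncRange = 60 min): at startup (T0),
# pruneStart = T0 - 60 min; after one interval, pruneStop = (T0 + 5 min) - 60 min
# = T0 - 55 min, so the first pass prunes roughly the 5 min slice
# [T0 - 60 min - pruneOffset, T0 - 55 min], and every later pass advances
# both bounds by one interval.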
proc start*(self: WakuSync) =
self.started = true
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicSyncFut = self.periodicSync(self.transferCallBack.get())
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
self.periodicPruneFut = self.periodicPrune(self.pruneCallBack.get())
info "WakuSync protocol started"
proc stopWait*(self: WakuSync) {.async.} =
if self.transferCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicSyncFut.cancelAndWait()
if self.pruneCallBack.isSome() and self.syncInterval > ZeroDuration:
await self.periodicPruneFut.cancelAndWait()
info "WakuSync protocol stopped"


@@ -1,513 +0,0 @@
{.push raises: [].}
from os import DirSep
import std/[strutils], chronicles, std/options, stew/byteutils, confutils, results
import ../waku_core/message
const negentropyPath =
currentSourcePath.rsplit(DirSep, 1)[0] & DirSep & ".." & DirSep & ".." & DirSep &
"vendor" & DirSep & "negentropy" & DirSep & "cpp" & DirSep
const NEGENTROPY_HEADER = negentropyPath & "negentropy_wrapper.h"
logScope:
topics = "waku sync"
type Buffer = object
len*: uint64
`ptr`*: ptr uint8
type BindingResult = object
output: Buffer
have_ids_len: uint
need_ids_len: uint
have_ids: ptr Buffer
need_ids: ptr Buffer
error: cstring
proc toWakuMessageHash(buffer: Buffer): WakuMessageHash =
assert buffer.len == 32
var hash: WakuMessageHash
copyMem(hash[0].addr, buffer.ptr, 32)
return hash
proc toBuffer(x: openArray[byte]): Buffer =
## converts the input to a Buffer object
## the Buffer object is used to communicate data with the negentropy lib
var temp = @x
let baseAddr = cast[pointer](x)
let output = Buffer(`ptr`: cast[ptr uint8](baseAddr), len: uint64(temp.len))
return output
proc bufferToBytes(buffer: ptr Buffer, len: Option[uint64] = none(uint64)): seq[byte] =
var bufLen: uint64
if isNone(len):
bufLen = buffer.len
else:
bufLen = len.get()
if bufLen == 0:
return @[]
trace "length of buffer is", len = bufLen
let bytes = newSeq[byte](bufLen)
copyMem(bytes[0].unsafeAddr, buffer.ptr, bufLen)
return bytes
proc toBufferSeq(buffLen: uint, buffPtr: ptr Buffer): seq[Buffer] =
var uncheckedArr = cast[ptr UncheckedArray[Buffer]](buffPtr)
var mySequence = newSeq[Buffer](buffLen)
for i in 0 .. buffLen - 1:
mySequence[i] = uncheckedArr[i]
return mySequence
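# Round-trip sketch for the helpers above (hypothetical 32-byte pattern;
# WakuMessageHash comes from ../waku_core/message):
proc bufferRoundTripExample() =
  var hash: WakuMessageHash
  for i in 0 ..< 32:
    hash[i] = byte(i)
  var buf = toBuffer(hash) # views the hash's memory, no copy
  let bytes = bufferToBytes(addr buf) # copies the bytes back out
  assert bytes.len == 32
  assert toWakuMessageHash(buf) == hash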
### Storage ###
type NegentropyStorage* = distinct pointer
proc get_last_error(): cstring {.header: NEGENTROPY_HEADER, importc: "get_last_error".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L27
proc storage_init(
db_path: cstring, name: cstring
): NegentropyStorage {.header: NEGENTROPY_HEADER, importc: "storage_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L41
proc raw_insert(
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_insert".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L43
proc raw_erase(
storage: NegentropyStorage, timestamp: uint64, id: ptr Buffer
): bool {.header: NEGENTROPY_HEADER, importc: "storage_erase".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L29
proc free(
storage: NegentropyStorage
) {.header: NEGENTROPY_HEADER, importc: "storage_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
storage: NegentropyStorage
): cint {.header: NEGENTROPY_HEADER, importc: "storage_size".}
### Negentropy ###
type RawNegentropy* = distinct pointer
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L33
proc constructNegentropy(
storage: NegentropyStorage, frameSizeLimit: uint64
): RawNegentropy {.header: NEGENTROPY_HEADER, importc: "negentropy_new".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L37
proc raw_initiate(
negentropy: RawNegentropy, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_initiate".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L39
proc raw_setInitiator(
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_setinitiator".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L45
proc raw_reconcile(
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L51
proc raw_reconcile_with_ids(
negentropy: RawNegentropy, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_no_cbk".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L35
proc free(
negentropy: RawNegentropy
) {.header: NEGENTROPY_HEADER, importc: "negentropy_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L53
proc free_result(
r: ptr BindingResult
) {.header: NEGENTROPY_HEADER, importc: "free_result".}
### SubRange ###
type NegentropySubRangeStorage* = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L57
proc subrange_init(
storage: NegentropyStorage, startTimestamp: uint64, endTimestamp: uint64
): NegentropySubRangeStorage {.header: NEGENTROPY_HEADER, importc: "subrange_new".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L59
proc free(
subrange: NegentropySubRangeStorage
) {.header: NEGENTROPY_HEADER, importc: "subrange_delete".}
# https://github.com/waku-org/negentropy/blob/d4845b95b5a2d9bee28555833e7502db71bf319f/cpp/negentropy_wrapper.h#L31
proc size(
subrange: NegentropySubRangeStorage
): cint {.header: NEGENTROPY_HEADER, importc: "subrange_size".}
### Negentropy with NegentropySubRangeStorage ###
type RawNegentropySubRange = distinct pointer
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L61
proc constructNegentropyWithSubRange(
subrange: NegentropySubRangeStorage, frameSizeLimit: uint64
): RawNegentropySubRange {.
header: NEGENTROPY_HEADER, importc: "negentropy_subrange_new"
.}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L65
proc raw_initiate_subrange(
negentropy: RawNegentropySubRange, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_initiate".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L67
proc raw_reconcile_subrange(
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_subrange".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L69
proc raw_reconcile_with_ids_subrange(
negentropy: RawNegentropySubRange, query: ptr Buffer, r: ptr BindingResult
): int {.header: NEGENTROPY_HEADER, importc: "reconcile_with_ids_subrange_no_cbk".}
# https://github.com/waku-org/negentropy/blob/3044a30e4ba2e218aee6dee2ef5b4a4b6f144865/cpp/negentropy_wrapper.h#L63
proc free(
negentropy: RawNegentropySubRange
) {.header: NEGENTROPY_HEADER, importc: "negentropy_subrange_delete".}
### Wrappings ###
### Storage ###
proc `==`*(a: NegentropyStorage, b: pointer): bool {.borrow.}
proc new*(T: type NegentropyStorage): Result[T, string] =
#TODO db name and path
let storage = storage_init("", "")
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
if storage == nil:
return err($get_last_error())
return ok(storage)
proc delete*(storage: NegentropyStorage) =
storage.free()
proc erase*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_erase(storage, uint64(id), bufPtr)
#TODO error handling once we move to lmdb
if res:
return ok()
else:
return err($get_last_error())
proc insert*(
storage: NegentropyStorage, id: int64, hash: WakuMessageHash
): Result[void, string] =
var buffer = toBuffer(hash)
var bufPtr = addr(buffer)
let res = raw_insert(storage, uint64(id), bufPtr)
#TODO error handling once we move to lmdb
if res:
return ok()
else:
return err($get_last_error())
proc len*(storage: NegentropyStorage): int =
int(storage.size)
### SubRange ###
proc `==`*(a: NegentropySubRangeStorage, b: pointer): bool {.borrow.}
proc new*(
T: type NegentropySubRangeStorage,
storage: NegentropyStorage,
startTime: uint64 = uint64.low,
endTime: uint64 = uint64.high,
): Result[T, string] =
let subrange = subrange_init(storage, startTime, endTime)
#[ TODO: Uncomment once we move to lmdb
if storage == nil:
return err("storage initialization failed") ]#
if subrange == nil:
return err($get_last_error())
return ok(subrange)
proc delete*(subrange: NegentropySubRangeStorage) =
subrange.free()
proc len*(subrange: NegentropySubRangeStorage): int =
int(subrange.size)
### Interface ###
type
Negentropy* = ref object of RootObj
NegentropyWithSubRange = ref object of Negentropy
inner: RawNegentropySubRange
NegentropyWithStorage = ref object of Negentropy
inner: RawNegentropy
NegentropyPayload* = distinct seq[byte]
method delete*(self: Negentropy) {.base, gcsafe.} =
discard
method initiate*(self: Negentropy): Result[NegentropyPayload, string] {.base.} =
discard
method serverReconcile*(
self: Negentropy, query: NegentropyPayload
): Result[NegentropyPayload, string] {.base.} =
discard
method clientReconcile*(
self: Negentropy,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] {.base.} =
discard
### Impl. ###
proc new*(
T: type Negentropy,
storage: NegentropyStorage | NegentropySubRangeStorage,
frameSizeLimit: int,
): Result[T, string] =
if storage is NegentropyStorage:
let raw_negentropy =
constructNegentropy(NegentropyStorage(storage), uint64(frameSizeLimit))
if cast[pointer](raw_negentropy) == nil:
return err($get_last_error())
let negentropy = NegentropyWithStorage(inner: raw_negentropy)
return ok(negentropy)
elif storage is NegentropySubRangeStorage:
let raw_negentropy = constructNegentropyWithSubRange(
NegentropySubRangeStorage(storage), uint64(frameSizeLimit)
)
if cast[pointer](raw_negentropy) == nil:
return err($get_last_error())
let negentropy = NegentropyWithSubRange(inner: raw_negentropy)
return ok(negentropy)
method delete*(self: NegentropyWithSubRange) =
self.inner.free()
method initiate*(self: NegentropyWithSubRange): Result[NegentropyPayload, string] =
## Client initiates a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_initiate_subrange(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err($get_last_error())
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
free_result(myResultPtr)
return ok(NegentropyPayload(bytes))
method serverReconcile*(
self: NegentropyWithSubRange, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
## Always return an answer.
let queryBuf = toBuffer(seq[byte](query))
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_subrange(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
free_result(myResultPtr)
return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
self: NegentropyWithSubRange,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
## May return an answer; if none, the sync session is done.
let cQuery = toBuffer(seq[byte](query))
var myResult {.noinit.}: BindingResult = BindingResult()
myResult.have_ids_len = 0
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_with_ids_subrange(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
let output = bufferToBytes(addr myResult.output)
var
have_hashes: seq[Buffer]
need_hashes: seq[Buffer]
if myResult.have_ids_len > 0:
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
if myResult.need_ids_len > 0:
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
trace "have and need hashes ",
have_count = have_hashes.len, need_count = need_hashes.len
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
haves.add(hash)
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
needs.add(hash)
trace "return ", output = output, len = output.len
free_result(myResultPtr)
if output.len < 1:
return ok(none(NegentropyPayload))
return ok(some(NegentropyPayload(output)))
method delete*(self: NegentropyWithStorage) =
self.inner.free()
method initiate*(self: NegentropyWithStorage): Result[NegentropyPayload, string] =
## Client initiates a sync session with a server by sending a payload
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_initiate(myResultPtr)
if ret < 0 or myResultPtr == nil:
error "negentropy initiate failed with code ", code = ret
return err($get_last_error())
let bytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "received return from initiate", len = myResultPtr.output.len
free_result(myResultPtr)
return ok(NegentropyPayload(bytes))
method serverReconcile*(
self: NegentropyWithStorage, query: NegentropyPayload
): Result[NegentropyPayload, string] =
## Server response to a negentropy payload.
## Always return an answer.
let queryBuf = toBuffer(seq[byte](query))
var queryBufPtr = queryBuf.unsafeAddr #TODO: Figure out why addr(buffer) throws error
var myResult {.noinit.}: BindingResult = BindingResult()
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile(queryBufPtr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
trace "received return from raw_reconcile", len = myResultPtr.output.len
let outputBytes: seq[byte] = bufferToBytes(addr(myResultPtr.output))
trace "outputBytes len", len = outputBytes.len
free_result(myResultPtr)
return ok(NegentropyPayload(outputBytes))
method clientReconcile*(
self: NegentropyWithStorage,
query: NegentropyPayload,
haves: var seq[WakuMessageHash],
needs: var seq[WakuMessageHash],
): Result[Option[NegentropyPayload], string] =
## Client response to a negentropy payload.
## May return an answer; if none, the sync session is done.
let cQuery = toBuffer(seq[byte](query))
var myResult {.noinit.}: BindingResult = BindingResult()
myResult.have_ids_len = 0
myResult.need_ids_len = 0
var myResultPtr = addr myResult
let ret = self.inner.raw_reconcile_with_ids(cQuery.unsafeAddr, myResultPtr)
if ret < 0:
error "raw_reconcile failed with code ", code = ret
return err($get_last_error())
let output = bufferToBytes(addr myResult.output)
var
have_hashes: seq[Buffer]
need_hashes: seq[Buffer]
if myResult.have_ids_len > 0:
have_hashes = toBufferSeq(myResult.have_ids_len, myResult.have_ids)
if myResult.need_ids_len > 0:
need_hashes = toBufferSeq(myResult.need_ids_len, myResult.need_ids)
trace "have and need hashes ",
have_count = have_hashes.len, need_count = need_hashes.len
for i in 0 .. have_hashes.len - 1:
var hash = toWakuMessageHash(have_hashes[i])
trace "have hashes ", index = i, msg_hash = hash.to0xHex()
haves.add(hash)
for i in 0 .. need_hashes.len - 1:
var hash = toWakuMessageHash(need_hashes[i])
trace "need hashes ", index = i, msg_hash = hash.to0xHex()
needs.add(hash)
trace "return ", output = output, len = output.len
free_result(myResultPtr)
if output.len < 1:
return ok(none(NegentropyPayload))
return ok(some(NegentropyPayload(output)))


@@ -1,240 +0,0 @@
{.push raises: [].}
import std/options, results, chronos, libp2p/stream/connection
import
../common/nimchronos,
../common/protobuf,
../waku_core,
./raw_bindings,
./common,
./codec
#TODO add states for protocol negotiation
### Type State ###
type ClientSync* = object
haveHashes: seq[WakuMessageHash]
type ServerSync* = object
# T is either ClientSync or ServerSync
type Reconciled*[T] = object
sync: T
negentropy: Negentropy
connection: Connection
frameSize: int
payload*: SyncPayload
type Sent*[T] = object
sync: T
negentropy: Negentropy
connection: Connection
frameSize: int
type Received*[T] = object
sync: T
negentropy: Negentropy
connection: Connection
frameSize: int
payload*: SyncPayload
type Completed*[T] = object
sync: T
negentropy: Negentropy
connection: Connection
haveHashes: seq[WakuMessageHash]
### State Transition ###
proc clientInitialize*(
store: NegentropyStorage,
conn: Connection,
frameSize = DefaultMaxFrameSize,
start = int64.low,
`end` = int64.high,
): Result[Reconciled[ClientSync], string] =
let subrange = ?NegentropySubRangeStorage.new(store, uint64(start), uint64(`end`))
let negentropy = ?Negentropy.new(subrange, frameSize)
let negentropyPayload = ?negentropy.initiate()
let payload = SyncPayload(negentropy: seq[byte](negentropyPayload))
let sync = ClientSync()
return ok(
Reconciled[ClientSync](
sync: sync,
negentropy: negentropy,
connection: conn,
frameSize: frameSize,
payload: payload,
)
)
proc serverInitialize*(
store: NegentropyStorage,
conn: Connection,
frameSize = DefaultMaxFrameSize,
syncStart = int64.low,
syncEnd = int64.high,
): Result[Sent[ServerSync], string] =
let subrange =
?NegentropySubRangeStorage.new(store, uint64(syncStart), uint64(syncEnd))
let negentropy = ?Negentropy.new(subrange, frameSize)
let sync = ServerSync()
return ok(
Sent[ServerSync](
sync: sync, negentropy: negentropy, connection: conn, frameSize: frameSize
)
)
proc send*[T](self: Reconciled[T]): Future[Result[Sent[T], string]] {.async.} =
let writeRes = catch:
await self.connection.writeLP(self.payload.encode().buffer)
if writeRes.isErr():
return err("send connection write error: " & writeRes.error.msg)
return ok(
Sent[T](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
)
)
proc listenBack*[T](self: Sent[T]): Future[Result[Received[T], string]] {.async.} =
let readRes = catch:
await self.connection.readLp(-1)
let buffer: seq[byte] =
if readRes.isOk():
readRes.get()
else:
return err("listenBack connection read error: " & readRes.error.msg)
# can't use valueOr here, otherwise the compiler complains:
#let payload = SyncPayload.decode(buffer).valueOr:
#return err($error)
let decodeRes = SyncPayload.decode(buffer)
let payload =
if decodeRes.isOk():
decodeRes.get()
else:
let decodeError: ProtobufError = decodeRes.error
let errMsg = $decodeError
return err("listenBack decoding error: " & errMsg)
return ok(
Received[T](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
# Aliasing for readability
type ContinueOrCompleted[T] = Result[Reconciled[T], Completed[T]]
type Continue[T] = Reconciled[T]
proc clientReconcile*(
self: Received[ClientSync], needHashes: var seq[WakuMessageHash]
): Result[ContinueOrCompleted[ClientSync], string] =
var haves = self.sync.haveHashes
let responseOpt =
?self.negentropy.clientReconcile(
NegentropyPayload(self.payload.negentropy), haves, needHashes
)
let sync = ClientSync(haveHashes: haves)
let response = responseOpt.valueOr:
let res = ContinueOrCompleted[ClientSync].err(
Completed[ClientSync](
sync: sync, negentropy: self.negentropy, connection: self.connection
)
)
return ok(res)
let payload = SyncPayload(negentropy: seq[byte](response))
let res = ContinueOrCompleted[ClientSync].ok(
Continue[ClientSync](
sync: sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
return ok(res)
proc serverReconcile*(
self: Received[ServerSync]
): Result[ContinueOrCompleted[ServerSync], string] =
if self.payload.negentropy.len == 0:
let res = ContinueOrCompleted[ServerSync].err(
Completed[ServerSync](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
haveHashes: self.payload.hashes,
)
)
return ok(res)
let response =
?self.negentropy.serverReconcile(NegentropyPayload(self.payload.negentropy))
let payload = SyncPayload(negentropy: seq[byte](response))
let res = ContinueOrCompleted[ServerSync].ok(
Continue[ServerSync](
sync: self.sync,
negentropy: self.negentropy,
connection: self.connection,
frameSize: self.frameSize,
payload: payload,
)
)
return ok(res)
proc clientTerminate*(
self: Completed[ClientSync]
): Future[Result[void, string]] {.async.} =
let payload = SyncPayload(hashes: self.sync.haveHashes)
let writeRes = catch:
await self.connection.writeLp(payload.encode().buffer)
if writeRes.isErr():
return err("clientTerminate connection write error: " & writeRes.error.msg)
self.negentropy.delete()
return ok()
proc serverTerminate*(
self: Completed[ServerSync]
): Future[seq[WakuMessageHash]] {.async.} =
self.negentropy.delete()
return self.haveHashes
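# Condensed sketch of one full client pass through the states above; this
# mirrors the request loop in protocol.nim (hypothetical driver; conn is an
# open libp2p Connection and storage a populated NegentropyStorage):
proc clientSessionExample(
    storage: NegentropyStorage, conn: Connection
): Future[Result[seq[WakuMessageHash], string]] {.async.} =
  var hashes: seq[WakuMessageHash]
  var reconciled = ?clientInitialize(storage, conn)
  while true:
    let sent = ?await reconciled.send() # Reconciled -> Sent
    let received = ?await sent.listenBack() # Sent -> Received
    reconciled = (?received.clientReconcile(hashes)).valueOr:
      let completed = error # sync is done
      ?await completed.clientTerminate()
      return ok(hashes) # hashes this client still needs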


@@ -1,76 +0,0 @@
# Unused yet. Kept for future use.
#[ when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/[times, tables, options], chronicles, chronos, stew/results
import ./raw_bindings, ../waku_core/time
logScope:
topics = "waku sync"
type WakuSyncStorageManager* = ref object
storages: OrderedTable[string, Storage]
# Map of dateTime and Storage objects. DateTime is of the format YYYYMMDDHH
maxHours: int64
proc new*(
T: type WakuSyncStorageManager,
hoursToStore: times.Duration = initDuration(minutes = 120),
): T =
return WakuSyncStorageManager(maxHours: hoursToStore.inHours)
proc getRecentStorage*(self: WakuSyncStorageManager): Result[Option[Storage], string] =
if self.storages.len() == 0:
return ok(none(Storage))
var storageToFetch: Storage
#is there a more effective way to fetch last element?
for k, storage in self.storages:
storageToFetch = storage
return ok(some(storageToFetch))
proc deleteOldestStorage*(self: WakuSyncStorageManager) =
var storageToDelete: Storage
var time: string
#is there a more effective way to fetch first element?
for k, storage in self.storages:
storageToDelete = storage
time = k
break
if self.storages.pop(time, storageToDelete):
delete(storageToDelete)
proc retrieveStorage*(
self: WakuSyncStorageManager, time: Timestamp
): Result[Option[Storage], string] =
var timestamp: Timestamp
if time == 0:
timestamp = timestampInSeconds(getNowInNanosecondTime())
debug "timestamp not provided, using now to fetch storage", timestamp = timestamp
else:
timestamp = timestampInSeconds(time)
let tsTime = times.fromUnix(timestamp)
let dateTime = times.format(tsTime, "yyyyMMddHH", utc())
var storage: Storage = self.storages.getOrDefault(dateTime)
if storage == nil:
#create a new storage
# TODO: May need synchronization??
# Limit number of storages to configured duration
let hours = self.storages.len()
if hours == self.maxHours:
#Need to delete oldest storage at this point, but what if that is being synced?
self.deleteOldestStorage()
info "number of storages reached, deleting the oldest"
info "creating a new storage for ", time = dateTime
storage = Storage.new().valueOr:
error "storage creation failed"
return err(error)
self.storages[dateTime] = storage
return ok(some(storage))
]#