mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 06:23:10 +00:00
deploy: 7eab2009e2124a0e1345346e572454c9c41dddb4
This commit is contained in:
parent
b1f40c999b
commit
0873fbd6b4
@ -50,7 +50,6 @@ const Help = """
|
||||
|
||||
const
|
||||
PayloadV1* {.booldefine.} = false
|
||||
DefaultTopic* = "/waku/2/default-waku/proto"
|
||||
|
||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||
# Could poll connection pool or something here, I suppose
|
||||
@ -175,7 +174,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) =
|
||||
echo &"{chatLine}"
|
||||
c.prompt = false
|
||||
showChatPrompt(c)
|
||||
trace "Printing message", topic=DefaultTopic, chatLine,
|
||||
trace "Printing message", topic=DefaultPubsubTopic, chatLine,
|
||||
contentTopic = msg.contentTopic
|
||||
else:
|
||||
debug "Invalid encoded WakuMessage payload",
|
||||
@ -194,7 +193,7 @@ proc printReceivedMessage(c: Chat, msg: WakuMessage) =
|
||||
|
||||
c.prompt = false
|
||||
showChatPrompt(c)
|
||||
trace "Printing message", topic=DefaultTopic, chatLine,
|
||||
trace "Printing message", topic=DefaultPubsubTopic, chatLine,
|
||||
contentTopic = msg.contentTopic
|
||||
|
||||
proc readNick(transp: StreamTransport): Future[string] {.async.} =
|
||||
@ -243,9 +242,9 @@ proc publish(c: Chat, line: string) =
|
||||
c.node.wakuRlnRelay.lastEpoch = message.proof.epoch
|
||||
if not c.node.wakuLightPush.isNil():
|
||||
# Attempt lightpush
|
||||
asyncSpawn c.node.lightpushPublish(DefaultTopic, message)
|
||||
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
|
||||
else:
|
||||
asyncSpawn c.node.publish(DefaultTopic, message, handler)
|
||||
asyncSpawn c.node.publish(DefaultPubsubTopic, message, handler)
|
||||
else:
|
||||
warn "Payload encoding failed", error = encodedPayload.error
|
||||
else:
|
||||
@ -272,9 +271,9 @@ proc publish(c: Chat, line: string) =
|
||||
|
||||
if not c.node.wakuLightPush.isNil():
|
||||
# Attempt lightpush
|
||||
asyncSpawn c.node.lightpushPublish(DefaultTopic, message)
|
||||
asyncSpawn c.node.lightpushPublish(DefaultPubsubTopic, message)
|
||||
else:
|
||||
asyncSpawn c.node.publish(DefaultTopic, message)
|
||||
asyncSpawn c.node.publish(DefaultPubsubTopic, message)
|
||||
|
||||
# TODO This should read or be subscribe handler subscribe
|
||||
proc readAndPrint(c: Chat) {.async.} =
|
||||
@ -327,7 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} =
|
||||
if not c.node.wakuFilter.isNil():
|
||||
echo "unsubscribing from content filters..."
|
||||
|
||||
await c.node.unsubscribe(pubsubTopic=DefaultTopic, contentTopics=c.contentTopic)
|
||||
await c.node.unsubscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=c.contentTopic)
|
||||
|
||||
echo "quitting..."
|
||||
|
||||
@ -502,12 +501,12 @@ proc processInput(rfd: AsyncFD) {.async.} =
|
||||
|
||||
node.setFilterPeer(parseRemotePeerInfo(conf.filternode))
|
||||
|
||||
proc filterHandler(pubsubTopic: string, msg: WakuMessage) {.gcsafe.} =
|
||||
proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} =
|
||||
trace "Hit filter handler", contentTopic=msg.contentTopic
|
||||
|
||||
chat.printReceivedMessage(msg)
|
||||
|
||||
await node.subscribe(pubsubTopic=DefaultTopic, contentTopics=chat.contentTopic, filterHandler)
|
||||
await node.subscribe(pubsubTopic=DefaultPubsubTopic, contentTopics=chat.contentTopic, filterHandler)
|
||||
|
||||
# Subscribe to a topic, if relay is mounted
|
||||
if conf.relay:
|
||||
@ -522,7 +521,7 @@ proc processInput(rfd: AsyncFD) {.async.} =
|
||||
else:
|
||||
trace "Invalid encoded WakuMessage", error = decoded.error
|
||||
|
||||
let topic = cast[Topic](DefaultTopic)
|
||||
let topic = DefaultPubsubTopic
|
||||
node.subscribe(topic, handler)
|
||||
|
||||
when defined(rln) or defined(rlnzerokit):
|
||||
|
||||
@ -32,7 +32,6 @@ logScope:
|
||||
##################
|
||||
|
||||
const
|
||||
DefaultTopic* = chat2.DefaultTopic
|
||||
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
|
||||
|
||||
#########
|
||||
@ -92,7 +91,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
|
||||
|
||||
chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])
|
||||
|
||||
await cmb.nodev2.publish(DefaultTopic, msg)
|
||||
await cmb.nodev2.publish(DefaultPubsubTopic, msg)
|
||||
|
||||
proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
|
||||
if cmb.seen.containsOrAdd(msg.payload.hash()):
|
||||
@ -195,13 +194,13 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
|
||||
|
||||
# Bridging
|
||||
# Handle messages on Waku v2 and bridge to Matterbridge
|
||||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
|
||||
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
|
||||
let msg = WakuMessage.decode(data)
|
||||
if msg.isOk():
|
||||
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
|
||||
cmb.toMatterbridge(msg[])
|
||||
|
||||
cmb.nodev2.subscribe(DefaultTopic, relayHandler)
|
||||
cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
|
||||
|
||||
proc stop*(cmb: Chat2MatterBridge) {.async.} =
|
||||
info "Stopping Chat2MatterBridge"
|
||||
|
||||
@ -302,7 +302,7 @@ proc start*(bridge: WakuBridge) {.async.} =
|
||||
bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)
|
||||
|
||||
# Handle messages on Waku v2 and bridge to Waku v1
|
||||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.decode(data)
|
||||
if msg.isOk() and msg.get().isBridgeable():
|
||||
try:
|
||||
|
||||
@ -11,7 +11,8 @@ import
|
||||
import
|
||||
../../waku/v2/node/waku_node,
|
||||
../../waku/v2/utils/peers,
|
||||
../test_helpers
|
||||
../test_helpers,
|
||||
./testlib/common
|
||||
|
||||
procSuite "Peer Exchange":
|
||||
asyncTest "GossipSub (relay) peer exchange":
|
||||
@ -43,7 +44,7 @@ procSuite "Peer Exchange":
|
||||
check:
|
||||
# Node 3 is informed of node 2 via peer exchange
|
||||
peer == node1.switch.peerInfo.peerId
|
||||
topic == defaultTopic
|
||||
topic == DefaultPubsubTopic
|
||||
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
|
||||
|
||||
if (not completionFut.completed()):
|
||||
|
||||
@ -45,7 +45,7 @@ suite "REST API - Relay":
|
||||
restServer.start()
|
||||
|
||||
let pubSubTopics = @[
|
||||
PubSubTopicString("pubsub-topic-1"),
|
||||
PubSubTopicString("pubsub-topic-1"),
|
||||
PubSubTopicString("pubsub-topic-2"),
|
||||
PubSubTopicString("pubsub-topic-3")
|
||||
]
|
||||
@ -204,7 +204,7 @@ suite "REST API - Relay":
|
||||
]
|
||||
discard await client.relayPostSubscriptionsV1(newTopics)
|
||||
|
||||
let response = await client.relayPostMessagesV1(defaultTopic, RelayWakuMessage(
|
||||
let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage(
|
||||
payload: Base64String.encode("TEST-PAYLOAD"),
|
||||
contentTopic: some(ContentTopicString(defaultContentTopic)),
|
||||
timestamp: some(int64(2022))
|
||||
|
||||
@ -56,7 +56,7 @@ suite "Waku Filter":
|
||||
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
let pushHandlerFuture = newFuture[(string, WakuMessage)]()
|
||||
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
|
||||
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
|
||||
pushHandlerFuture.complete((pubsubTopic, message))
|
||||
|
||||
let
|
||||
@ -99,7 +99,7 @@ suite "Waku Filter":
|
||||
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
var pushHandlerFuture = newFuture[void]()
|
||||
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
|
||||
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
|
||||
pushHandlerFuture.complete()
|
||||
|
||||
let
|
||||
@ -151,7 +151,7 @@ suite "Waku Filter":
|
||||
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
var pushHandlerFuture = newFuture[void]()
|
||||
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
|
||||
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
|
||||
pushHandlerFuture.complete()
|
||||
|
||||
let
|
||||
@ -216,7 +216,7 @@ suite "Waku Filter":
|
||||
let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
var pushHandlerFuture = newFuture[void]()
|
||||
proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} =
|
||||
proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} =
|
||||
pushHandlerFuture.complete()
|
||||
|
||||
let
|
||||
|
||||
@ -43,7 +43,7 @@ suite "Waku Lightpush":
|
||||
|
||||
## Given
|
||||
let handlerFuture = newFuture[(string, WakuMessage)]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
@ -87,7 +87,7 @@ suite "Waku Lightpush":
|
||||
let error = "test_failure"
|
||||
|
||||
let handlerFuture = newFuture[void]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete()
|
||||
return err(error)
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ procSuite "WakuNode - Lightpush":
|
||||
let message = fakeWakuMessage()
|
||||
|
||||
var completionFutRelay = newFuture[bool]()
|
||||
proc relayHandler(pubsubTopic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
|
||||
let msg = WakuMessage.decode(data).get()
|
||||
check:
|
||||
pubsubTopic == DefaultPubsubTopic
|
||||
|
||||
@ -180,15 +180,15 @@ procSuite "WakuNode - Store":
|
||||
msg2 = fakeWakuMessage(payload="hello world2", ts=(timeOrigin + getNanoSecondTime(2)))
|
||||
msg3 = fakeWakuMessage(payload="hello world3", ts=(timeOrigin + getNanoSecondTime(3)))
|
||||
|
||||
require server.wakuStore.store.put(DefaultTopic, msg1).isOk()
|
||||
require server.wakuStore.store.put(DefaultTopic, msg2).isOk()
|
||||
require server.wakuStore.store.put(DefaultPubsubTopic, msg1).isOk()
|
||||
require server.wakuStore.store.put(DefaultPubsubTopic, msg2).isOk()
|
||||
|
||||
# Insert the same message in both node's store
|
||||
let
|
||||
receivedTime3 = now() + getNanosecondTime(10)
|
||||
digest3 = computeDigest(msg3)
|
||||
require server.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
|
||||
require client.wakuStore.store.put(DefaultTopic, msg3, digest3, receivedTime3).isOk()
|
||||
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
|
||||
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
|
||||
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
|
||||
@ -5,9 +5,9 @@ import
|
||||
../../../waku/v2/protocol/waku_message,
|
||||
../../../waku/v2/utils/time
|
||||
|
||||
const
|
||||
DefaultPubsubTopic* = "/waku/2/default-waku/proto"
|
||||
DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto")
|
||||
export
|
||||
waku_message.DefaultPubsubTopic,
|
||||
waku_message.DefaultContentTopic
|
||||
|
||||
|
||||
proc now*(): Timestamp =
|
||||
|
||||
@ -21,10 +21,6 @@ template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
|
||||
const sigWakuPath = sourceDir / ".." / ".." / "waku" / "v2" / "node" / "jsonrpc" / "jsonrpc_callsigs.nim"
|
||||
createRpcSigs(RpcHttpClient, sigWakuPath)
|
||||
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
const defaultContentTopic = ContentTopic("waku/2/default-content/proto")
|
||||
|
||||
const topicAmount = 10 #100
|
||||
|
||||
proc message(i: int): ProtoBuffer =
|
||||
@ -45,7 +41,7 @@ for i in 0..<amount:
|
||||
var node = newRpcHttpClient()
|
||||
nodes.add(node)
|
||||
waitFor nodes[i].connect("localhost", Port(8547+i), false)
|
||||
var res = waitFor nodes[i].post_waku_v2_relay_v1_subscriptions(@[defaultTopic])
|
||||
var res = waitFor nodes[i].post_waku_v2_relay_v1_subscriptions(@[DefaultPubsubTopic])
|
||||
|
||||
os.sleep(2000)
|
||||
|
||||
@ -54,18 +50,18 @@ os.sleep(2000)
|
||||
# os.sleep(50)
|
||||
# # TODO: This would then publish on a subtopic here
|
||||
# var s = "hello " & $2
|
||||
# var res3 = waitFor nodes[0].wakuPublish(defaultTopic, s)
|
||||
# var res3 = waitFor nodes[0].wakuPublish(DefaultPbsubTopic, s)
|
||||
|
||||
# Scenario xx3 - same as xx1 but publish from multiple nodes
|
||||
# To compare FloodSub and GossipSub factor
|
||||
for i in 0..<topicAmount:
|
||||
os.sleep(50)
|
||||
# TODO: This would then publish on a subtopic here
|
||||
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
var res3 = waitFor nodes[0].post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: message(0).buffer, contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[1].post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: message(1).buffer, contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[2].post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: message(2).buffer, contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[3].post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: message(3).buffer, contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
res3 = waitFor nodes[4].post_waku_v2_relay_v1_message(DefaultPubsubTopic, WakuRelayMessage(payload: message(4).buffer, contentTopic: some(DefaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))
|
||||
|
||||
# Scenario xx2 - 14 full nodes, two edge nodes
|
||||
# Assume one full topic
|
||||
@ -79,8 +75,8 @@ for i in 0..<topicAmount:
|
||||
#let version = waitFor nodea.wakuVersion()
|
||||
#info "Version is", version
|
||||
#
|
||||
#let res1 = waitFor nodea.wakuSubscribe(defaultTopic)
|
||||
#let res2 = waitFor nodeb.wakuSubscribe(defaultTopic)
|
||||
#let res1 = waitFor nodea.wakuSubscribe(DefaultPbsubTopic)
|
||||
#let res2 = waitFor nodeb.wakuSubscribe(DefaultPbsubTopic)
|
||||
#
|
||||
#let amount = 14
|
||||
#var nodes: seq[RPCHttpClient]
|
||||
@ -88,7 +84,7 @@ for i in 0..<topicAmount:
|
||||
# var node = newRpcHttpClient()
|
||||
# nodes.add(node)
|
||||
# waitFor nodes[i].connect("localhost", Port(8547+i))
|
||||
# var res = waitFor nodes[i].wakuSubscribe(defaultTopic)
|
||||
# var res = waitFor nodes[i].wakuSubscribe(DefaultPbsubTopic)
|
||||
#
|
||||
#os.sleep(2000)
|
||||
#
|
||||
@ -97,7 +93,7 @@ for i in 0..<topicAmount:
|
||||
# os.sleep(50)
|
||||
# # TODO: This would then publish on a subtopic here
|
||||
# var s = "hello " & $2
|
||||
# var res3 = waitFor nodea.wakuPublish(defaultTopic, s)
|
||||
# var res3 = waitFor nodea.wakuPublish(DefaultPbsubTopic, s)
|
||||
|
||||
# Misc old scenarios
|
||||
#########################################
|
||||
|
||||
@ -48,10 +48,10 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
|
||||
debug "post_waku_v2_filter_v1_subscription"
|
||||
|
||||
let
|
||||
pubsubTopic: string = topic.get(DefaultPubsubTopic)
|
||||
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
|
||||
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
|
||||
let pushHandler:FilterPushHandler = proc(pubsubTopic: string, msg: WakuMessage) {.gcsafe, closure.} =
|
||||
let pushHandler:FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} =
|
||||
# Add message to current cache
|
||||
trace "WakuMessage received", msg=msg
|
||||
|
||||
@ -87,7 +87,7 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache:
|
||||
debug "delete_waku_v2_filter_v1_subscription"
|
||||
|
||||
let
|
||||
pubsubTopic: string = topic.get(DefaultPubsubTopic)
|
||||
pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic)
|
||||
contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic)
|
||||
|
||||
let unsubFut = node.unsubscribe(pubsubTopic, contentTopics)
|
||||
|
||||
@ -19,7 +19,7 @@ type Index* = object
|
||||
receiverTime*: Timestamp
|
||||
digest*: MessageDigest # calculated over payload and content topic
|
||||
|
||||
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
|
||||
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
|
||||
## Takes a WakuMessage with received timestamp and returns its Index.
|
||||
let
|
||||
digest = computeDigest(msg)
|
||||
|
||||
@ -254,7 +254,7 @@ proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[voi
|
||||
return ok()
|
||||
|
||||
|
||||
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||
method put*(store: StoreQueueRef, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
|
||||
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
|
||||
store.add(message)
|
||||
|
||||
@ -17,7 +17,7 @@ const DbTable = "Message"
|
||||
|
||||
type SqlQueryStr = string
|
||||
|
||||
type DbCursor* = (Timestamp, seq[byte], string)
|
||||
type DbCursor* = (Timestamp, seq[byte], PubsubTopic)
|
||||
|
||||
|
||||
### SQLite column helper methods
|
||||
@ -45,7 +45,7 @@ proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint):
|
||||
let storedAt = sqlite3_column_int64(s, storedAtCol)
|
||||
Timestamp(storedAt)
|
||||
|
||||
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string =
|
||||
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): PubsubTopic =
|
||||
let
|
||||
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
|
||||
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
|
||||
@ -113,7 +113,7 @@ proc insertMessageQuery(table: string): SqlQueryStr =
|
||||
|
||||
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
|
||||
let query = insertMessageQuery(DbTable)
|
||||
db.prepareStmt( query, InsertMessageParams, void).expect("this is a valid statement")
|
||||
db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
|
||||
|
||||
|
||||
## Count table messages
|
||||
@ -202,9 +202,9 @@ proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
||||
" FROM " & table &
|
||||
" ORDER BY storedAt ASC"
|
||||
|
||||
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] =
|
||||
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] =
|
||||
## Retrieve all messages from the store.
|
||||
var rows: seq[(string, WakuMessage, seq[byte], Timestamp)]
|
||||
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
|
||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||
let
|
||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
||||
@ -247,7 +247,7 @@ proc cursorWhereClause(cursor: Option[DbCursor], ascending=true): Option[string]
|
||||
let whereClause = "(storedAt, id, pubsubTopic) " & comp & " (?, ?, ?)"
|
||||
some(whereClause)
|
||||
|
||||
proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] =
|
||||
proc pubsubWhereClause(pubsubTopic: Option[PubsubTopic]): Option[string] =
|
||||
if pubsubTopic.isNone():
|
||||
return none(string)
|
||||
|
||||
@ -303,7 +303,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab
|
||||
|
||||
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
|
||||
contentTopic: Option[seq[ContentTopic]],
|
||||
pubsubTopic: Option[string],
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
cursor: Option[DbCursor],
|
||||
startTime: Option[Timestamp],
|
||||
endTime: Option[Timestamp],
|
||||
@ -359,15 +359,15 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
|
||||
|
||||
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
|
||||
contentTopic: Option[seq[ContentTopic]],
|
||||
pubsubTopic: Option[string],
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
cursor: Option[DbCursor],
|
||||
startTime: Option[Timestamp],
|
||||
endTime: Option[Timestamp],
|
||||
limit: uint64,
|
||||
ascending: bool): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] =
|
||||
ascending: bool): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]] =
|
||||
|
||||
|
||||
var messages: seq[(string, WakuMessage, seq[byte], Timestamp)] = @[]
|
||||
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]
|
||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||
let
|
||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
||||
|
||||
@ -68,7 +68,7 @@ proc close*(s: SqliteStore) =
|
||||
s.db.close()
|
||||
|
||||
|
||||
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||
method put*(s: SqliteStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
|
||||
## Inserts a message into the store
|
||||
|
||||
let res = s.insertStmt.exec((
|
||||
@ -85,7 +85,7 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M
|
||||
|
||||
ok()
|
||||
|
||||
method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
||||
method put*(s: SqliteStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] =
|
||||
## Inserts a message into the store
|
||||
procCall MessageStore(s).put(pubsubTopic, message)
|
||||
|
||||
@ -98,7 +98,7 @@ method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]
|
||||
method getMessagesByHistoryQuery*(
|
||||
s: SqliteStore,
|
||||
contentTopic = none(seq[ContentTopic]),
|
||||
pubsubTopic = none(string),
|
||||
pubsubTopic = none(PubsubTopic),
|
||||
cursor = none(PagingIndex),
|
||||
startTime = none(Timestamp),
|
||||
endTime = none(Timestamp),
|
||||
|
||||
@ -61,8 +61,6 @@ const git_version* {.strdefine.} = "n/a"
|
||||
# Default clientId
|
||||
const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
const defaultTopic*: PubsubTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
# Default Waku Filter Timeout
|
||||
const WakuFilterTimeout: Duration = 1.days
|
||||
|
||||
@ -395,7 +393,7 @@ proc mountRelay*(node: WakuNode,
|
||||
|
||||
## The default relay topics is the union of
|
||||
## all configured topics plus the hard-coded defaultTopic(s)
|
||||
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
|
||||
wakuRelay.defaultTopics = concat(@[DefaultPubsubTopic], topics)
|
||||
|
||||
## Add peer exchange handler
|
||||
if peerExchangeHandler.isSome():
|
||||
|
||||
@ -30,7 +30,7 @@ const Defaultstring = "/waku/2/default-waku/proto"
|
||||
|
||||
### Client, filter subscripton manager
|
||||
|
||||
type FilterPushHandler* = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.}
|
||||
type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
|
||||
## Subscription manager
|
||||
@ -44,16 +44,16 @@ proc init(T: type SubscriptionManager): T =
|
||||
proc clear(m: var SubscriptionManager) =
|
||||
m.subscriptions.clear()
|
||||
|
||||
proc registerSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, handler: FilterPushHandler) =
|
||||
proc registerSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) =
|
||||
try:
|
||||
m.subscriptions[(pubsubTopic, contentTopic)]= handler
|
||||
except:
|
||||
error "failed to register filter subscription", error=getCurrentExceptionMsg()
|
||||
|
||||
proc removeSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic) =
|
||||
proc removeSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) =
|
||||
m.subscriptions.del((pubsubTopic, contentTopic))
|
||||
|
||||
proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, message: WakuMessage) =
|
||||
proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) =
|
||||
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
|
||||
return
|
||||
|
||||
@ -139,7 +139,7 @@ proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeer
|
||||
return ok()
|
||||
|
||||
proc sendFilterRequestRpc(wf: WakuFilterClient,
|
||||
pubsubTopic: string,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopics: seq[ContentTopic],
|
||||
subscribe: bool,
|
||||
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
|
||||
@ -165,7 +165,7 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,
|
||||
|
||||
|
||||
proc subscribe*(wf: WakuFilterClient,
|
||||
pubsubTopic: string,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic|seq[ContentTopic],
|
||||
handler: FilterPushHandler,
|
||||
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
|
||||
@ -185,7 +185,7 @@ proc subscribe*(wf: WakuFilterClient,
|
||||
return ok()
|
||||
|
||||
proc unsubscribe*(wf: WakuFilterClient,
|
||||
pubsubTopic: string,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic|seq[ContentTopic],
|
||||
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
|
||||
var topics: seq[ContentTopic]
|
||||
|
||||
@ -33,11 +33,11 @@ type WakuFilterResult*[T] = Result[T, string]
|
||||
type Subscription = object
|
||||
requestId: string
|
||||
peer: PeerID
|
||||
pubsubTopic: string
|
||||
pubsubTopic: PubsubTopic
|
||||
contentTopics: HashSet[ContentTopic]
|
||||
|
||||
|
||||
proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: string, contentTopics: seq[ContentTopic]) =
|
||||
proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]) =
|
||||
let subscription = Subscription(
|
||||
requestId: requestId,
|
||||
peer: peer,
|
||||
@ -165,7 +165,7 @@ proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defec
|
||||
wf.subscriptions.delete(index)
|
||||
|
||||
|
||||
proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.async.} =
|
||||
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
|
||||
|
||||
trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len
|
||||
|
||||
|
||||
@ -55,7 +55,7 @@ proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
if ?pb.getField(1, subflag):
|
||||
rpc.subscribe = bool(subflag)
|
||||
|
||||
var pubSubTopic: string
|
||||
var pubSubTopic: PubsubTopic
|
||||
discard ?pb.getField(2, pubSubTopic)
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
|
||||
|
||||
@ -66,6 +66,6 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
|
||||
|
||||
return ok()
|
||||
|
||||
proc publish*(wl: WakuLightPushClient, pubsubTopic: string, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
proc publish*(wl: WakuLightPushClient, pubsubTopic: PubsubTopic, message: WakuMessage, peer: PeerId|RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
|
||||
let pushRequest = PushRequest(pubsubTopic: pubsubTopic, message: message)
|
||||
return await wl.sendPushRequest(pushRequest, peer)
|
||||
|
||||
@ -28,7 +28,7 @@ const WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
|
||||
type
|
||||
WakuLightPushResult*[T] = Result[T, string]
|
||||
|
||||
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: string, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
||||
PushMessageHandler* = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.gcsafe, closure.}
|
||||
|
||||
WakuLightPush* = ref object of LPProtocol
|
||||
rng*: ref rand.HmacDrbgContext
|
||||
|
||||
@ -28,7 +28,7 @@ proc decode*(T: type PushRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
var rpc = PushRequest()
|
||||
|
||||
var pubSubTopic: string
|
||||
var pubSubTopic: PubsubTopic
|
||||
discard ?pb.getField(1, pubSubTopic)
|
||||
rpc.pubSubTopic = pubSubTopic
|
||||
|
||||
|
||||
@ -26,6 +26,10 @@ type
|
||||
PubsubTopic* = string
|
||||
ContentTopic* = string
|
||||
|
||||
const
|
||||
DefaultPubsubTopic*: PubsubTopic = "/waku/2/default-waku/proto"
|
||||
DefaultContentTopic*: ContentTopic = "/waku/2/default-content/proto"
|
||||
|
||||
|
||||
type WakuMessage* = object
|
||||
payload*: seq[byte]
|
||||
|
||||
@ -14,6 +14,8 @@ import
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/stream/connection
|
||||
import
|
||||
./waku_message
|
||||
|
||||
logScope:
|
||||
topics = "waku relay"
|
||||
@ -23,7 +25,7 @@ const
|
||||
|
||||
type
|
||||
WakuRelay* = ref object of GossipSub
|
||||
defaultTopics*: seq[string] # Default configured PubSub topics
|
||||
defaultTopics*: seq[PubsubTopic] # Default configured PubSub topics
|
||||
|
||||
method init*(w: WakuRelay) =
|
||||
debug "init WakuRelay"
|
||||
@ -62,14 +64,14 @@ method initPubSub*(w: WakuRelay) {.raises: [Defect, InitializationError].} =
|
||||
w.init()
|
||||
|
||||
method subscribe*(w: WakuRelay,
|
||||
pubSubTopic: string,
|
||||
pubSubTopic: PubsubTopic,
|
||||
handler: TopicHandler) =
|
||||
debug "subscribe", pubSubTopic=pubSubTopic
|
||||
|
||||
procCall GossipSub(w).subscribe(pubSubTopic, handler)
|
||||
|
||||
method publish*(w: WakuRelay,
|
||||
pubSubTopic: string,
|
||||
pubSubTopic: PubsubTopic,
|
||||
message: seq[byte]
|
||||
): Future[int] {.async.} =
|
||||
trace "publish", pubSubTopic=pubSubTopic, message=message
|
||||
@ -83,7 +85,7 @@ method unsubscribe*(w: WakuRelay,
|
||||
procCall GossipSub(w).unsubscribe(topics)
|
||||
|
||||
method unsubscribeAll*(w: WakuRelay,
|
||||
pubSubTopic: string) =
|
||||
pubSubTopic: PubsubTopic) =
|
||||
debug "unsubscribeAll"
|
||||
|
||||
procCall GossipSub(w).unsubscribeAll(pubSubTopic)
|
||||
|
||||
@ -1053,7 +1053,7 @@ proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} =
|
||||
handler = handler)
|
||||
|
||||
|
||||
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
|
||||
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
|
||||
## this procedure is a thin wrapper for the pubsub addValidator method
|
||||
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
|
||||
## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic
|
||||
@ -1104,7 +1104,7 @@ proc mountRlnRelayStatic*(node: WakuNode,
|
||||
group: seq[IDCommitment],
|
||||
memKeyPair: MembershipKeyPair,
|
||||
memIndex: MembershipIndex,
|
||||
pubsubTopic: string,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler)): RlnRelayResult[void] =
|
||||
# Returns RlnRelayResult[void] to indicate the success of the call
|
||||
@ -1160,7 +1160,7 @@ proc mountRlnRelayDynamic*(node: WakuNode,
|
||||
memContractAddr: web3.Address,
|
||||
memKeyPair: Option[MembershipKeyPair] = none(MembershipKeyPair),
|
||||
memIndex: Option[MembershipIndex] = none(MembershipIndex),
|
||||
pubsubTopic: string,
|
||||
pubsubTopic: PubsubTopic,
|
||||
contentTopic: ContentTopic,
|
||||
spamHandler: Option[SpamHandler] = none(SpamHandler),
|
||||
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) : Future[RlnRelayResult[void]] {.async.} =
|
||||
|
||||
@ -122,7 +122,7 @@ const StoreResumeTimeWindowOffset: Timestamp = getNanosecondTime(20) ## Adjust
|
||||
proc resume*(w: WakuStoreClient,
|
||||
peerList = none(seq[RemotePeerInfo]),
|
||||
pageSize = DefaultPageSize,
|
||||
pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||
pubsubTopic = DefaultPubsubTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||
## messages are stored in the store node's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
|
||||
@ -24,9 +24,9 @@ type
|
||||
|
||||
|
||||
# MessageStore interface
|
||||
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
|
||||
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
|
||||
|
||||
method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] {.base.} =
|
||||
method put*(ms: MessageStore, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] {.base.} =
|
||||
let
|
||||
digest = computeDigest(message)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
|
||||
@ -37,7 +37,7 @@ proc computeDigest*(msg: WakuMessage): MessageDigest =
|
||||
# Computes the hash
|
||||
return ctx.finish()
|
||||
|
||||
proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
|
||||
proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
|
||||
## Takes a WakuMessage with received timestamp and returns its Index.
|
||||
let
|
||||
digest = computeDigest(msg)
|
||||
|
||||
@ -37,8 +37,6 @@ logScope:
|
||||
const
|
||||
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
|
||||
|
||||
DefaultTopic* = "/waku/2/default-waku/proto"
|
||||
|
||||
MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
|
||||
|
||||
|
||||
@ -230,7 +228,7 @@ proc isValidMessage(msg: WakuMessage): bool =
|
||||
|
||||
return lowerBound <= msg.timestamp and msg.timestamp <= upperBound
|
||||
|
||||
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
|
||||
proc handleMessage*(w: WakuStore, pubsubTopic: PubsubTopic, msg: WakuMessage) =
|
||||
if w.store.isNil():
|
||||
# Messages should not be stored
|
||||
return
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user