mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
Initial add of api layers interface classes
This commit is contained in:
parent
9e58cfa139
commit
eb5b82de2b
195
logos_delivery/api/kernel_api.nim
Normal file
195
logos_delivery/api/kernel_api.nim
Normal file
@ -0,0 +1,195 @@
|
||||
import std/options
|
||||
import chronos, results
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/waku_core/topics/pubsub_topic
|
||||
import logos_delivery/waku/waku_store/common
|
||||
|
||||
type IKernel* = ref object of RootObj
|
||||
|
||||
# --- topic construction ---
|
||||
method buildContentTopic*(
|
||||
self: IKernel,
|
||||
appName: string,
|
||||
appVersion: uint32,
|
||||
name: string,
|
||||
encoding: string,
|
||||
): Future[Result[ContentTopic, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.buildContentTopic not implemented")
|
||||
|
||||
method buildPubsubTopic*(
|
||||
self: IKernel, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.buildPubsubTopic not implemented")
|
||||
|
||||
method defaultPubsubTopic*(
|
||||
self: IKernel
|
||||
): Future[Result[PubsubTopic, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.defaultPubsubTopic not implemented")
|
||||
|
||||
# --- relay ---
|
||||
method relayPublish*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relayPublish not implemented")
|
||||
|
||||
method relaySubscribe*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relaySubscribe not implemented")
|
||||
|
||||
method relayUnsubscribe*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relayUnsubscribe not implemented")
|
||||
|
||||
method relayAddProtectedShard*(
|
||||
self: IKernel, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relayAddProtectedShard not implemented")
|
||||
|
||||
method relayConnectedPeers*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relayConnectedPeers not implemented")
|
||||
|
||||
method relayPeersInMesh*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.relayPeersInMesh not implemented")
|
||||
|
||||
# --- filter ---
|
||||
method filterSubscribe*(
|
||||
self: IKernel,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.filterSubscribe not implemented")
|
||||
|
||||
method filterUnsubscribe*(
|
||||
self: IKernel,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.filterUnsubscribe not implemented")
|
||||
|
||||
method filterUnsubscribeAll*(
|
||||
self: IKernel, peer: string
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.filterUnsubscribeAll not implemented")
|
||||
|
||||
# --- lightpush ---
|
||||
method lightpushPublish*(
|
||||
self: IKernel, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.lightpushPublish not implemented")
|
||||
|
||||
# --- store ---
|
||||
method storeQuery*(
|
||||
self: IKernel, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.storeQuery not implemented")
|
||||
|
||||
# --- peer management ---
|
||||
method connect*(
|
||||
self: IKernel, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.connect not implemented")
|
||||
|
||||
method disconnectPeerById*(
|
||||
self: IKernel, peerId: string
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.disconnectPeerById not implemented")
|
||||
|
||||
method disconnectAllPeers*(
|
||||
self: IKernel
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.disconnectAllPeers not implemented")
|
||||
|
||||
method dialPeer*(
|
||||
self: IKernel, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.dialPeer not implemented")
|
||||
|
||||
method dialPeerById*(
|
||||
self: IKernel, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.dialPeerById not implemented")
|
||||
|
||||
method peerIdsFromPeerstore*(
|
||||
self: IKernel
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.peerIdsFromPeerstore not implemented")
|
||||
|
||||
method connectedPeersInfo*(
|
||||
self: IKernel
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.connectedPeersInfo not implemented")
|
||||
|
||||
method connectedPeers*(
|
||||
self: IKernel
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.connectedPeers not implemented")
|
||||
|
||||
method peerIdsByProtocol*(
|
||||
self: IKernel, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.peerIdsByProtocol not implemented")
|
||||
|
||||
# --- discovery ---
|
||||
method dnsDiscovery*(
|
||||
self: IKernel, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.dnsDiscovery not implemented")
|
||||
|
||||
method discv5UpdateBootnodes*(
|
||||
self: IKernel, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.discv5UpdateBootnodes not implemented")
|
||||
|
||||
method startDiscv5*(
|
||||
self: IKernel
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.startDiscv5 not implemented")
|
||||
|
||||
method stopDiscv5*(
|
||||
self: IKernel
|
||||
): Future[Result[bool, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.stopDiscv5 not implemented")
|
||||
|
||||
method peerExchangeRequest*(
|
||||
self: IKernel, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.peerExchangeRequest not implemented")
|
||||
|
||||
# --- debug / info ---
|
||||
method version*(
|
||||
self: IKernel
|
||||
): Future[Result[string, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.version not implemented")
|
||||
|
||||
method listenAddresses*(
|
||||
self: IKernel
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.listenAddresses not implemented")
|
||||
|
||||
method myEnr*(
|
||||
self: IKernel
|
||||
): Future[Result[string, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.myEnr not implemented")
|
||||
|
||||
method myPeerId*(
|
||||
self: IKernel
|
||||
): Future[Result[string, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.myPeerId not implemented")
|
||||
|
||||
method metrics*(
|
||||
self: IKernel
|
||||
): Future[Result[string, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.metrics not implemented")
|
||||
|
||||
method pingPeer*(
|
||||
self: IKernel, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IKernel.pingPeer not implemented")
|
||||
19
logos_delivery/api/messaging_client_api.nim
Normal file
19
logos_delivery/api/messaging_client_api.nim
Normal file
@ -0,0 +1,19 @@
|
||||
import chronos, results
|
||||
import logos_delivery/api/types
|
||||
|
||||
type IMessagingClient* = ref object of RootObj
|
||||
|
||||
method subscribe*(
|
||||
self: IMessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IMessagingClient.subscribe not implemented")
|
||||
|
||||
method unsubscribe*(
|
||||
self: IMessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] {.base, raises: [].} =
|
||||
return err("Interface IMessagingClient.unsubscribe not implemented")
|
||||
|
||||
method send*(
|
||||
self: IMessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IMessagingClient.send not implemented")
|
||||
28
logos_delivery/api/reliable_cannel_manager_api.nim
Normal file
28
logos_delivery/api/reliable_cannel_manager_api.nim
Normal file
@ -0,0 +1,28 @@
|
||||
import chronos, results
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/channels/types
|
||||
import logos_delivery/channels/reliable_channel
|
||||
|
||||
type IReliableChannelManager* = ref object of RootObj
|
||||
|
||||
method createReliableChannel*(
|
||||
self: IReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
sendHandler: SendHandler = nil,
|
||||
): Result[ChannelId, string] {.base.} =
|
||||
return err("Interface IReliableChannelManager.createReliableChannel not implemented")
|
||||
|
||||
method closeChannel*(
|
||||
self: IReliableChannelManager, channelId: ChannelId
|
||||
): Future[Result[void, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IReliableChannelManager.closeChannel not implemented")
|
||||
|
||||
method send*(
|
||||
self: IReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
ephemeral: bool = false,
|
||||
): Future[Result[RequestId, string]] {.async: (raises: []), base.} =
|
||||
return err("Interface IReliableChannelManager.send not implemented")
|
||||
@ -13,9 +13,10 @@ import stew/byteutils
|
||||
|
||||
import brokers/broker_context
|
||||
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/api/reliable_cannel_manager_api
|
||||
import logos_delivery/waku/events/message_events as waku_message_events
|
||||
import logos_delivery/messaging/messaging_client
|
||||
import logos_delivery/api/types
|
||||
import logos_delivery/waku/waku_core/topics
|
||||
import logos_delivery/waku/persistency/sds_persistency
|
||||
|
||||
@ -34,7 +35,7 @@ type
|
||||
## channel API. Placeholder for now (segmentation / SDS / rate-limit defaults
|
||||
## will move here in a follow-up PR); kept so each layer owns its own config.
|
||||
|
||||
ReliableChannelManager* = ref object
|
||||
ReliableChannelManager* = ref object of IReliableChannelManager
|
||||
channels: Table[ChannelId, ReliableChannel]
|
||||
messagingClient: MessagingClient ## The channel layer chains onto messaging.
|
||||
sendHandler: SendHandler
|
||||
@ -94,13 +95,13 @@ proc sdsPersistence(): Option[Persistence] =
|
||||
return none(Persistence)
|
||||
return some(newSdsPersistence(job))
|
||||
|
||||
proc createReliableChannel*(
|
||||
method createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
sendHandler: SendHandler = nil,
|
||||
): Result[ChannelId, string] =
|
||||
): Result[ChannelId, string] {.raises: [].} =
|
||||
## Spec entry point. The `sendHandler` and `rng` the channel needs are
|
||||
## sourced from the owning `ReliableChannelManager` rather than passed
|
||||
## per call. Encryption is wired up through the `Encrypt`/`Decrypt`
|
||||
@ -146,7 +147,7 @@ proc createReliableChannel*(
|
||||
self.channels[channelId] = chn
|
||||
return ok(channelId)
|
||||
|
||||
proc closeChannel*(
|
||||
method closeChannel*(
|
||||
self: ReliableChannelManager, channelId: ChannelId
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Stops the channel's SDS loops and releases the channel. Persisted SDS
|
||||
@ -158,7 +159,7 @@ proc closeChannel*(
|
||||
await chn.stop()
|
||||
return ok()
|
||||
|
||||
proc send*(
|
||||
method send*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
|
||||
@ -2,6 +2,7 @@ import results, chronos
|
||||
import chronicles
|
||||
import
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/api/messaging_client_api,
|
||||
logos_delivery/waku/node/[waku_node, subscription_manager],
|
||||
logos_delivery/messaging/delivery_service/[recv_service, send_service],
|
||||
logos_delivery/messaging/delivery_service/send_service/delivery_task
|
||||
@ -13,7 +14,7 @@ type
|
||||
## follow-up PR. Today it only carries the p2p reliability toggle.
|
||||
useP2PReliability*: bool
|
||||
|
||||
MessagingClient* = ref object
|
||||
MessagingClient* = ref object of IMessagingClient
|
||||
node: WakuNode
|
||||
sendService*: SendService
|
||||
recvService*: RecvService
|
||||
@ -43,29 +44,29 @@ proc stop*(self: MessagingClient) {.async.} =
|
||||
await self.recvService.stopRecvService()
|
||||
self.started = false
|
||||
|
||||
proc checkApiAvailability(mc: MessagingClient): Result[void, string] =
|
||||
if mc.isNil():
|
||||
proc checkApiAvailability(self: MessagingClient): Result[void, string] =
|
||||
if self.isNil():
|
||||
return err("MessagingClient is not initialized")
|
||||
|
||||
return ok()
|
||||
|
||||
proc subscribe*(
|
||||
mc: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
?checkApiAvailability(mc)
|
||||
method subscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return mc.node.subscriptionManager.subscribe(contentTopic)
|
||||
return self.node.subscriptionManager.subscribe(contentTopic)
|
||||
|
||||
proc unsubscribe*(
|
||||
mc: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] =
|
||||
?checkApiAvailability(mc)
|
||||
method unsubscribe*(
|
||||
self: MessagingClient, contentTopic: ContentTopic
|
||||
): Result[void, string] {.raises: [].} =
|
||||
?checkApiAvailability(self)
|
||||
|
||||
return mc.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
return self.node.subscriptionManager.unsubscribe(contentTopic)
|
||||
|
||||
proc send*(
|
||||
method send*(
|
||||
self: MessagingClient, envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async.} =
|
||||
): Future[Result[RequestId, string]] {.async: (raises: []).} =
|
||||
## High-level messaging API send. Auto-subscribes to the content topic
|
||||
## (so the local node sees its own gossipsub broadcast), builds a
|
||||
## `DeliveryTask`, and hands it to the send service. Returns the request
|
||||
|
||||
@ -22,6 +22,7 @@ import
|
||||
metrics/chronos_httpserver,
|
||||
brokers/broker_context,
|
||||
logos_delivery/api/types,
|
||||
logos_delivery/api/kernel_api,
|
||||
logos_delivery/waku/[
|
||||
waku_core,
|
||||
waku_node,
|
||||
@ -65,7 +66,7 @@ const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
const FilterOpTimeout = 5.seconds
|
||||
|
||||
type Waku* = ref object
|
||||
type Waku* = ref object of IKernel
|
||||
stateInfo*: WakuStateInfo
|
||||
conf*: WakuConf
|
||||
rng*: crypto.Rng
|
||||
@ -577,29 +578,31 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Kernel API realization
|
||||
##
|
||||
# --- topic construction ---
|
||||
proc buildContentTopic*(
|
||||
method buildContentTopic*(
|
||||
self: Waku, appName: string, appVersion: uint32, name: string, encoding: string
|
||||
): Future[Result[ContentTopic, string]] {.async.} =
|
||||
): Future[Result[ContentTopic, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(ContentTopic(fmt"/{appName}/{appVersion}/{name}/{encoding}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc buildPubsubTopic*(
|
||||
method buildPubsubTopic*(
|
||||
self: Waku, topicName: string
|
||||
): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
): Future[Result[PubsubTopic, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(PubsubTopic(fmt"/waku/2/{topicName}"))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc defaultPubsubTopic*(self: Waku): Future[Result[PubsubTopic, string]] {.async.} =
|
||||
method defaultPubsubTopic*(
|
||||
self: Waku
|
||||
): Future[Result[PubsubTopic, string]] {.async: (raises: []).} =
|
||||
return ok(DefaultPubsubTopic)
|
||||
|
||||
# --- relay ---
|
||||
proc relayPublish*(
|
||||
method relayPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, timeoutMs: uint32
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
): Future[Result[int, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPublish: WakuRelay not mounted")
|
||||
@ -611,9 +614,9 @@ proc relayPublish*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relaySubscribe*(
|
||||
method relaySubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relaySubscribe: WakuRelay not mounted")
|
||||
@ -627,9 +630,9 @@ proc relaySubscribe*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayUnsubscribe*(
|
||||
method relayUnsubscribe*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayUnsubscribe: WakuRelay not mounted")
|
||||
@ -641,9 +644,9 @@ proc relayUnsubscribe*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayAddProtectedShard*(
|
||||
method relayAddProtectedShard*(
|
||||
self: Waku, clusterId: uint16, shardId: uint16, publicKey: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayAddProtectedShard: WakuRelay not mounted")
|
||||
@ -657,9 +660,9 @@ proc relayAddProtectedShard*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayConnectedPeers*(
|
||||
method relayConnectedPeers*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayConnectedPeers: WakuRelay not mounted")
|
||||
@ -671,9 +674,9 @@ proc relayConnectedPeers*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc relayPeersInMesh*(
|
||||
method relayPeersInMesh*(
|
||||
self: Waku, pubsubTopic: PubsubTopic
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuRelay.isNil():
|
||||
return err("relayPeersInMesh: WakuRelay not mounted")
|
||||
@ -686,12 +689,12 @@ proc relayPeersInMesh*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- filter ---
|
||||
proc filterSubscribe*(
|
||||
method filterSubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
@ -706,12 +709,12 @@ proc filterSubscribe*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribe*(
|
||||
method filterUnsubscribe*(
|
||||
self: Waku,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic],
|
||||
peer: string,
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
@ -726,9 +729,9 @@ proc filterUnsubscribe*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc filterUnsubscribeAll*(
|
||||
method filterUnsubscribeAll*(
|
||||
self: Waku, peer: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuFilterClient.isNil():
|
||||
return err("wakuFilterClient is not mounted")
|
||||
@ -744,9 +747,9 @@ proc filterUnsubscribeAll*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- lightpush ---
|
||||
proc lightpushPublish*(
|
||||
method lightpushPublish*(
|
||||
self: Waku, pubsubTopic: PubsubTopic, message: WakuMessage, peer: string
|
||||
): Future[Result[string, string]] {.async.} =
|
||||
): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuLegacyLightpushClient.isNil():
|
||||
return err("wakuLegacyLightpushClient is not mounted")
|
||||
@ -766,9 +769,9 @@ proc lightpushPublish*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- store ---
|
||||
proc storeQuery*(
|
||||
method storeQuery*(
|
||||
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int
|
||||
): Future[Result[StoreQueryResponse, string]] {.async.} =
|
||||
): Future[Result[StoreQueryResponse, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.node.wakuStoreClient.isNil():
|
||||
return err("wakuStoreClient is not mounted")
|
||||
@ -788,18 +791,18 @@ proc storeQuery*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- peer management ---
|
||||
proc connect*(
|
||||
method connect*(
|
||||
self: Waku, peers: seq[string], timeoutMs: uint32
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
await self.node.connectToNodes(peers.mapIt(strip(it)), source = "static")
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectPeerById*(
|
||||
method disconnectPeerById*(
|
||||
self: Waku, peerId: string
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
@ -808,16 +811,18 @@ proc disconnectPeerById*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc disconnectAllPeers*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
method disconnectAllPeers*(
|
||||
self: Waku
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
await self.node.peerManager.disconnectAllPeers()
|
||||
return ok(true)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeer*(
|
||||
method dialPeer*(
|
||||
self: Waku, peerAddr: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let remotePeerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err($error)
|
||||
@ -828,9 +833,9 @@ proc dialPeer*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc dialPeerById*(
|
||||
method dialPeerById*(
|
||||
self: Waku, peerId: string, protocol: string, timeoutMs: int
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let pId = PeerId.init(peerId).valueOr:
|
||||
return err($error)
|
||||
@ -841,13 +846,17 @@ proc dialPeerById*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsFromPeerstore*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
method peerIdsFromPeerstore*(
|
||||
self: Waku
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(self.node.peerManager.switch.peerStore.peers().mapIt($it.peerId))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
method connectedPeersInfo*(
|
||||
self: Waku
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
@ -858,16 +867,18 @@ proc connectedPeersInfo*(self: Waku): Future[Result[seq[string], string]] {.asyn
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc connectedPeers*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
method connectedPeers*(
|
||||
self: Waku
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let (inPeerIds, outPeerIds) = self.node.peerManager.connectedPeers()
|
||||
return ok(concat(inPeerIds, outPeerIds).mapIt($it))
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerIdsByProtocol*(
|
||||
method peerIdsByProtocol*(
|
||||
self: Waku, protocol: string
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(
|
||||
self.node.peerManager.switch.peerStore
|
||||
@ -879,9 +890,9 @@ proc peerIdsByProtocol*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- discovery ---
|
||||
proc dnsDiscovery*(
|
||||
method dnsDiscovery*(
|
||||
self: Waku, enrTreeUrl: string, nameServer: string, timeoutMs: int
|
||||
): Future[Result[seq[string], string]] {.async.} =
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let dnsNameServers = @[parseIpAddress(nameServer)]
|
||||
let discoveredPeers = (
|
||||
@ -898,9 +909,9 @@ proc dnsDiscovery*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc discv5UpdateBootnodes*(
|
||||
method discv5UpdateBootnodes*(
|
||||
self: Waku, bootnodes: seq[string]
|
||||
): Future[Result[bool, string]] {.async.} =
|
||||
): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
@ -911,7 +922,7 @@ proc discv5UpdateBootnodes*(
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
method startDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
@ -921,7 +932,7 @@ proc startDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
method stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
if self.wakuDiscv5.isNil():
|
||||
return err("discv5 not started")
|
||||
@ -930,9 +941,9 @@ proc stopDiscv5*(self: Waku): Future[Result[bool, string]] {.async.} =
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc peerExchangeRequest*(
|
||||
method peerExchangeRequest*(
|
||||
self: Waku, numPeers: uint64
|
||||
): Future[Result[int, string]] {.async.} =
|
||||
): Future[Result[int, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let numPeersRecv = (await self.node.fetchPeerExchangePeers(numPeers)).valueOr:
|
||||
return err("failed peer exchange: " & $error)
|
||||
@ -941,37 +952,39 @@ proc peerExchangeRequest*(
|
||||
return err(e.msg)
|
||||
|
||||
# --- debug / info ---
|
||||
proc version*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
method version*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
return ok(WakuNodeVersionString)
|
||||
|
||||
proc listenAddresses*(self: Waku): Future[Result[seq[string], string]] {.async.} =
|
||||
method listenAddresses*(
|
||||
self: Waku
|
||||
): Future[Result[seq[string], string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(self.node.info().listenAddresses)
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myEnr*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
method myEnr*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok(self.node.enr.toURI())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc myPeerId*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
method myPeerId*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
return ok($self.node.peerId())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc metrics*(self: Waku): Future[Result[string, string]] {.async.} =
|
||||
method metrics*(self: Waku): Future[Result[string, string]] {.async: (raises: []).} =
|
||||
{.gcsafe.}:
|
||||
try:
|
||||
return ok(defaultRegistry.toText())
|
||||
except CatchableError as e:
|
||||
return err(e.msg)
|
||||
|
||||
proc pingPeer*(
|
||||
method pingPeer*(
|
||||
self: Waku, peerAddr: string, timeoutMs: int
|
||||
): Future[Result[int64, string]] {.async.} =
|
||||
): Future[Result[int64, string]] {.async: (raises: []).} =
|
||||
try:
|
||||
let peerInfo = parsePeerInfo(peerAddr).valueOr:
|
||||
return err("pingPeer failed to parse peer addr: " & $error)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user