Base interface for pubsub protocols
You can subscribe to a topic, publish something on it, and eventually unsubscribe from it.
Types
InitializationError = object of LPError
- Source Edit
libp2p_pubsub_broadcast_graft = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_ihave = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_iwant = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_messages = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_prune = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_subscriptions = IgnoredCollector
- Source Edit
libp2p_pubsub_broadcast_unsubscriptions = IgnoredCollector
- Source Edit
libp2p_pubsub_messages_published = IgnoredCollector
- Source Edit
libp2p_pubsub_messages_rebroadcasted = IgnoredCollector
- Source Edit
libp2p_pubsub_received_graft = IgnoredCollector
- Source Edit
libp2p_pubsub_received_ihave = IgnoredCollector
- Source Edit
libp2p_pubsub_received_iwant = IgnoredCollector
- Source Edit
libp2p_pubsub_received_messages = IgnoredCollector
- Source Edit
libp2p_pubsub_received_prune = IgnoredCollector
- Source Edit
libp2p_pubsub_received_subscriptions = IgnoredCollector
- Source Edit
libp2p_pubsub_received_unsubscriptions = IgnoredCollector
- Source Edit
MsgIdProvider {.public.} = proc (m: Message): Result[MessageId, ValidationResult] {. noSideEffect, ...raises: [], gcsafe.}
- Source Edit
PubSub {.public.} = ref object of LPProtocol switch*: Switch peerInfo*: PeerInfo topics*: Table[string, seq[TopicHandler]] peers*: Table[PeerId, PubSubPeer] triggerSelf*: bool ## trigger own local handler on publish verifySignature*: bool ## enable signature verification sign*: bool ## enable message signing validators*: Table[string, HashSet[ValidatorHandler]] observers: ref seq[PubSubObserver] msgIdProvider*: MsgIdProvider ## Turn message into message id (not nil) msgSeqno*: uint64 anonymize*: bool ## if we omit fromPeer and seqno from RPC messages we send subscriptionValidator*: SubscriptionValidator topicsHigh*: int ## the maximum number of topics a peer is allowed to subscribe to maxMessageSize*: int ## \ ## the maximum raw message size we'll globally allow ## for finer tuning, check message size on topic validator ## ## sending a big message to a peer with a lower size limit can ## lead to issues, from descoring to connection drops ## ## defaults to 1 MiB rng*: ref HmacDrbgContext knownTopics*: HashSet[string]
- Source Edit
SubscriptionValidator {.public.} = proc (topic: string): bool {....raises: [], gcsafe.}
- Every time a peer sends us a subscription (even to an unknown topic), we have to store it, which may be an attack vector. This callback can be used to reject topics we're not interested in. Source Edit
TopicHandler {.public.} = proc (topic: string; data: seq[byte]): Future[void] {. ...gcsafe, raises: [].}
- Source Edit
TopicPair = tuple[topic: string, handler: TopicHandler]
- Source Edit
ValidatorHandler {.public.} = proc (topic: string; message: Message): Future[ ValidationResult] {....gcsafe, raises: [].}
- Source Edit
Consts
KnownLibP2PTopics = ""
- Source Edit
KnownLibP2PTopicsSeq = [""]
- Source Edit
Procs
proc addObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [], tags: [].}
- Source Edit
proc broadcast(p: PubSub; sendPeers: auto; msg: RPCMsg; isHighPriority: bool) {. ...raises: [], raises: [].}
-
This procedure attempts to send a msg (of type RPCMsg) to a specified group of peers in the PubSub network.
Parameters:
- p: The PubSub instance.
- sendPeers: An iterable of PubSubPeer instances representing the peers to whom the message should be sent.
- msg: The RPCMsg instance that contains the message to be broadcast.
- isHighPriority: A boolean indicating whether the message should be treated as high priority.
High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.
Source Edit
proc handleData(p: PubSub; topic: string; data: seq[byte]): Future[void] {. ...raises: [], tags: [RootEffect].}
- Source Edit
proc init[PubParams: object | bool](P: typedesc[PubSub]; switch: Switch; triggerSelf: bool = false; anonymize: bool = false; verifySignature: bool = true; sign: bool = true; msgIdProvider: MsgIdProvider = defaultMsgIdProvider; subscriptionValidator: SubscriptionValidator = nil; maxMessageSize: int = 1024 * 1024; rng: ref HmacDrbgContext = newRng(); parameters: PubParams = false): P:type {. ...raises: [InitializationError], public, ...raises: [].}
- Source Edit
proc removeObserver(p: PubSub; observer: PubSubObserver) {.public, ...raises: [], tags: [].}
- Source Edit
proc send(p: PubSub; peer: PubSubPeer; msg: RPCMsg; isHighPriority: bool) {. ...raises: [], raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
-
This procedure attempts to send a msg (of type RPCMsg) to the specified remote peer in the PubSub network.
Parameters:
- p: The PubSub instance.
- peer: An instance of PubSubPeer representing the peer to whom the message should be sent.
- msg: The RPCMsg instance that contains the message to be sent.
- isHighPriority: A boolean indicating whether the message should be treated as high priority.
High priority messages are sent immediately, while low priority messages are queued and sent only after all high priority messages have been sent.
Source Edit
proc sendSubs(p: PubSub; peer: PubSubPeer; topics: openArray[string]; subscribe: bool) {....raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
- send subscriptions to remote peer Source Edit
proc subscribe(p: PubSub; topic: string; handler: TopicHandler) {.public, ...raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
-
subscribe to a topic
- topic - a string topic to subscribe to
- handler - user provided proc that will be triggered on every received message
proc unsubscribe(p: PubSub; topic: string; handler: TopicHandler) {.public, ...raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
- unsubscribe from a topic string Source Edit
proc unsubscribe(p: PubSub; topics: openArray[TopicPair]) {.public, ...raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
- unsubscribe from a list of topic handlers Source Edit
proc unsubscribeAll(p: PubSub; topic: string) {.public, ...gcsafe, raises: [], tags: [WriteIOEffect, TimeEffect, RootEffect].}
- unsubscribe every handler from topic Source Edit
proc updateMetrics(p: PubSub; rpcMsg: RPCMsg) {....raises: [], tags: [].}
- Source Edit
Methods
method addValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {. base, public, ...gcsafe, raises: [], tags: [RootEffect].}
- Add a validator to a topic. Each new message received on this topic will be sent to hook. hook can return either Accept, Ignore or Reject (which can descore the peer) Source Edit
method getOrCreatePeer(p: PubSub; peerId: PeerId; protos: seq[string]): PubSubPeer {. base, ...gcsafe, raises: [], tags: [WriteIOEffect, TimeEffect, RootEffect].}
- Source Edit
method handleConn(p: PubSub; conn: Connection; proto: string): InternalRaisesFuture[ void, (CancelledError,)] {.base, stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
handle incoming connections
this proc will:
- register a new PubSubPeer for the connection
- register a handler with the peer; this handler gets called on every rpc message that the peer receives
- ask the peer to subscribe us to every topic that we're interested in
method initPubSub(p: PubSub) {.base, ...raises: [InitializationError], raises: [], tags: [].}
- perform pubsub initialization Source Edit
method onPubSubPeerEvent(p: PubSub; peer: PubSubPeer; event: PubSubPeerEvent) {. base, ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
- Source Edit
method onTopicSubscription(p: PubSub; topic: string; subscribed: bool) {.base, ...gcsafe, raises: [], tags: [RootEffect, WriteIOEffect, TimeEffect].}
- Source Edit
method publish(p: PubSub; topic: string; data: seq[byte]): Future[int] {.base, public(), stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
-
publish to a topic
The return value is the number of neighbours that we attempted to send the message to, excluding self. Note that this is an optimistic number of attempts - the number of peers that actually receive the message might be lower.
Source Edit
method removeValidator(p: PubSub; topic: varargs[string]; hook: ValidatorHandler) {. base, public, ...raises: [], tags: [RootEffect].}
- Source Edit
method rpcHandler(p: PubSub; peer: PubSubPeer; data: seq[byte]): Future[void] {. base, stackTrace: false, ...raises: [], gcsafe, raises: [], raises: [], tags: [RootEffect].}
- Handler that must be overridden by concrete implementation Source Edit
method subscribePeer(p: PubSub; peer: PeerId) {.base, ...gcsafe, raises: [], tags: [WriteIOEffect, TimeEffect, RootEffect].}
- subscribe to remote peer to receive/send pubsub messages Source Edit
method unsubscribePeer(p: PubSub; peerId: PeerId) {.base, ...gcsafe, raises: [], tags: [WriteIOEffect, TimeEffect].}
- handle peer disconnects Source Edit