diff --git a/README.md b/README.md index e37179f..794cc64 100644 --- a/README.md +++ b/README.md @@ -6,12 +6,19 @@ > A pubsub system with flooding and gossiping variants. -PubSub is a work in progress, with floodsub as an initial protocol, followed by gossipsub ([spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub), [gossipsub.go](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go)). +This is the canonical pubsub implementation for libp2p. + +We currently provide three implementations: +- floodsub, which is the baseline flooding protocol. +- gossipsub, which is a more advanced router with mesh formation and gossip propagation. + See [spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) and [implementation](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go) for more details. +- randomsub, which is a simple probabilistic router that propagates to random subsets of peers. ## Table of Contents - [Install](#install) - [Usage](#usage) +- [Documentation](#documentation) - [Contribute](#contribute) - [License](#license) @@ -29,6 +36,12 @@ To be used for messaging in p2p instrastructure (as part of libp2p) such as IPFS See [libp2p/specs/pubsub#Implementations](https://github.com/libp2p/specs/tree/master/pubsub#Implementations). +## Documentation + +See the [libp2p specs](https://github.com/libp2p/specs/tree/master/pubsub) for high level documentation +and [godoc](https://godoc.org/github.com/libp2p/go-libp2p-pubsub) for API documentation. + + ## Contribute Contributions welcome. Please check out [the issues](https://github.com/libp2p/go-libp2p-pubsub/issues). diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..fa0d7e1 --- /dev/null +++ b/doc.go @@ -0,0 +1,25 @@ +package pubsub + +// The pubsub package provides facilities for the Publish/Subscribe pattern of message +// propagation, also known as overlay multicast. +// The implementation provides topic-based pubsub, with pluggable routing algorithms. +// +// The main interface to the library is the `PubSub` object. +// You can construct this object with the following constructors +// - `NewFloodSub` creates an instance that uses the floodsub routing algorithm. +// - `NewGossipSub` creates an instance that uses the gossipsub routing algorithm. +// - `NewRandomSub` creates an instance that uses the randomsub routing algorithm. +// +// In addition, there is a generic constructor that creates a pubsub instance with +// a custom PubSubRouter interface. This procedure is currently reserved for internal +// use within the package. +// +// Once you have constructed a `PubSub` instance, you need to establish some connections +// to your peers; the implementation relies on ambient peer discovery, leaving bootstrap +// and active peer discovery up to the client. +// +// To publish a message to some topic, use `Publish`; you don't need to be subscribed +// to the topic in order to publish. +// +// To subscribe to a topic, use `Subscribe`; this will give you a subscription interface +// from which new messages can be pumped. diff --git a/floodsub.go b/floodsub.go index 6c8d3ae..ff48d95 100644 --- a/floodsub.go +++ b/floodsub.go @@ -14,7 +14,7 @@ const ( FloodSubID = protocol.ID("/floodsub/1.0.0") ) -// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps +// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps. func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) { rt := &FloodSubRouter{ protocols: ps, @@ -22,7 +22,7 @@ func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID return NewPubSub(ctx, h, rt, opts...) } -// NewFloodSub returns a new PubSub object using the FloodSubRouter +// NewFloodSub returns a new PubSub object using the FloodSubRouter. func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...) } diff --git a/gossipsub.go b/gossipsub.go index 6f45333..81e7f61 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -35,7 +35,7 @@ var ( GossipSubFanoutTTL = 60 * time.Second ) -// NewGossipSub returns a new PubSub object using GossipSubRouter as the router +// NewGossipSub returns a new PubSub object using GossipSubRouter as the router. func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ peers: make(map[peer.ID]protocol.ID), diff --git a/pubsub.go b/pubsub.go index 1f396e8..ebb2dcc 100644 --- a/pubsub.go +++ b/pubsub.go @@ -27,6 +27,7 @@ const ( var log = logging.Logger("pubsub") +// PubSub is the implementation of the pubsub system. type PubSub struct { // atomic counter for seqnos // NOTE: Must be declared at the top of the struct as we perform atomic @@ -106,7 +107,7 @@ type PubSub struct { ctx context.Context } -// PubSubRouter is the message router component of PubSub +// PubSubRouter is the message router component of PubSub. type PubSubRouter interface { // Protocols returns the list of protocols supported by the router. Protocols() []protocol.ID @@ -147,7 +148,7 @@ type RPC struct { type Option func(*PubSub) error -// NewPubSub returns a new PubSub management object +// NewPubSub returns a new PubSub management object. func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) { ps := &PubSub{ host: h, @@ -758,14 +759,14 @@ type topicReq struct { resp chan []string } -// GetTopics returns the topics this node is subscribed to +// GetTopics returns the topics this node is subscribed to. func (p *PubSub) GetTopics() []string { out := make(chan []string, 1) p.getTopics <- &topicReq{resp: out} return <-out } -// Publish publishes data under the given topic +// Publish publishes data to the given topic. func (p *PubSub) Publish(topic string, data []byte) error { seqno := p.nextSeqno() m := &pb.Message{ @@ -804,7 +805,7 @@ type sendReq struct { msg *Message } -// ListPeers returns a list of peers we are connected to. +// ListPeers returns a list of peers we are connected to in the given topic. func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) p.getPeers <- &listPeerReq{ @@ -835,13 +836,13 @@ type topicVal struct { validateThrottle chan struct{} } -// Validator is a function that validates a message +// Validator is a function that validates a message. type Validator func(context.Context, *Message) bool -// ValidatorOpt is an option for RegisterTopicValidator +// ValidatorOpt is an option for RegisterTopicValidator. type ValidatorOpt func(addVal *addValReq) error -// WithValidatorTimeout is an option that sets the topic validator timeout +// WithValidatorTimeout is an option that sets the topic validator timeout. func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { return func(addVal *addValReq) error { addVal.timeout = timeout @@ -849,7 +850,7 @@ func WithValidatorTimeout(timeout time.Duration) ValidatorOpt { } } -// WithValidatorConcurrency is an option that sets topic validator throttle +// WithValidatorConcurrency is an option that sets topic validator throttle. func WithValidatorConcurrency(n int) ValidatorOpt { return func(addVal *addValReq) error { addVal.throttle = n @@ -857,7 +858,7 @@ func WithValidatorConcurrency(n int) ValidatorOpt { } } -// RegisterTopicValidator registers a validator for topic +// RegisterTopicValidator registers a validator for topic. func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error { addVal := &addValReq{ topic: topic, @@ -904,8 +905,8 @@ func (ps *PubSub) addValidator(req *addValReq) { req.resp <- nil } -// UnregisterTopicValidator removes a validator from a topic -// returns an error if there was no validator registered with the topic +// UnregisterTopicValidator removes a validator from a topic. +// Returns an error if there was no validator registered with the topic. func (p *PubSub) UnregisterTopicValidator(topic string) error { rmVal := &rmValReq{ topic: topic, diff --git a/randomsub.go b/randomsub.go index b4ccbaf..adb85ad 100644 --- a/randomsub.go +++ b/randomsub.go @@ -18,7 +18,7 @@ var ( RandomSubD = 6 ) -// NewRandomSub returns a new PubSub object using RandomSubRouter as the router +// NewRandomSub returns a new PubSub object using RandomSubRouter as the router. func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &RandomSubRouter{ peers: make(map[peer.ID]protocol.ID),