commit
51b7501433
15
README.md
15
README.md
|
@ -6,12 +6,19 @@
|
|||
|
||||
> A pubsub system with flooding and gossiping variants.
|
||||
|
||||
PubSub is a work in progress, with floodsub as an initial protocol, followed by gossipsub ([spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub), [gossipsub.go](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go)).
|
||||
This is the canonical pubsub implementation for libp2p.
|
||||
|
||||
We currently provide three implementations:
|
||||
- floodsub, which is the baseline flooding protocol.
|
||||
- gossipsub, which is a more advanced router with mesh formation and gossip propagation.
|
||||
See [spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) and [implementation](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go) for more details.
|
||||
- randomsub, which is a simple probabilistic router that propagates to random subsets of peers.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [Install](#install)
|
||||
- [Usage](#usage)
|
||||
- [Documentation](#documentation)
|
||||
- [Contribute](#contribute)
|
||||
- [License](#license)
|
||||
|
||||
|
@ -29,6 +36,12 @@ To be used for messaging in p2p instrastructure (as part of libp2p) such as IPFS
|
|||
|
||||
See [libp2p/specs/pubsub#Implementations](https://github.com/libp2p/specs/tree/master/pubsub#Implementations).
|
||||
|
||||
## Documentation
|
||||
|
||||
See the [libp2p specs](https://github.com/libp2p/specs/tree/master/pubsub) for high level documentation
|
||||
and [godoc](https://godoc.org/github.com/libp2p/go-libp2p-pubsub) for API documentation.
|
||||
|
||||
|
||||
## Contribute
|
||||
|
||||
Contributions welcome. Please check out [the issues](https://github.com/libp2p/go-libp2p-pubsub/issues).
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package pubsub
|
||||
|
||||
// The pubsub package provides facilities for the Publish/Subscribe pattern of message
|
||||
// propagation, also known as overlay multicast.
|
||||
// The implementation provides topic-based pubsub, with pluggable routing algorithms.
|
||||
//
|
||||
// The main interface to the library is the `PubSub` object.
|
||||
// You can construct this object with the following constructors
|
||||
// - `NewFloodSub` creates an instance that uses the floodsub routing algorithm.
|
||||
// - `NewGossipSub` creates an instance that uses the gossipsub routing algorithm.
|
||||
// - `NewRandomSub` creates an instance that uses the randomsub routing algorithm.
|
||||
//
|
||||
// In addition, there is a generic constructor that creates a pubsub instance with
|
||||
// a custom PubSubRouter interface. This procedure is currently reserved for internal
|
||||
// use within the package.
|
||||
//
|
||||
// Once you have constructed a `PubSub` instance, you need to establish some connections
|
||||
// to your peers; the implementation relies on ambient peer discovery, leaving bootstrap
|
||||
// and active peer discovery up to the client.
|
||||
//
|
||||
// To publish a message to some topic, use `Publish`; you don't need to be subscribed
|
||||
// to the topic in order to publish.
|
||||
//
|
||||
// To subscribe to a topic, use `Subscribe`; this will give you a subscription interface
|
||||
// from which new messages can be pumped.
|
|
@ -14,7 +14,7 @@ const (
|
|||
FloodSubID = protocol.ID("/floodsub/1.0.0")
|
||||
)
|
||||
|
||||
// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps
|
||||
// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
|
||||
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
|
||||
rt := &FloodSubRouter{
|
||||
protocols: ps,
|
||||
|
@ -22,7 +22,7 @@ func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID
|
|||
return NewPubSub(ctx, h, rt, opts...)
|
||||
}
|
||||
|
||||
// NewFloodSub returns a new PubSub object using the FloodSubRouter
|
||||
// NewFloodSub returns a new PubSub object using the FloodSubRouter.
|
||||
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||
return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...)
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ var (
|
|||
GossipSubFanoutTTL = 60 * time.Second
|
||||
)
|
||||
|
||||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router
|
||||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
|
||||
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||
rt := &GossipSubRouter{
|
||||
peers: make(map[peer.ID]protocol.ID),
|
||||
|
|
25
pubsub.go
25
pubsub.go
|
@ -27,6 +27,7 @@ const (
|
|||
|
||||
var log = logging.Logger("pubsub")
|
||||
|
||||
// PubSub is the implementation of the pubsub system.
|
||||
type PubSub struct {
|
||||
// atomic counter for seqnos
|
||||
// NOTE: Must be declared at the top of the struct as we perform atomic
|
||||
|
@ -106,7 +107,7 @@ type PubSub struct {
|
|||
ctx context.Context
|
||||
}
|
||||
|
||||
// PubSubRouter is the message router component of PubSub
|
||||
// PubSubRouter is the message router component of PubSub.
|
||||
type PubSubRouter interface {
|
||||
// Protocols returns the list of protocols supported by the router.
|
||||
Protocols() []protocol.ID
|
||||
|
@ -147,7 +148,7 @@ type RPC struct {
|
|||
|
||||
type Option func(*PubSub) error
|
||||
|
||||
// NewPubSub returns a new PubSub management object
|
||||
// NewPubSub returns a new PubSub management object.
|
||||
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
|
||||
ps := &PubSub{
|
||||
host: h,
|
||||
|
@ -758,14 +759,14 @@ type topicReq struct {
|
|||
resp chan []string
|
||||
}
|
||||
|
||||
// GetTopics returns the topics this node is subscribed to
|
||||
// GetTopics returns the topics this node is subscribed to.
|
||||
func (p *PubSub) GetTopics() []string {
|
||||
out := make(chan []string, 1)
|
||||
p.getTopics <- &topicReq{resp: out}
|
||||
return <-out
|
||||
}
|
||||
|
||||
// Publish publishes data under the given topic
|
||||
// Publish publishes data to the given topic.
|
||||
func (p *PubSub) Publish(topic string, data []byte) error {
|
||||
seqno := p.nextSeqno()
|
||||
m := &pb.Message{
|
||||
|
@ -804,7 +805,7 @@ type sendReq struct {
|
|||
msg *Message
|
||||
}
|
||||
|
||||
// ListPeers returns a list of peers we are connected to.
|
||||
// ListPeers returns a list of peers we are connected to in the given topic.
|
||||
func (p *PubSub) ListPeers(topic string) []peer.ID {
|
||||
out := make(chan []peer.ID)
|
||||
p.getPeers <- &listPeerReq{
|
||||
|
@ -835,13 +836,13 @@ type topicVal struct {
|
|||
validateThrottle chan struct{}
|
||||
}
|
||||
|
||||
// Validator is a function that validates a message
|
||||
// Validator is a function that validates a message.
|
||||
type Validator func(context.Context, *Message) bool
|
||||
|
||||
// ValidatorOpt is an option for RegisterTopicValidator
|
||||
// ValidatorOpt is an option for RegisterTopicValidator.
|
||||
type ValidatorOpt func(addVal *addValReq) error
|
||||
|
||||
// WithValidatorTimeout is an option that sets the topic validator timeout
|
||||
// WithValidatorTimeout is an option that sets the topic validator timeout.
|
||||
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
|
||||
return func(addVal *addValReq) error {
|
||||
addVal.timeout = timeout
|
||||
|
@ -849,7 +850,7 @@ func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
|
|||
}
|
||||
}
|
||||
|
||||
// WithValidatorConcurrency is an option that sets topic validator throttle
|
||||
// WithValidatorConcurrency is an option that sets topic validator throttle.
|
||||
func WithValidatorConcurrency(n int) ValidatorOpt {
|
||||
return func(addVal *addValReq) error {
|
||||
addVal.throttle = n
|
||||
|
@ -857,7 +858,7 @@ func WithValidatorConcurrency(n int) ValidatorOpt {
|
|||
}
|
||||
}
|
||||
|
||||
// RegisterTopicValidator registers a validator for topic
|
||||
// RegisterTopicValidator registers a validator for topic.
|
||||
func (p *PubSub) RegisterTopicValidator(topic string, val Validator, opts ...ValidatorOpt) error {
|
||||
addVal := &addValReq{
|
||||
topic: topic,
|
||||
|
@ -904,8 +905,8 @@ func (ps *PubSub) addValidator(req *addValReq) {
|
|||
req.resp <- nil
|
||||
}
|
||||
|
||||
// UnregisterTopicValidator removes a validator from a topic
|
||||
// returns an error if there was no validator registered with the topic
|
||||
// UnregisterTopicValidator removes a validator from a topic.
|
||||
// Returns an error if there was no validator registered with the topic.
|
||||
func (p *PubSub) UnregisterTopicValidator(topic string) error {
|
||||
rmVal := &rmValReq{
|
||||
topic: topic,
|
||||
|
|
|
@ -18,7 +18,7 @@ var (
|
|||
RandomSubD = 6
|
||||
)
|
||||
|
||||
// NewRandomSub returns a new PubSub object using RandomSubRouter as the router
|
||||
// NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
|
||||
func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
|
||||
rt := &RandomSubRouter{
|
||||
peers: make(map[peer.ID]protocol.ID),
|
||||
|
|
Loading…
Reference in New Issue