go-libp2p-pubsub/topic.go

package pubsub

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	pb "github.com/libp2p/go-libp2p-pubsub/pb"

	"github.com/libp2p/go-libp2p-core/peer"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// Topic is the handle for a pubsub topic
type Topic struct {
	p     *PubSub
	topic string

	evtHandlerMux sync.RWMutex
	evtHandlers   map[*TopicEventHandler]struct{}

	mux    sync.RWMutex
	closed bool
}

// String returns the topic associated with t
func (t *Topic) String() string {
	return t.topic
}

// SetScoreParams sets the topic score parameters if the pubsub router supports peer
// scoring
func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
	err := p.validate()
	if err != nil {
		return fmt.Errorf("invalid topic score parameters: %w", err)
	}

	t.mux.Lock()
	defer t.mux.Unlock()

	if t.closed {
		return ErrTopicClosed
	}

	result := make(chan error, 1)
	update := func() {
		gs, ok := t.p.rt.(*GossipSubRouter)
		if !ok {
			result <- fmt.Errorf("pubsub router is not gossipsub")
			return
		}

		if gs.score == nil {
			result <- fmt.Errorf("peer scoring is not enabled in router")
			return
		}

		err := gs.score.SetTopicScoreParams(t.topic, p)
		result <- err
	}

	select {
	case t.p.eval <- update:
		err = <-result
		return err

	case <-t.p.ctx.Done():
		return t.p.ctx.Err()
	}
}
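
// Illustrative usage sketch (not part of the library API): tuning score
// parameters at runtime on a gossipsub-backed topic. `params` is a
// hypothetical, already validated *TopicScoreParams supplied by the caller.
//
//	if err := topic.SetScoreParams(params); err != nil {
//		log.Printf("could not update topic score params: %v", err)
//	}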

// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	h := &TopicEventHandler{
		topic:    t,
		err:      nil,
		evtLog:   make(map[peer.ID]EventType),
		evtLogCh: make(chan struct{}, 1),
	}

	for _, opt := range opts {
		err := opt(h)
		if err != nil {
			return nil, err
		}
	}

	done := make(chan struct{}, 1)

	select {
	case t.p.eval <- func() {
		tmap := t.p.topics[t.topic]
		for p := range tmap {
			h.evtLog[p] = PeerJoin
		}

		t.evtHandlerMux.Lock()
		t.evtHandlers[h] = struct{}{}
		t.evtHandlerMux.Unlock()
		done <- struct{}{}
	}:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	<-done

	return h, nil
}
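
// Illustrative usage sketch: draining peer events from a handler. `topic` and
// `ctx` are assumed to be owned by the caller.
//
//	handler, err := topic.EventHandler()
//	if err != nil {
//		return err
//	}
//	defer handler.Cancel()
//	for {
//		evt, err := handler.NextPeerEvent(ctx)
//		if err != nil {
//			return err // e.g. the context was cancelled
//		}
//		switch evt.Type {
//		case PeerJoin:
//			fmt.Println("joined:", evt.Peer)
//		case PeerLeave:
//			fmt.Println("left:", evt.Peer)
//		}
//	}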

// sendNotification fans a peer event out to every registered event handler.
func (t *Topic) sendNotification(evt PeerEvent) {
	t.evtHandlerMux.RLock()
	defer t.evtHandlerMux.RUnlock()

	for h := range t.evtHandlers {
		h.sendNotification(evt)
	}
}

// Subscribe returns a new Subscription for the topic.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	sub := &Subscription{
		topic: t.topic,
		ctx:   t.p.ctx,
	}

	for _, opt := range opts {
		err := opt(sub)
		if err != nil {
			return nil, err
		}
	}

	if sub.ch == nil {
		// apply the default size
		sub.ch = make(chan *Message, 32)
	}

	out := make(chan *Subscription, 1)

	t.p.disc.Discover(sub.topic)

	select {
	case t.p.addSub <- &addSubReq{
		sub:  sub,
		resp: out,
	}:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	return <-out, nil
}
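
// Illustrative usage sketch: joining a topic and reading messages. `ps` is
// assumed to be a *PubSub (e.g. from NewGossipSub) and `ctx` caller-owned.
//
//	topic, err := ps.Join("my-topic")
//	if err != nil {
//		return err
//	}
//	sub, err := topic.Subscribe()
//	if err != nil {
//		return err
//	}
//	defer sub.Cancel()
//	for {
//		msg, err := sub.Next(ctx)
//		if err != nil {
//			return err
//		}
//		fmt.Printf("%d bytes from %s\n", len(msg.Data), msg.ReceivedFrom)
//	}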

// Relay enables message relaying for the topic and returns a reference
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() (RelayCancelFunc, error) {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return nil, ErrTopicClosed
	}

	out := make(chan RelayCancelFunc, 1)

	t.p.disc.Discover(t.topic)

	select {
	case t.p.addRelay <- &addRelayReq{
		topic: t.topic,
		resp:  out,
	}:
	case <-t.p.ctx.Done():
		return nil, t.p.ctx.Err()
	}

	return <-out, nil
}
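
// Illustrative usage sketch: relaying a topic's messages without subscribing.
//
//	cancelRelay, err := topic.Relay()
//	if err != nil {
//		return err
//	}
//	// ... keep relaying for as long as needed ...
//	cancelRelay() // drop this reference; relaying stops once all are cancelled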

// RouterReady is a function that decides if a router is ready to publish
type RouterReady func(rt PubSubRouter, topic string) (bool, error)

// PublishOptions holds the options applied to a single Publish call.
type PublishOptions struct {
	ready RouterReady
}

// PubOpt mutates the PublishOptions for a single Publish call.
type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return ErrTopicClosed
	}

	m := &pb.Message{
		Data:  data,
		Topic: &t.topic,
		From:  nil,
		Seqno: nil,
	}
	if t.p.signID != "" {
		m.From = []byte(t.p.signID)
		m.Seqno = t.p.nextSeqno()
	}
	if t.p.signKey != nil {
		m.From = []byte(t.p.signID)
		err := signMessage(t.p.signID, t.p.signKey, m)
		if err != nil {
			return err
		}
	}

	pub := &PublishOptions{}
	for _, opt := range opts {
		err := opt(pub)
		if err != nil {
			return err
		}
	}

	if pub.ready != nil {
		if t.p.disc.discovery != nil {
			t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
		} else {
			// TODO: we could likely do better than polling every 200ms.
			// For example, block this goroutine on a channel,
			// and check again whenever events tell us that the number of
			// peers has increased.
			var ticker *time.Ticker
		readyLoop:
			for {
				// Check if ready for publishing.
				// Similar to what disc.Bootstrap does.
				res := make(chan bool, 1)
				select {
				case t.p.eval <- func() {
					done, _ := pub.ready(t.p.rt, t.topic)
					res <- done
				}:
					if <-res {
						break readyLoop
					}
				case <-t.p.ctx.Done():
					return t.p.ctx.Err()
				case <-ctx.Done():
					return ctx.Err()
				}
				if ticker == nil {
					ticker = time.NewTicker(200 * time.Millisecond)
					defer ticker.Stop()
				}

				select {
				case <-ticker.C:
				case <-ctx.Done():
					return fmt.Errorf("router is not ready: %w", ctx.Err())
				}
			}
		}
	}

	return t.p.val.PushLocal(&Message{m, t.p.host.ID(), nil})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
// When PubSub is configured with WithDiscovery, readiness is driven through discovery
// bootstrapping; otherwise Publish polls the readiness condition until it is met.
func WithReadiness(ready RouterReady) PubOpt {
	return func(pub *PublishOptions) error {
		pub.ready = ready
		return nil
	}
}
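
// Illustrative usage sketch: waiting for a minimum topic size before
// publishing. MinTopicSize is a RouterReady constructor defined elsewhere in
// this package; without a discovery mechanism, Publish falls back to the
// polling loop above until enough peers have joined the topic.
//
//	err := topic.Publish(ctx, []byte("hello"), WithReadiness(MinTopicSize(3)))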

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
	t.mux.Lock()
	defer t.mux.Unlock()
	if t.closed {
		return nil
	}

	req := &rmTopicReq{t, make(chan error, 1)}

	select {
	case t.p.rmTopic <- req:
	case <-t.p.ctx.Done():
		return t.p.ctx.Err()
	}

	err := <-req.resp

	if err == nil {
		t.closed = true
	}

	return err
}
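
// Illustrative usage sketch: tearing down a topic. Active subscriptions and
// event handlers must be cancelled first, or Close returns an error.
//
//	sub.Cancel()
//	handler.Cancel()
//	if err := topic.Close(); err != nil {
//		return err
//	}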

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
	t.mux.RLock()
	defer t.mux.RUnlock()
	if t.closed {
		return []peer.ID{}
	}

	return t.p.ListPeers(t.topic)
}

// EventType is the kind of peer event delivered by a TopicEventHandler.
type EventType int

const (
	// PeerJoin is emitted when a peer joins the topic.
	PeerJoin EventType = iota
	// PeerLeave is emitted when a peer leaves the topic.
	PeerLeave
)

// TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
type TopicEventHandler struct {
	topic *Topic
	err   error

	evtLogMx sync.Mutex
	evtLog   map[peer.ID]EventType
	evtLogCh chan struct{}
}

// TopicEventHandlerOpt is an option applied when creating a TopicEventHandler.
type TopicEventHandlerOpt func(t *TopicEventHandler) error

// PeerEvent records a single peer joining or leaving a topic.
type PeerEvent struct {
	Type EventType
	Peer peer.ID
}

// Cancel closes the topic event handler
func (t *TopicEventHandler) Cancel() {
	topic := t.topic
	t.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()")

	topic.evtHandlerMux.Lock()
	delete(topic.evtHandlers, t)
	t.topic.evtHandlerMux.Unlock()
}

func (t *TopicEventHandler) sendNotification(evt PeerEvent) {
	t.evtLogMx.Lock()
	t.addToEventLog(evt)
	t.evtLogMx.Unlock()
}

// addToEventLog assumes a lock has been taken to protect the event log
func (t *TopicEventHandler) addToEventLog(evt PeerEvent) {
	e, ok := t.evtLog[evt.Peer]
	if !ok {
		t.evtLog[evt.Peer] = evt.Type
		// send signal that an event has been added to the event log
		select {
		case t.evtLogCh <- struct{}{}:
		default:
		}
	} else if e != evt.Type {
		// Opposite events cancel each other out: a pending Join followed by a
		// Leave (or vice versa) means neither needs to be delivered.
		delete(t.evtLog, evt.Peer)
	}
}

// pullFromEventLog assumes a lock has been taken to protect the event log
func (t *TopicEventHandler) pullFromEventLog() (PeerEvent, bool) {
	for k, v := range t.evtLog {
		evt := PeerEvent{Peer: k, Type: v}
		delete(t.evtLog, k)
		return evt, true
	}

	return PeerEvent{}, false
}

// NextPeerEvent returns the next event regarding subscribed peers.
// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event,
// all events will eventually be received via NextPeerEvent.
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
	for {
		t.evtLogMx.Lock()
		evt, ok := t.pullFromEventLog()
		if ok {
			// make sure an event log signal is available if there are events in the event log
			if len(t.evtLog) > 0 {
				select {
				case t.evtLogCh <- struct{}{}:
				default:
				}
			}
			t.evtLogMx.Unlock()
			return evt, nil
		}
		t.evtLogMx.Unlock()

		select {
		case <-t.evtLogCh:
			continue
		case <-ctx.Done():
			return PeerEvent{}, ctx.Err()
		}
	}
}