mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 23:33:08 +00:00
fixed closed Topic handles still being able to perform some actions on the topic
This commit is contained in:
parent
899f9cd62b
commit
ad97d9bf17
@ -108,6 +108,21 @@ func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...
|
||||
return d.server.FindPeers(ns, options.Limit)
|
||||
}
|
||||
|
||||
type dummyDiscovery struct{}
|
||||
|
||||
func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
|
||||
return time.Hour, nil
|
||||
}
|
||||
|
||||
func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
|
||||
retCh := make(chan peer.AddrInfo)
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
close(retCh)
|
||||
}()
|
||||
return retCh, nil
|
||||
}
|
||||
|
||||
func TestSimpleDiscovery(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
45
topic.go
45
topic.go
@ -2,6 +2,7 @@ package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@ -10,6 +11,9 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
// ErrTopicClosed is returned if a Topic is utilized after it has been closed
|
||||
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")
|
||||
|
||||
// Topic is the handle for a pubsub topic
|
||||
type Topic struct {
|
||||
p *PubSub
|
||||
@ -17,11 +21,20 @@ type Topic struct {
|
||||
|
||||
evtHandlerMux sync.RWMutex
|
||||
evtHandlers map[*TopicEventHandler]struct{}
|
||||
|
||||
mux sync.RWMutex
|
||||
closed bool
|
||||
}
|
||||
|
||||
// EventHandler creates a handle for topic specific events
|
||||
// Multiple event handlers may be created and will operate independently of each other
|
||||
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return nil, ErrTopicClosed
|
||||
}
|
||||
|
||||
h := &TopicEventHandler{
|
||||
topic: t,
|
||||
err: nil,
|
||||
@ -68,6 +81,12 @@ func (t *Topic) sendNotification(evt PeerEvent) {
|
||||
// Note that subscription is not an instanteneous operation. It may take some time
|
||||
// before the subscription is processed by the pubsub main loop and propagated to our peers.
|
||||
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return nil, ErrTopicClosed
|
||||
}
|
||||
|
||||
sub := &Subscription{
|
||||
topic: t.topic,
|
||||
ch: make(chan *Message, 32),
|
||||
@ -104,6 +123,12 @@ type PubOpt func(pub *PublishOptions) error
|
||||
|
||||
// Publish publishes data to topic.
|
||||
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return ErrTopicClosed
|
||||
}
|
||||
|
||||
seqno := t.p.nextSeqno()
|
||||
id := t.p.host.ID()
|
||||
m := &pb.Message{
|
||||
@ -149,13 +174,31 @@ func WithReadiness(ready RouterReady) PubOpt {
|
||||
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
|
||||
// Does not error if the topic is already closed.
|
||||
func (t *Topic) Close() error {
|
||||
t.mux.Lock()
|
||||
defer t.mux.Unlock()
|
||||
if t.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
req := &rmTopicReq{t, make(chan error, 1)}
|
||||
t.p.rmTopic <- req
|
||||
return <-req.resp
|
||||
err := <-req.resp
|
||||
|
||||
if err == nil {
|
||||
t.closed = true
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ListPeers returns a list of peers we are connected to in the given topic.
|
||||
func (t *Topic) ListPeers() []peer.ID {
|
||||
t.mux.RLock()
|
||||
defer t.mux.RUnlock()
|
||||
if t.closed {
|
||||
return []peer.ID{}
|
||||
}
|
||||
|
||||
return t.p.ListPeers(t.topic)
|
||||
}
|
||||
|
||||
|
||||
109
topic_test.go
109
topic_test.go
@ -1,6 +1,7 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
@ -42,13 +43,13 @@ func TestTopicCloseWithOpenSubscription(t *testing.T) {
|
||||
var sub *Subscription
|
||||
var err error
|
||||
testTopicCloseWithOpenResource(t,
|
||||
func (topic *Topic) {
|
||||
sub , err = topic.Subscribe()
|
||||
func(topic *Topic) {
|
||||
sub, err = topic.Subscribe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
},
|
||||
func (){
|
||||
func() {
|
||||
sub.Cancel()
|
||||
},
|
||||
)
|
||||
@ -58,13 +59,13 @@ func TestTopicCloseWithOpenEventHandler(t *testing.T) {
|
||||
var evts *TopicEventHandler
|
||||
var err error
|
||||
testTopicCloseWithOpenResource(t,
|
||||
func (topic *Topic) {
|
||||
evts , err = topic.EventHandler()
|
||||
func(topic *Topic) {
|
||||
evts, err = topic.EventHandler()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
},
|
||||
func (){
|
||||
func() {
|
||||
evts.Cancel()
|
||||
},
|
||||
)
|
||||
@ -110,6 +111,100 @@ func testTopicCloseWithOpenResource(t *testing.T, openResource func(topic *Topic
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopicReuse(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
const numHosts = 2
|
||||
topicID := "foobar"
|
||||
hosts := getNetHosts(t, ctx, numHosts)
|
||||
|
||||
sender := getPubsub(ctx, hosts[0], WithDiscovery(&dummyDiscovery{}))
|
||||
receiver := getPubsub(ctx, hosts[1])
|
||||
|
||||
connectAll(t, hosts)
|
||||
|
||||
// Sender creates topic
|
||||
sendTopic, err := sender.Join(topicID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Receiver creates and subscribes to the topic
|
||||
receiveTopic, err := receiver.Join(topicID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sub, err := receiveTopic.Subscribe()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
firstMsg := []byte("1")
|
||||
if err := sendTopic.Publish(ctx, firstMsg, WithReadiness(MinTopicSize(1))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
msg, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(msg.GetData(), firstMsg) != 0 {
|
||||
t.Fatal("received incorrect message")
|
||||
}
|
||||
|
||||
if err := sendTopic.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Recreate the same topic
|
||||
newSendTopic, err := sender.Join(topicID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Try sending data with original topic
|
||||
illegalSend := []byte("illegal")
|
||||
if err := sendTopic.Publish(ctx, illegalSend); err != ErrTopicClosed {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
|
||||
defer timeoutCancel()
|
||||
msg, err = sub.Next(timeoutCtx)
|
||||
if err != context.DeadlineExceeded {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(msg.GetData(), illegalSend) != 0 {
|
||||
t.Fatal("received incorrect message from illegal topic")
|
||||
}
|
||||
t.Fatal("received message sent by illegal topic")
|
||||
}
|
||||
timeoutCancel()
|
||||
|
||||
// Try cancelling the new topic by using the original topic
|
||||
if err := sendTopic.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
secondMsg := []byte("2")
|
||||
if err := newSendTopic.Publish(ctx, secondMsg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
timeoutCtx, timeoutCancel = context.WithTimeout(ctx, time.Second*2)
|
||||
defer timeoutCancel()
|
||||
msg, err = sub.Next(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if bytes.Compare(msg.GetData(), secondMsg) != 0 {
|
||||
t.Fatal("received incorrect message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopicEventHandlerCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@ -130,7 +225,7 @@ func TestTopicEventHandlerCancel(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
evts.Cancel()
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second * 2)
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
|
||||
defer timeoutCancel()
|
||||
connectAll(t, hosts)
|
||||
_, err = evts.NextPeerEvent(timeoutCtx)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user