Support wildcard subscriptions + getter for known types (optimized) (#40)

Co-authored-by: Raúl Kripalani <raul@protocol.ai>
This commit is contained in:
Raúl Kripalani 2020-05-20 18:51:44 +01:00 committed by GitHub
parent b649492774
commit 72974d36de
2 changed files with 231 additions and 10 deletions

View File

@ -15,14 +15,16 @@ import (
// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.Mutex
nodes map[reflect.Type]*node
lk sync.RWMutex
nodes map[reflect.Type]*node
wildcard *wildcardNode
}
var _ event.Bus = (*basicBus)(nil)
type emitter struct {
n *node
w *wildcardNode
typ reflect.Type
closed int32
dropper func(reflect.Type)
@ -33,6 +35,8 @@ func (e *emitter) Emit(evt interface{}) error {
return fmt.Errorf("emitter is closed")
}
e.n.emit(evt)
e.w.emit(evt)
return nil
}
@ -48,7 +52,8 @@ func (e *emitter) Close() error {
func NewBus() event.Bus {
return &basicBus{
nodes: map[reflect.Type]*node{},
nodes: map[reflect.Type]*node{},
wildcard: new(wildcardNode),
}
}
@ -96,6 +101,20 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
b.lk.Unlock()
}
type wildcardSub struct {
ch chan interface{}
w *wildcardNode
}
func (w *wildcardSub) Out() <-chan interface{} {
return w.ch
}
func (w *wildcardSub) Close() error {
w.w.removeSink(w.ch)
return nil
}
type sub struct {
ch chan interface{}
nodes []*node
@ -143,18 +162,35 @@ var _ event.Subscription = (*sub)(nil)
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
settings := subSettings(subSettingsDefault)
settings := subSettingsDefault
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
}
}
if evtTypes == event.WildcardSubscription {
out := &wildcardSub{
ch: make(chan interface{}, settings.buffer),
w: b.wildcard,
}
b.wildcard.addSink(out.ch)
return out, nil
}
types, ok := evtTypes.([]interface{})
if !ok {
types = []interface{}{evtTypes}
}
if len(types) > 1 {
for _, t := range types {
if t == event.WildcardSubscription {
return nil, fmt.Errorf("wildcard subscriptions must be started separately")
}
}
}
out := &sub{
ch: make(chan interface{}, settings.buffer),
nodes: make([]*node, len(types)),
@ -199,8 +235,11 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
//
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e event.Emitter, err error) {
var settings emitterSettings
if evtType == event.WildcardSubscription {
return nil, fmt.Errorf("illegal emitter for wildcard subscription")
}
var settings emitterSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
@ -216,14 +255,65 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode, w: b.wildcard}
}, nil)
return
}
// GetAllEventTypes returns all the event types that this bus has emitters
// or subscribers for.
func (b *basicBus) GetAllEventTypes() []reflect.Type {
b.lk.RLock()
defer b.lk.RUnlock()
types := make([]reflect.Type, 0, len(b.nodes))
for t, _ := range b.nodes {
types = append(types, t)
}
return types
}
///////////////////////
// NODE
type wildcardNode struct {
sync.RWMutex
nSinks int32
sinks []chan interface{}
}
func (n *wildcardNode) addSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, 1) // ok to do outside the lock
n.Lock()
n.sinks = append(n.sinks, ch)
n.Unlock()
}
func (n *wildcardNode) removeSink(ch chan interface{}) {
atomic.AddInt32(&n.nSinks, -1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]
break
}
}
n.Unlock()
}
func (n *wildcardNode) emit(evt interface{}) {
if atomic.LoadInt32(&n.nSinks) == 0 {
return
}
n.RLock()
for _, ch := range n.sinks {
ch <- evt
}
n.RUnlock()
}
type node struct {
// Note: make sure to NEVER lock basicBus.lk when this lock is held
lk sync.Mutex
@ -245,19 +335,19 @@ func newNode(typ reflect.Type) *node {
}
}
func (n *node) emit(event interface{}) {
typ := reflect.TypeOf(event)
func (n *node) emit(evt interface{}) {
typ := reflect.TypeOf(evt)
if typ != n.typ {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
}
n.lk.Lock()
if n.keepLast {
n.last = event
n.last = evt
}
for _, ch := range n.sinks {
ch <- event
ch <- evt
}
n.lk.Unlock()
}

View File

@ -1,13 +1,18 @@
package eventbus
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-testing/race"
"github.com/stretchr/testify/require"
)
type EventA struct{}
@ -89,6 +94,30 @@ func TestSub(t *testing.T) {
}
}
func TestGetAllEventTypes(t *testing.T) {
bus := NewBus()
require.Empty(t, bus.GetAllEventTypes())
// the wildcard subscription should be returned.
_, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)
_, err = bus.Subscribe(new(EventB))
require.NoError(t, err)
evts := bus.GetAllEventTypes()
require.Len(t, evts, 1)
require.Equal(t, reflect.TypeOf((*EventB)(nil)).Elem(), evts[0])
_, err = bus.Emitter(new(EventA))
require.NoError(t, err)
evts = bus.GetAllEventTypes()
require.Len(t, evts, 2)
require.Contains(t, evts, reflect.TypeOf((*EventB)(nil)).Elem())
require.Contains(t, evts, reflect.TypeOf((*EventA)(nil)).Elem())
}
func TestEmitNoSubNoBlock(t *testing.T) {
bus := NewBus()
@ -206,6 +235,108 @@ func TestSubMany(t *testing.T) {
}
}
func TestWildcardSubscription(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)
defer sub.Close()
em1, err := bus.Emitter(new(EventA))
require.NoError(t, err)
defer em1.Close()
em2, err := bus.Emitter(new(EventB))
require.NoError(t, err)
defer em2.Close()
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var evts []interface{}
LOOP:
for {
select {
case evt := <-sub.Out():
if evta, ok := evt.(EventA); ok {
evts = append(evts, evta)
}
if evtb, ok := evt.(EventB); ok {
evts = append(evts, evtb)
}
if len(evts) == 2 {
break LOOP
}
case <-ctx.Done():
t.Fatalf("did not receive events")
}
}
}
func TestManyWildcardSubscriptions(t *testing.T) {
bus := NewBus()
var subs []event.Subscription
for i := 0; i < 10; i++ {
sub, err := bus.Subscribe(event.WildcardSubscription)
require.NoError(t, err)
subs = append(subs, sub)
}
em1, err := bus.Emitter(new(EventA))
require.NoError(t, err)
defer em1.Close()
em2, err := bus.Emitter(new(EventB))
require.NoError(t, err)
defer em2.Close()
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))
// all 10 subscriptions received all 2 events.
for _, s := range subs {
require.Len(t, s.Out(), 2)
}
// close the first five subscriptions.
for _, s := range subs[:5] {
require.NoError(t, s.Close())
}
// emit another 2 events.
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))
// the first five still have 2 events, while the other five have 4 events.
for _, s := range subs[:5] {
require.Len(t, s.Out(), 2)
}
for _, s := range subs[5:] {
require.Len(t, s.Out(), 4)
}
// close them all, the first five will be closed twice (asserts idempotency).
for _, s := range subs {
require.NoError(t, s.Close())
}
}
func TestWildcardValidations(t *testing.T) {
bus := NewBus()
_, err := bus.Subscribe([]interface{}{event.WildcardSubscription, new(EventA), new(EventB)})
require.Error(t, err)
_, err = bus.Emitter(event.WildcardSubscription)
require.Error(t, err)
}
func TestSubType(t *testing.T) {
bus := NewBus()
sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)})