make Emitter an interface.

This commit is contained in:
Raúl Kripalani 2019-06-19 16:09:42 +01:00
parent ee0cf463ce
commit b9fe91677a
3 changed files with 69 additions and 67 deletions

View File

@ -6,18 +6,48 @@ import (
"reflect"
"sync"
"sync/atomic"
"github.com/libp2p/go-libp2p-core/event"
)
///////////////////////
// BUS
type CancelFunc = func()
// basicBus is a type-based event delivery system
type basicBus struct {
lk sync.Mutex
nodes map[reflect.Type]*node
}
func NewBus() *basicBus {
var _ event.Bus = (*basicBus)(nil)
type Emitter struct {
n *node
typ reflect.Type
closed int32
dropper func(reflect.Type)
}
func (e *Emitter) Emit(evt interface{}) {
if atomic.LoadInt32(&e.closed) != 0 {
panic("emitter is closed")
}
e.n.emit(evt)
}
func (e *Emitter) Close() error {
if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) {
panic("closed an emitter more than once")
}
if atomic.AddInt32(&e.n.nEmitters, -1) == 0 {
e.dropper(e.typ)
}
return nil
}
func NewBus() event.Bus {
return &basicBus{
nodes: map[reflect.Type]*node{},
}
@ -74,7 +104,7 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubscriptionOpt) (c CancelFunc, err error) {
var settings subSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
@ -138,10 +168,12 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c Cancel
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) {
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOpt) (e event.Emitter, err error) {
var settings emitterSettings
for _, opt := range opts {
opt(&settings)
if err := opt(&settings); err != nil {
return nil, err
}
}
typ := reflect.TypeOf(evtType)
@ -152,22 +184,8 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFu
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
closed := false
n.keepLast = n.keepLast || settings.makeStateful
e = func(event interface{}) {
if closed {
panic("emitter is closed")
}
if event == closeEmit {
closed = true
if atomic.AddInt32(&n.nEmitters, -1) == 0 {
b.tryDropNode(typ)
}
return
}
n.emit(event)
}
e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode}
}, func(_ *node) {})
return
}
@ -212,20 +230,3 @@ func (n *node) emit(event interface{}) {
}
n.lk.RUnlock()
}
///////////////////////
// TYPES
var closeEmit struct{}
// EmitFunc emits events. If any channel subscribed to the topic is blocked,
// calls to EmitFunc will block
//
// Calling this function with wrong event type will cause a panic
type EmitFunc func(event interface{})
func (f EmitFunc) Close() {
f(closeEmit)
}
type CancelFunc func()

View File

@ -38,13 +38,13 @@ func TestEmit(t *testing.T) {
<-events
}()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
}
func TestSub(t *testing.T) {
@ -66,13 +66,13 @@ func TestSub(t *testing.T) {
wait.Done()
}()
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventB(7))
em.Emit(EventB(7))
wait.Wait()
if event != 7 {
@ -83,23 +83,23 @@ func TestSub(t *testing.T) {
func TestEmitNoSubNoBlock(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
}
func TestEmitOnClosed(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
emit.Close()
em.Close()
defer func() {
r := recover()
@ -111,7 +111,7 @@ func TestEmitOnClosed(t *testing.T) {
}
}()
emit(EventA{})
em.Emit(EventA{})
}
func TestClosingRaces(t *testing.T) {
@ -187,15 +187,15 @@ func TestSubMany(t *testing.T) {
}()
}
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
ready.Wait()
emit(EventB(7))
em.Emit(EventB(7))
wait.Wait()
if int(r) != 7*n {
@ -222,13 +222,13 @@ func TestSubType(t *testing.T) {
wait.Done()
}()
emit, err := bus.Emitter(new(EventA))
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventA{})
em.Emit(EventA{})
wait.Wait()
if event.String() != "Oh, Hello" {
@ -238,11 +238,11 @@ func TestSubType(t *testing.T) {
func TestNonStateful(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventB))
em, err := bus.Emitter(new(EventB))
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
@ -257,7 +257,7 @@ func TestNonStateful(t *testing.T) {
default:
}
emit(EventB(1))
em.Emit(EventB(1))
select {
case e := <-eventsA:
@ -284,13 +284,13 @@ func TestNonStateful(t *testing.T) {
func TestStateful(t *testing.T) {
bus := NewBus()
emit, err := bus.Emitter(new(EventB), Stateful)
em, err := bus.Emitter(new(EventB), Stateful)
if err != nil {
t.Fatal(err)
}
defer emit.Close()
defer em.Close()
emit(EventB(2))
em.Emit(EventB(2))
eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
@ -337,19 +337,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
for i := 0; i < emits; i++ {
go func() {
emit, err := bus.Emitter(new(EventB), func(settings interface{}) error {
em, err := bus.Emitter(new(EventB), func(settings interface{}) error {
settings.(*emitterSettings).makeStateful = stateful
return nil
})
if err != nil {
panic(err)
}
defer emit.Close()
defer em.Close()
ready.Wait()
for i := 0; i < msgs; i++ {
emit(EventB(97))
em.Emit(EventB(97))
}
wait.Done()

View File

@ -5,12 +5,14 @@ import (
"reflect"
)
type SubscriptionOpt = func(interface{}) error
type EmitterOpt = func(interface{}) error
type subSettings struct {
forcedType reflect.Type
}
type SubOption func(interface{}) error
// ForceSubType is a Subscribe option which overrides the type to which
// the subscription will be done. Note that the evtType must be assignable
// to channel type.
@ -27,7 +29,7 @@ type SubOption func(interface{}) error
// eventCh := make(chan fmt.Stringer) // interface { String() string }
// cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event)))
// [...]
func ForceSubType(evtType interface{}) SubOption {
func ForceSubType(evtType interface{}) SubscriptionOpt {
return func(settings interface{}) error {
s := settings.(*subSettings)
typ := reflect.TypeOf(evtType)
@ -42,7 +44,6 @@ func ForceSubType(evtType interface{}) SubOption {
type emitterSettings struct {
makeStateful bool
}
type EmitterOption func(interface{}) error
// Stateful is an Emitter option which makes makes the eventbus channel
// 'remember' last event sent, and when a new subscriber joins the