diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index 9f075aaf..ae7509eb 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -6,18 +6,48 @@ import ( "reflect" "sync" "sync/atomic" + + "github.com/libp2p/go-libp2p-core/event" ) /////////////////////// // BUS +type CancelFunc = func() + // basicBus is a type-based event delivery system type basicBus struct { lk sync.Mutex nodes map[reflect.Type]*node } -func NewBus() *basicBus { +var _ event.Bus = (*basicBus)(nil) + +type Emitter struct { + n *node + typ reflect.Type + closed int32 + dropper func(reflect.Type) +} + +func (e *Emitter) Emit(evt interface{}) { + if atomic.LoadInt32(&e.closed) != 0 { + panic("emitter is closed") + } + e.n.emit(evt) +} + +func (e *Emitter) Close() error { + if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) { + panic("closed an emitter more than once") + } + if atomic.AddInt32(&e.n.nEmitters, -1) == 0 { + e.dropper(e.typ) + } + return nil +} + +func NewBus() event.Bus { return &basicBus{ nodes: map[reflect.Type]*node{}, } @@ -74,7 +104,7 @@ func (b *basicBus) tryDropNode(typ reflect.Type) { // defer close(ch) // cancel, err := eventbus.Subscribe(ch) // defer cancel() -func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) { +func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubscriptionOpt) (c CancelFunc, err error) { var settings subSettings for _, opt := range opts { if err := opt(&settings); err != nil { @@ -138,10 +168,12 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c Cancel // defer emit.Close() // MUST call this after being done with the emitter // // emit(EventT{}) -func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) { +func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOpt) (e event.Emitter, err error) { var settings emitterSettings for _, opt := range opts { - opt(&settings) + if err := opt(&settings); err != nil { + return nil, err + } } typ := reflect.TypeOf(evtType) @@ -152,22 +184,8 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFu err = b.withNode(typ, func(n *node) { atomic.AddInt32(&n.nEmitters, 1) - closed := false n.keepLast = n.keepLast || settings.makeStateful - - e = func(event interface{}) { - if closed { - panic("emitter is closed") - } - if event == closeEmit { - closed = true - if atomic.AddInt32(&n.nEmitters, -1) == 0 { - b.tryDropNode(typ) - } - return - } - n.emit(event) - } + e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode} }, func(_ *node) {}) return } @@ -212,20 +230,3 @@ func (n *node) emit(event interface{}) { } n.lk.RUnlock() } - -/////////////////////// -// TYPES - -var closeEmit struct{} - -// EmitFunc emits events. If any channel subscribed to the topic is blocked, -// calls to EmitFunc will block -// -// Calling this function with wrong event type will cause a panic -type EmitFunc func(event interface{}) - -func (f EmitFunc) Close() { - f(closeEmit) -} - -type CancelFunc func() diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index e088b6d2..d9d599c4 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -38,13 +38,13 @@ func TestEmit(t *testing.T) { <-events }() - emit, err := bus.Emitter(new(EventA)) + em, err := bus.Emitter(new(EventA)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() - emit(EventA{}) + em.Emit(EventA{}) } func TestSub(t *testing.T) { @@ -66,13 +66,13 @@ func TestSub(t *testing.T) { wait.Done() }() - emit, err := bus.Emitter(new(EventB)) + em, err := bus.Emitter(new(EventB)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() - emit(EventB(7)) + em.Emit(EventB(7)) wait.Wait() if event != 7 { @@ -83,23 +83,23 @@ func TestSub(t *testing.T) { func TestEmitNoSubNoBlock(t *testing.T) { bus := NewBus() - emit, err := bus.Emitter(new(EventA)) + em, err := bus.Emitter(new(EventA)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() - emit(EventA{}) + em.Emit(EventA{}) } func TestEmitOnClosed(t *testing.T) { bus := NewBus() - emit, err := bus.Emitter(new(EventA)) + em, err := bus.Emitter(new(EventA)) if err != nil { t.Fatal(err) } - emit.Close() + em.Close() defer func() { r := recover() @@ -111,7 +111,7 @@ func TestEmitOnClosed(t *testing.T) { } }() - emit(EventA{}) + em.Emit(EventA{}) } func TestClosingRaces(t *testing.T) { @@ -187,15 +187,15 @@ func TestSubMany(t *testing.T) { }() } - emit, err := bus.Emitter(new(EventB)) + em, err := bus.Emitter(new(EventB)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() ready.Wait() - emit(EventB(7)) + em.Emit(EventB(7)) wait.Wait() if int(r) != 7*n { @@ -222,13 +222,13 @@ func TestSubType(t *testing.T) { wait.Done() }() - emit, err := bus.Emitter(new(EventA)) + em, err := bus.Emitter(new(EventA)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() - emit(EventA{}) + em.Emit(EventA{}) wait.Wait() if event.String() != "Oh, Hello" { @@ -238,11 +238,11 @@ func TestSubType(t *testing.T) { func TestNonStateful(t *testing.T) { bus := NewBus() - emit, err := bus.Emitter(new(EventB)) + em, err := bus.Emitter(new(EventB)) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() eventsA := make(chan EventB, 1) cancelS, err := bus.Subscribe(eventsA) @@ -257,7 +257,7 @@ func TestNonStateful(t *testing.T) { default: } - emit(EventB(1)) + em.Emit(EventB(1)) select { case e := <-eventsA: @@ -284,13 +284,13 @@ func TestNonStateful(t *testing.T) { func TestStateful(t *testing.T) { bus := NewBus() - emit, err := bus.Emitter(new(EventB), Stateful) + em, err := bus.Emitter(new(EventB), Stateful) if err != nil { t.Fatal(err) } - defer emit.Close() + defer em.Close() - emit(EventB(2)) + em.Emit(EventB(2)) eventsA := make(chan EventB, 1) cancelS, err := bus.Subscribe(eventsA) @@ -337,19 +337,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { for i := 0; i < emits; i++ { go func() { - emit, err := bus.Emitter(new(EventB), func(settings interface{}) error { + em, err := bus.Emitter(new(EventB), func(settings interface{}) error { settings.(*emitterSettings).makeStateful = stateful return nil }) if err != nil { panic(err) } - defer emit.Close() + defer em.Close() ready.Wait() for i := 0; i < msgs; i++ { - emit(EventB(97)) + em.Emit(EventB(97)) } wait.Done() diff --git a/p2p/host/eventbus/opts.go b/p2p/host/eventbus/opts.go index cde6b747..56739125 100644 --- a/p2p/host/eventbus/opts.go +++ b/p2p/host/eventbus/opts.go @@ -5,12 +5,14 @@ import ( "reflect" ) +type SubscriptionOpt = func(interface{}) error + +type EmitterOpt = func(interface{}) error + type subSettings struct { forcedType reflect.Type } -type SubOption func(interface{}) error - // ForceSubType is a Subscribe option which overrides the type to which // the subscription will be done. Note that the evtType must be assignable // to channel type. @@ -27,7 +29,7 @@ type SubOption func(interface{}) error // eventCh := make(chan fmt.Stringer) // interface { String() string } // cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event))) // [...] -func ForceSubType(evtType interface{}) SubOption { +func ForceSubType(evtType interface{}) SubscriptionOpt { return func(settings interface{}) error { s := settings.(*subSettings) typ := reflect.TypeOf(evtType) @@ -42,7 +44,6 @@ func ForceSubType(evtType interface{}) SubOption { type emitterSettings struct { makeStateful bool } -type EmitterOption func(interface{}) error // Stateful is an Emitter option which makes makes the eventbus channel // 'remember' last event sent, and when a new subscriber joins the