diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index ae5af168..9f075aaf 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -11,6 +11,7 @@ import ( /////////////////////// // BUS +// basicBus is a type-based event delivery system type basicBus struct { lk sync.Mutex nodes map[reflect.Type]*node @@ -64,6 +65,15 @@ func (b *basicBus) tryDropNode(typ reflect.Type) { b.lk.Unlock() } +// Subscribe creates new subscription. Failing to drain the channel will cause +// publishers to get blocked. CancelFunc is guaranteed to return after last send +// to the channel +// +// Example: +// ch := make(chan EventT, 10) +// defer close(ch) +// cancel, err := eventbus.Subscribe(ch) +// defer cancel() func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) { var settings subSettings for _, opt := range opts { @@ -118,6 +128,16 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c Cancel return } +// Emitter creates new emitter +// +// eventType accepts typed nil pointers, and uses the type information to +// select output type +// +// Example: +// emit, err := eventbus.Emitter(new(EventT)) +// defer emit.Close() // MUST call this after being done with the emitter +// +// emit(EventT{}) func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) { var settings emitterSettings for _, opt := range opts { @@ -194,6 +214,18 @@ func (n *node) emit(event interface{}) { } /////////////////////// -// UTILS +// TYPES -var _ Bus = &basicBus{} +var closeEmit struct{} + +// EmitFunc emits events. If any channel subscribed to the topic is blocked, +// calls to EmitFunc will block +// +// Calling this function with wrong event type will cause a panic +type EmitFunc func(event interface{}) + +func (f EmitFunc) Close() { + f(closeEmit) +} + +type CancelFunc func() diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index af05a5cb..e088b6d2 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -337,8 +337,9 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { for i := 0; i < emits; i++ { go func() { - emit, err := bus.Emitter(new(EventB), func(settings *emitterSettings) { - settings.makeStateful = stateful + emit, err := bus.Emitter(new(EventB), func(settings interface{}) error { + settings.(*emitterSettings).makeStateful = stateful + return nil }) if err != nil { panic(err) diff --git a/p2p/host/eventbus/interface.go b/p2p/host/eventbus/opts.go similarity index 54% rename from p2p/host/eventbus/interface.go rename to p2p/host/eventbus/opts.go index bff5d46f..cde6b747 100644 --- a/p2p/host/eventbus/interface.go +++ b/p2p/host/eventbus/opts.go @@ -5,8 +5,6 @@ import ( "reflect" ) -var closeEmit struct{} - type subSettings struct { forcedType reflect.Type } @@ -53,44 +51,7 @@ type EmitterOption func(interface{}) error // // This allows to provide state tracking for dynamic systems, and/or // allows new subscribers to verify that there are Emitters on the channel -func Stateful(s *emitterSettings) { - s.makeStateful = true +func Stateful(s interface{}) error { + s.(*emitterSettings).makeStateful = true + return nil } - -// Bus is an interface to type-based event delivery system -type Bus interface { - // Subscribe creates new subscription. Failing to drain the channel will cause - // publishers to get blocked. CancelFunc is guaranteed to return after last send - // to the channel - // - // Example: - // ch := make(chan EventT, 10) - // defer close(ch) - // cancel, err := eventbus.Subscribe(ch) - // defer cancel() - Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error) - - // Emitter creates new emitter - // - // eventType accepts typed nil pointers, and uses the type information to - // select output type - // - // Example: - // emit, err := eventbus.Emitter(new(EventT)) - // defer emit.Close() // MUST call this after being done with the emitter - // - // emit(EventT{}) - Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, error) -} - -// EmitFunc emits events. If any channel subscribed to the topic is blocked, -// calls to EmitFunc will block -// -// Calling this function with wrong event type will cause a panic -type EmitFunc func(event interface{}) - -func (f EmitFunc) Close() { - f(closeEmit) -} - -type CancelFunc func()