Drop the interface

This commit is contained in:
Łukasz Magiera 2019-06-19 15:36:31 +02:00
parent 45c47c8d5e
commit ee0cf463ce
3 changed files with 40 additions and 46 deletions

View File

@ -11,6 +11,7 @@ import (
/////////////////////// ///////////////////////
// BUS // BUS
// basicBus is a type-based event delivery system
type basicBus struct { type basicBus struct {
lk sync.Mutex lk sync.Mutex
nodes map[reflect.Type]*node nodes map[reflect.Type]*node
@ -64,6 +65,15 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
b.lk.Unlock() b.lk.Unlock()
} }
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) { func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
var settings subSettings var settings subSettings
for _, opt := range opts { for _, opt := range opts {
@ -118,6 +128,16 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...SubOption) (c Cancel
return return
} }
// Emitter creates new emitter
//
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) { func (b *basicBus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, err error) {
var settings emitterSettings var settings emitterSettings
for _, opt := range opts { for _, opt := range opts {
@ -194,6 +214,18 @@ func (n *node) emit(event interface{}) {
} }
/////////////////////// ///////////////////////
// UTILS // TYPES
var _ Bus = &basicBus{} var closeEmit struct{}
// EmitFunc emits events. If any channel subscribed to the topic is blocked,
// calls to EmitFunc will block
//
// Calling this function with wrong event type will cause a panic
type EmitFunc func(event interface{})
func (f EmitFunc) Close() {
f(closeEmit)
}
type CancelFunc func()

View File

@ -337,8 +337,9 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
for i := 0; i < emits; i++ { for i := 0; i < emits; i++ {
go func() { go func() {
emit, err := bus.Emitter(new(EventB), func(settings *emitterSettings) { emit, err := bus.Emitter(new(EventB), func(settings interface{}) error {
settings.makeStateful = stateful settings.(*emitterSettings).makeStateful = stateful
return nil
}) })
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -5,8 +5,6 @@ import (
"reflect" "reflect"
) )
var closeEmit struct{}
type subSettings struct { type subSettings struct {
forcedType reflect.Type forcedType reflect.Type
} }
@ -53,44 +51,7 @@ type EmitterOption func(interface{}) error
// //
// This allows to provide state tracking for dynamic systems, and/or // This allows to provide state tracking for dynamic systems, and/or
// allows new subscribers to verify that there are Emitters on the channel // allows new subscribers to verify that there are Emitters on the channel
func Stateful(s *emitterSettings) { func Stateful(s interface{}) error {
s.makeStateful = true s.(*emitterSettings).makeStateful = true
return nil
} }
// Bus is an interface to type-based event delivery system
type Bus interface {
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
// Emitter creates new emitter
//
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, error)
}
// EmitFunc emits events. If any channel subscribed to the topic is blocked,
// calls to EmitFunc will block
//
// Calling this function with wrong event type will cause a panic
type EmitFunc func(event interface{})
func (f EmitFunc) Close() {
f(closeEmit)
}
type CancelFunc func()