Subscribe with user provided typed channels

This commit is contained in:
Łukasz Magiera 2019-06-16 17:20:33 +02:00
parent eba14f59e2
commit 056412681f
3 changed files with 48 additions and 70 deletions

View File

@ -22,12 +22,7 @@ func NewBus() Bus {
}
}
func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
typ := reflect.TypeOf(evtType)
if typ.Kind() != reflect.Ptr {
return errors.New("subscribe called with non-pointer type")
}
typ = typ.Elem()
func (b *bus) withNode(typ reflect.Type, cb func(*node)) error {
path := typePath(typ)
b.lk.Lock()
@ -45,8 +40,8 @@ func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
return nil
}
func (b *bus) tryDropNode(evtType interface{}) {
path := typePath(reflect.TypeOf(evtType).Elem())
func (b *bus) tryDropNode(typ reflect.Type) {
path := typePath(typ)
b.lk.Lock()
n, ok := b.nodes[path]
@ -67,20 +62,27 @@ func (b *bus) tryDropNode(evtType interface{}) {
b.lk.Unlock()
}
func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface{}, c CancelFunc, err error) {
err = b.withNode(evtType, func(n *node) {
func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, err error) {
refCh := reflect.ValueOf(typedChan)
typ := refCh.Type()
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
}
if typ.ChanDir() & reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
}
err = b.withNode(typ.Elem(), func(n *node) {
// when all subs are waiting on this channel, setting this to 1 doesn't
// really affect benchmarks
out, i := n.sub(0)
s = out
i := n.sub(refCh)
c = func() {
n.lk.Lock()
delete(n.sinks, i)
close(out)
tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
n.lk.Unlock()
if tryDrop {
b.tryDropNode(evtType)
b.tryDropNode(typ.Elem())
}
}
})
@ -88,7 +90,13 @@ func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface
}
func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c CancelFunc, err error) {
err = b.withNode(evtType, func(n *node) {
typ := reflect.TypeOf(evtType)
if typ.Kind() != reflect.Ptr {
return nil, nil, errors.New("emitter called with non-pointer type")
}
typ = typ.Elem()
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
closed := false
@ -102,37 +110,13 @@ func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c Ca
c = func() {
closed = true
if atomic.AddInt32(&n.nEmitters, -1) == 0 {
b.tryDropNode(evtType)
b.tryDropNode(typ)
}
}
})
return
}
func (b *bus) SendTo(typedChan interface{}) (CancelFunc, error) {
typ := reflect.TypeOf(typedChan)
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
}
if typ.ChanDir() & reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
}
etype := reflect.New(typ.Elem())
sub, cf, err := b.Subscribe(etype.Interface())
if err != nil {
return nil, err
}
go func() {
tcv := reflect.ValueOf(typedChan)
for event := range sub {
tcv.Send(reflect.ValueOf(event))
}
}()
return cf, nil
}
///////////////////////
// NODE
@ -150,34 +134,33 @@ type node struct {
// TODO: we could make emit a bit faster by making this into an array, but
// it doesn't seem needed for now
sinks map[int]chan interface{}
sinks map[int]reflect.Value
}
func newNode(typ reflect.Type) *node {
return &node{
typ: typ,
sinks: map[int]chan interface{}{},
sinks: map[int]reflect.Value{},
}
}
func (n *node) sub(buf int) (chan interface{}, int) {
out := make(chan interface{}, buf)
func (n *node) sub(outChan reflect.Value) int {
i := n.sinkC
n.sinkC++
n.sinks[i] = out
return out, i
n.sinks[i] = outChan
return i
}
func (n *node) emit(event interface{}) {
etype := reflect.TypeOf(event)
if etype != n.typ {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype))
eval := reflect.ValueOf(event)
if eval.Type() != n.typ {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type()))
}
n.lk.RLock()
for _, ch := range n.sinks {
ch <- event
ch.Send(eval)
}
n.lk.RUnlock()
}

View File

@ -12,7 +12,8 @@ type EventB int
func TestEmit(t *testing.T) {
bus := NewBus()
events, cancel, err := bus.Subscribe(new(EventA))
events := make(chan EventA)
cancel, err := bus.Subscribe(events)
if err != nil {
t.Fatal(err)
}
@ -33,7 +34,8 @@ func TestEmit(t *testing.T) {
func TestSub(t *testing.T) {
bus := NewBus()
events, cancel, err := bus.Subscribe(new(EventB))
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil {
t.Fatal(err)
}
@ -45,7 +47,7 @@ func TestSub(t *testing.T) {
go func() {
defer cancel()
event = (<-events).(EventB)
event = <-events
wait.Done()
}()
@ -114,7 +116,7 @@ func TestClosingRaces(t *testing.T) {
lk.RLock()
defer lk.RUnlock()
_, cancel, _ := b.Subscribe(new(EventA))
cancel, _ := b.Subscribe(make(chan EventA))
time.Sleep(10 * time.Millisecond)
cancel()
@ -157,14 +159,15 @@ func TestSubMany(t *testing.T) {
for i := 0; i < n; i++ {
go func() {
events, cancel, err := bus.Subscribe(new(EventB))
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil {
panic(err)
}
defer cancel()
ready.Done()
atomic.AddInt32(&r, int32((<-events).(EventB)))
atomic.AddInt32(&r, int32(<-events))
wait.Done()
}()
}
@ -185,7 +188,7 @@ func TestSubMany(t *testing.T) {
}
}
func TestSendTo(t *testing.T) {
/*func TestSendTo(t *testing.T) {
testSendTo(t, 1000)
}
@ -219,7 +222,7 @@ func testSendTo(t testing.TB, msgs int) {
if int(r) != 97 * msgs {
t.Fatal("got wrong result")
}
}
}*/
func testMany(t testing.TB, subs, emits, msgs int) {
bus := NewBus()
@ -233,7 +236,8 @@ func testMany(t testing.TB, subs, emits, msgs int) {
for i := 0; i < subs; i++ {
go func() {
events, cancel, err := bus.Subscribe(new(EventB))
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil {
panic(err)
}
@ -241,7 +245,7 @@ func testMany(t testing.TB, subs, emits, msgs int) {
ready.Done()
for i := 0; i < emits * msgs; i++ {
atomic.AddInt64(&r, int64((<-events).(EventB)))
atomic.AddInt64(&r, int64(<-events))
}
wait.Done()
}()
@ -330,9 +334,3 @@ func BenchmarkMs6e0m0(b *testing.B) {
b.ReportAllocs()
testMany(b, 1000000, 1, 1)
}
func BenchmarkSendTo(b *testing.B) {
b.N = 1000000
b.ReportAllocs()
testSendTo(b, b.N)
}

View File

@ -18,9 +18,7 @@ type Bus interface {
// defer cancel()
//
// evt := (<-sub).(os.Signal) // guaranteed to be safe
Subscribe(eventType interface{}, opts ...SubOption) (<-chan interface{}, CancelFunc, error)
SendTo(typedChan interface{}) (CancelFunc, error)
Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error)
}
@ -32,4 +30,3 @@ type Bus interface {
type EmitFunc func(event interface{})
type CancelFunc func()