From 056412681f91d96b6c3623c4945d15d0aada2998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 16 Jun 2019 17:20:33 +0200 Subject: [PATCH] Subscribe with user provided typed channels --- p2p/host/eventbus/basic.go | 83 +++++++++++++-------------------- p2p/host/eventbus/basic_test.go | 30 ++++++------ p2p/host/eventbus/interface.go | 5 +- 3 files changed, 48 insertions(+), 70 deletions(-) diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index f7556799..5d67edf5 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -22,12 +22,7 @@ func NewBus() Bus { } } -func (b *bus) withNode(evtType interface{}, cb func(*node)) error { - typ := reflect.TypeOf(evtType) - if typ.Kind() != reflect.Ptr { - return errors.New("subscribe called with non-pointer type") - } - typ = typ.Elem() +func (b *bus) withNode(typ reflect.Type, cb func(*node)) error { path := typePath(typ) b.lk.Lock() @@ -45,8 +40,8 @@ func (b *bus) withNode(evtType interface{}, cb func(*node)) error { return nil } -func (b *bus) tryDropNode(evtType interface{}) { - path := typePath(reflect.TypeOf(evtType).Elem()) +func (b *bus) tryDropNode(typ reflect.Type) { + path := typePath(typ) b.lk.Lock() n, ok := b.nodes[path] @@ -67,20 +62,27 @@ func (b *bus) tryDropNode(evtType interface{}) { b.lk.Unlock() } -func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface{}, c CancelFunc, err error) { - err = b.withNode(evtType, func(n *node) { +func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, err error) { + refCh := reflect.ValueOf(typedChan) + typ := refCh.Type() + if typ.Kind() != reflect.Chan { + return nil, errors.New("expected a channel") + } + if typ.ChanDir() & reflect.SendDir == 0 { + return nil, errors.New("channel doesn't allow send") + } + + err = b.withNode(typ.Elem(), func(n *node) { // when all subs are waiting on this channel, setting this to 1 doesn't // really affect benchmarks - out, i := n.sub(0) - s = out + i := n.sub(refCh) c = func() { n.lk.Lock() delete(n.sinks, i) - close(out) tryDrop := len(n.sinks) == 0 && n.nEmitters == 0 n.lk.Unlock() if tryDrop { - b.tryDropNode(evtType) + b.tryDropNode(typ.Elem()) } } }) @@ -88,7 +90,13 @@ func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface } func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c CancelFunc, err error) { - err = b.withNode(evtType, func(n *node) { + typ := reflect.TypeOf(evtType) + if typ.Kind() != reflect.Ptr { + return nil, nil, errors.New("emitter called with non-pointer type") + } + typ = typ.Elem() + + err = b.withNode(typ, func(n *node) { atomic.AddInt32(&n.nEmitters, 1) closed := false @@ -102,37 +110,13 @@ func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c Ca c = func() { closed = true if atomic.AddInt32(&n.nEmitters, -1) == 0 { - b.tryDropNode(evtType) + b.tryDropNode(typ) } } }) return } -func (b *bus) SendTo(typedChan interface{}) (CancelFunc, error) { - typ := reflect.TypeOf(typedChan) - if typ.Kind() != reflect.Chan { - return nil, errors.New("expected a channel") - } - if typ.ChanDir() & reflect.SendDir == 0 { - return nil, errors.New("channel doesn't allow send") - } - etype := reflect.New(typ.Elem()) - sub, cf, err := b.Subscribe(etype.Interface()) - if err != nil { - return nil, err - } - - go func() { - tcv := reflect.ValueOf(typedChan) - for event := range sub { - tcv.Send(reflect.ValueOf(event)) - } - }() - - return cf, nil -} - /////////////////////// // NODE @@ -150,34 +134,33 @@ type node struct { // TODO: we could make emit a bit faster by making this into an array, but // it doesn't seem needed for now - sinks map[int]chan interface{} + sinks map[int]reflect.Value } func newNode(typ reflect.Type) *node { return &node{ typ: typ, - sinks: map[int]chan interface{}{}, + sinks: map[int]reflect.Value{}, } } -func (n *node) sub(buf int) (chan interface{}, int) { - out := make(chan interface{}, buf) +func (n *node) sub(outChan reflect.Value) int { i := n.sinkC n.sinkC++ - n.sinks[i] = out - return out, i + n.sinks[i] = outChan + return i } func (n *node) emit(event interface{}) { - etype := reflect.TypeOf(event) - if etype != n.typ { - panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype)) + eval := reflect.ValueOf(event) + if eval.Type() != n.typ { + panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type())) } n.lk.RLock() for _, ch := range n.sinks { - ch <- event + ch.Send(eval) } n.lk.RUnlock() } diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index c8b3de5f..f77e5d74 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -12,7 +12,8 @@ type EventB int func TestEmit(t *testing.T) { bus := NewBus() - events, cancel, err := bus.Subscribe(new(EventA)) + events := make(chan EventA) + cancel, err := bus.Subscribe(events) if err != nil { t.Fatal(err) } @@ -33,7 +34,8 @@ func TestEmit(t *testing.T) { func TestSub(t *testing.T) { bus := NewBus() - events, cancel, err := bus.Subscribe(new(EventB)) + events := make(chan EventB) + cancel, err := bus.Subscribe(events) if err != nil { t.Fatal(err) } @@ -45,7 +47,7 @@ func TestSub(t *testing.T) { go func() { defer cancel() - event = (<-events).(EventB) + event = <-events wait.Done() }() @@ -114,7 +116,7 @@ func TestClosingRaces(t *testing.T) { lk.RLock() defer lk.RUnlock() - _, cancel, _ := b.Subscribe(new(EventA)) + cancel, _ := b.Subscribe(make(chan EventA)) time.Sleep(10 * time.Millisecond) cancel() @@ -157,14 +159,15 @@ func TestSubMany(t *testing.T) { for i := 0; i < n; i++ { go func() { - events, cancel, err := bus.Subscribe(new(EventB)) + events := make(chan EventB) + cancel, err := bus.Subscribe(events) if err != nil { panic(err) } defer cancel() ready.Done() - atomic.AddInt32(&r, int32((<-events).(EventB))) + atomic.AddInt32(&r, int32(<-events)) wait.Done() }() } @@ -185,7 +188,7 @@ func TestSubMany(t *testing.T) { } } -func TestSendTo(t *testing.T) { +/*func TestSendTo(t *testing.T) { testSendTo(t, 1000) } @@ -219,7 +222,7 @@ func testSendTo(t testing.TB, msgs int) { if int(r) != 97 * msgs { t.Fatal("got wrong result") } -} +}*/ func testMany(t testing.TB, subs, emits, msgs int) { bus := NewBus() @@ -233,7 +236,8 @@ func testMany(t testing.TB, subs, emits, msgs int) { for i := 0; i < subs; i++ { go func() { - events, cancel, err := bus.Subscribe(new(EventB)) + events := make(chan EventB) + cancel, err := bus.Subscribe(events) if err != nil { panic(err) } @@ -241,7 +245,7 @@ func testMany(t testing.TB, subs, emits, msgs int) { ready.Done() for i := 0; i < emits * msgs; i++ { - atomic.AddInt64(&r, int64((<-events).(EventB))) + atomic.AddInt64(&r, int64(<-events)) } wait.Done() }() @@ -330,9 +334,3 @@ func BenchmarkMs6e0m0(b *testing.B) { b.ReportAllocs() testMany(b, 1000000, 1, 1) } - -func BenchmarkSendTo(b *testing.B) { - b.N = 1000000 - b.ReportAllocs() - testSendTo(b, b.N) -} diff --git a/p2p/host/eventbus/interface.go b/p2p/host/eventbus/interface.go index 3b505296..d8b78fe3 100644 --- a/p2p/host/eventbus/interface.go +++ b/p2p/host/eventbus/interface.go @@ -18,9 +18,7 @@ type Bus interface { // defer cancel() // // evt := (<-sub).(os.Signal) // guaranteed to be safe - Subscribe(eventType interface{}, opts ...SubOption) (<-chan interface{}, CancelFunc, error) - - SendTo(typedChan interface{}) (CancelFunc, error) + Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error) Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error) } @@ -32,4 +30,3 @@ type Bus interface { type EmitFunc func(event interface{}) type CancelFunc func() -