From 25d453370e7663a505a47f277ef33b41436e4297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 16 Jun 2019 18:15:35 +0200 Subject: [PATCH] ForceSubType --- p2p/host/eventbus/basic.go | 17 +++++++++++- p2p/host/eventbus/basic_test.go | 48 +++++++++++++++++---------------- p2p/host/eventbus/interface.go | 22 +++++++++++++-- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index 5d67edf5..a2db391b 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -62,7 +62,14 @@ func (b *bus) tryDropNode(typ reflect.Type) { b.lk.Unlock() } -func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, err error) { +func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) { + var settings SubSettings + for _, opt := range opts { + if err := opt(&settings); err != nil { + return nil, err + } + } + refCh := reflect.ValueOf(typedChan) typ := refCh.Type() if typ.Kind() != reflect.Chan { @@ -72,6 +79,13 @@ func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, er return nil, errors.New("channel doesn't allow send") } + if settings.forcedType != nil { + if settings.forcedType.Elem().AssignableTo(typ) { + return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ) + } + typ = settings.forcedType + } + err = b.withNode(typ.Elem(), func(n *node) { // when all subs are waiting on this channel, setting this to 1 doesn't // really affect benchmarks @@ -159,6 +173,7 @@ func (n *node) emit(event interface{}) { } n.lk.RLock() + // TODO: try using reflect.Select for _, ch := range n.sinks { ch.Send(eval) } diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index f77e5d74..cd451205 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -1,6 +1,7 @@ package event import ( + "fmt" "sync" "sync/atomic" "testing" @@ -10,6 +11,10 @@ import ( type EventA struct{} type EventB int +func (EventA) String() string { + return "Oh, Hello" +} + func TestEmit(t *testing.T) { bus := NewBus() events := make(chan EventA) @@ -188,41 +193,38 @@ func TestSubMany(t *testing.T) { } } -/*func TestSendTo(t *testing.T) { - testSendTo(t, 1000) -} - -func testSendTo(t testing.TB, msgs int) { +func TestSubType(t *testing.T) { bus := NewBus() + events := make(chan fmt.Stringer) + cancel, err := bus.Subscribe(events, ForceSubType(new(EventA))) + if err != nil { + t.Fatal(err) + } + + var event fmt.Stringer + + var wait sync.WaitGroup + wait.Add(1) go func() { - emit, cancel, err := bus.Emitter(new(EventB)) - if err != nil { - panic(err) - } defer cancel() - - for i := 0; i < msgs; i++ { - emit(EventB(97)) - } + event = <-events + wait.Done() }() - ch := make(chan EventB) - cancel, err := bus.SendTo(ch) + emit, cancel, err := bus.Emitter(new(EventA)) if err != nil { - return + t.Fatal(err) } defer cancel() - r := 0 - for i := 0; i < msgs; i++ { - r += int(<-ch) - } + emit(EventA{}) + wait.Wait() - if int(r) != 97 * msgs { - t.Fatal("got wrong result") + if event.String() != "Oh, Hello" { + t.Error("didn't get the correct message") } -}*/ +} func testMany(t testing.TB, subs, emits, msgs int) { bus := NewBus() diff --git a/p2p/host/eventbus/interface.go b/p2p/host/eventbus/interface.go index d8b78fe3..fef8f817 100644 --- a/p2p/host/eventbus/interface.go +++ b/p2p/host/eventbus/interface.go @@ -1,7 +1,25 @@ package event -type SubSettings struct {} -type SubOption func(*SubSettings) +import ( + "errors" + "reflect" +) + +type SubSettings struct { + forcedType reflect.Type +} +type SubOption func(*SubSettings) error + +func ForceSubType(evtType interface{}) SubOption { + return func(s *SubSettings) error { + typ := reflect.TypeOf(evtType) + if typ.Kind() != reflect.Ptr { + return errors.New("ForceSubType called with non-pointer type") + } + s.forcedType = typ + return nil + } +} type EmitterSettings struct {} type EmitterOption func(*EmitterSettings)