diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index a2db391b..c5133ed7 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -12,7 +12,7 @@ import ( // BUS type bus struct { - lk sync.Mutex + lk sync.Mutex nodes map[string]*node } @@ -75,7 +75,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, if typ.Kind() != reflect.Chan { return nil, errors.New("expected a channel") } - if typ.ChanDir() & reflect.SendDir == 0 { + if typ.ChanDir()&reflect.SendDir == 0 { return nil, errors.New("channel doesn't allow send") } @@ -144,11 +144,14 @@ type node struct { nEmitters int32 // sink index counter - sinkC int + sinkC int // TODO: we could make emit a bit faster by making this into an array, but // it doesn't seem needed for now - sinks map[int]reflect.Value + sinks map[int]reflect.Value + + keepLast bool + last reflect.Value } func newNode(typ reflect.Type) *node { diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index cd451205..c3014c5f 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -188,7 +188,7 @@ func TestSubMany(t *testing.T) { emit(EventB(7)) wait.Wait() - if int(r) != 7 * n { + if int(r) != 7*n { t.Error("got wrong result") } } @@ -246,7 +246,7 @@ func testMany(t testing.TB, subs, emits, msgs int) { defer cancel() ready.Done() - for i := 0; i < emits * msgs; i++ { + for i := 0; i < emits*msgs; i++ { atomic.AddInt64(&r, int64(<-events)) } wait.Done() @@ -273,7 +273,7 @@ func testMany(t testing.TB, subs, emits, msgs int) { wait.Wait() - if int(r) != 97 * subs * emits * msgs { + if int(r) != 97*subs*emits*msgs { t.Fatal("got wrong result") } } diff --git a/p2p/host/eventbus/interface.go b/p2p/host/eventbus/interface.go index fef8f817..c514e381 100644 --- a/p2p/host/eventbus/interface.go +++ b/p2p/host/eventbus/interface.go @@ -21,23 +21,22 @@ func ForceSubType(evtType interface{}) SubOption { } } -type EmitterSettings struct {} +type EmitterSettings struct{} type EmitterOption func(*EmitterSettings) type Bus interface { - // Subscribe creates new subscription. Failing to drain the incoming channel - // will cause publishers to get blocked + // Subscribe creates new subscription. Failing to drain the channel will cause + // publishers to get blocked + Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error) + + // Emitter creates new emitter // - // evtTypes only accepts typed nil pointers, and uses the type information to + // eventType accepts typed nil pointers, and uses the type information to // select output type // // Example: // sub, cancel, err := eventbus.Subscribe(new(os.Signal)) // defer cancel() - // - // evt := (<-sub).(os.Signal) // guaranteed to be safe - Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error) - Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error) }