From 220bf05e8d8998cb49220abe2f7eacb99653240e Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 21 May 2019 15:33:22 +0300 Subject: [PATCH] Ethereum event.Feed is used for messenger events (#44) It allows not to block producer when there are not consumers. And it allows to have more then one consumer in different parts of the application. --- chat.go | 18 +++++++++++++----- chat_test.go | 2 +- protocol/client/event.go | 10 ++++++++++ protocol/client/events.go | 12 ++++++++---- protocol/client/messenger_v2.go | 15 +++++++-------- protocol/gethservice/api.go | 2 +- protocol/gethservice/broadcaster.go | 18 ++++++++++++++---- 7 files changed, 54 insertions(+), 23 deletions(-) diff --git a/chat.go b/chat.go index 4addb88..6ad694d 100644 --- a/chat.go +++ b/chat.go @@ -57,10 +57,12 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) { messages = []*protocol.Message{} clock int64 inorder bool + events = make(chan client.Event) + sub = c.messenger.Subscribe(events) + // We use a ticker in order to buffer storm of received events. + t = time.NewTicker(time.Second) ) - - // We use a ticker in order to buffer storm of received events. - t := time.NewTicker(time.Second) + defer sub.Unsubscribe() defer t.Stop() for { @@ -86,10 +88,16 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) { } } messages = []*protocol.Message{} - case event := <-c.messenger.Events(): + case err := <-sub.Err(): + if err == nil { + return + } + c.onError(err) + return + case event := <-events: log.Printf("[ChatViewController::readEventsLoops] received an event: %+v", event) - switch ev := event.(type) { + switch ev := event.Interface.(type) { case client.EventWithError: c.onError(ev.GetError()) case client.EventWithContact: diff --git a/chat_test.go b/chat_test.go index f118005..ec2a35a 100644 --- a/chat_test.go +++ b/chat_test.go @@ -64,7 +64,7 @@ func TestSendMessage(t *testing.T) { err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom, Topic: chatName}) require.NoError(t, err) // close reading loops - defer close(vc.cancel) + close(vc.cancel) err = vc.Send(payload) require.NoError(t, err) diff --git a/protocol/client/event.go b/protocol/client/event.go index 1fdb6f0..4e5199f 100644 --- a/protocol/client/event.go +++ b/protocol/client/event.go @@ -16,6 +16,16 @@ const ( EventTypeError ) +// Event is used to workaround event.Feed type checking. +// Every event.Feed instance will remember first type that was used either in Send or Subscribe. +// After that value of every object will be matched against that type. +// For example if we subscribed first with interface{} - feed.etype will be changed to interface{} +// and then when client.messageFeed is posted to event.Feed it will get value and match it against interface{}. +// Feed type checking is either not accurate or it was designed to prevent subscribing with various interfaces. +type Event struct { + Interface interface{} +} + type EventWithContact interface { GetContact() Contact } diff --git a/protocol/client/events.go b/protocol/client/events.go index e08f942..0513182 100644 --- a/protocol/client/events.go +++ b/protocol/client/events.go @@ -1,13 +1,16 @@ package client -import "github.com/status-im/status-console-client/protocol/v1" +import ( + "github.com/ethereum/go-ethereum/event" + "github.com/status-im/status-console-client/protocol/v1" +) type DatabaseWithEvents struct { Database - feed chan<- interface{} + feed *event.Feed } -func NewDatabaseWithEvents(db Database, feed chan<- interface{}) DatabaseWithEvents { +func NewDatabaseWithEvents(db Database, feed *event.Feed) DatabaseWithEvents { return DatabaseWithEvents{Database: db, feed: feed} } @@ -17,13 +20,14 @@ func (db DatabaseWithEvents) SaveMessages(c Contact, msgs []*protocol.Message) ( return rowid, err } for _, m := range msgs { - db.feed <- messageEvent{ + ev := messageEvent{ baseEvent: baseEvent{ Contact: c, Type: EventTypeMessage, }, Message: m, } + db.feed.Send(Event{ev}) } return rowid, err } diff --git a/protocol/client/messenger_v2.go b/protocol/client/messenger_v2.go index 88b778a..67686f0 100644 --- a/protocol/client/messenger_v2.go +++ b/protocol/client/messenger_v2.go @@ -7,22 +7,21 @@ import ( "log" "sync" + "github.com/ethereum/go-ethereum/event" "github.com/pkg/errors" "github.com/status-im/status-console-client/protocol/v1" ) func NewMessengerV2(identity *ecdsa.PrivateKey, proto protocol.Protocol, db Database) MessengerV2 { - feed := make(chan interface{}) + events := &event.Feed{} return MessengerV2{ identity: identity, proto: proto, - db: NewDatabaseWithEvents(db, feed), + db: NewDatabaseWithEvents(db, events), public: map[string]AsyncStream{}, private: map[string]AsyncStream{}, - // FIXME(dshulyak) add sufficient buffer to this channel - // it may block stream that receives messages - events: feed, + events: events, } } @@ -35,7 +34,7 @@ type MessengerV2 struct { public map[string]AsyncStream private map[string]AsyncStream - events chan interface{} + events *event.Feed } func (m *MessengerV2) Start() error { @@ -238,6 +237,6 @@ func (m *MessengerV2) Leave(c Contact) error { return nil } -func (m *MessengerV2) Events() <-chan interface{} { - return m.events +func (m *MessengerV2) Subscribe(events chan Event) event.Subscription { + return m.events.Subscribe(events) } diff --git a/protocol/gethservice/api.go b/protocol/gethservice/api.go index 89ca78d..c88568e 100644 --- a/protocol/gethservice/api.go +++ b/protocol/gethservice/api.go @@ -189,7 +189,7 @@ func (api *PublicAPI) Chat(ctx context.Context, contact client.Contact) (*rpc.Su // Create a broadcaster instance. // TODO: move it. if api.broadcaster == nil { - api.broadcaster = newBroadcaster(api.service.messenger.Events()) + api.broadcaster = newBroadcaster(api.service.messenger) } // Subscription needs to be created diff --git a/protocol/gethservice/broadcaster.go b/protocol/gethservice/broadcaster.go index 8067cb2..e69c29d 100644 --- a/protocol/gethservice/broadcaster.go +++ b/protocol/gethservice/broadcaster.go @@ -1,16 +1,21 @@ package gethservice import ( + "github.com/ethereum/go-ethereum/event" "github.com/status-im/status-console-client/protocol/client" ) +type publisher interface { + Subscribe(chan client.Event) event.Subscription +} + type broadcaster struct { - source <-chan interface{} + source publisher subs map[client.Contact][]chan interface{} cancel chan struct{} } -func newBroadcaster(source <-chan interface{}) *broadcaster { +func newBroadcaster(source publisher) *broadcaster { b := broadcaster{ source: source, subs: make(map[client.Contact][]chan interface{}), @@ -23,12 +28,14 @@ func newBroadcaster(source <-chan interface{}) *broadcaster { } func (b *broadcaster) start(cancel chan struct{}) { + events := make(chan client.Event) + sub := b.source.Subscribe(events) for { select { - case item := <-b.source: + case item := <-events: var subs []chan interface{} - switch v := item.(type) { + switch v := item.Interface.(type) { case client.EventWithContact: subs = b.subs[v.GetContact()] } @@ -38,7 +45,10 @@ func (b *broadcaster) start(cancel chan struct{}) { for _, out := range subs { out <- item } + case <-sub.Err(): + return case <-cancel: + sub.Unsubscribe() return } }