Ethereum event.Feed is used for messenger events (#44)

It allows not to block producer when there are not consumers.
And it allows to have more then one consumer in different parts of the application.
This commit is contained in:
Dmitry Shulyak 2019-05-21 15:33:22 +03:00 committed by GitHub
parent a8314adb0b
commit 220bf05e8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 54 additions and 23 deletions

18
chat.go
View File

@ -57,10 +57,12 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) {
messages = []*protocol.Message{}
clock int64
inorder bool
)
events = make(chan client.Event)
sub = c.messenger.Subscribe(events)
// We use a ticker in order to buffer storm of received events.
t := time.NewTicker(time.Second)
t = time.NewTicker(time.Second)
)
defer sub.Unsubscribe()
defer t.Stop()
for {
@ -86,10 +88,16 @@ func (c *ChatViewController) readEventsLoop(contact client.Contact) {
}
}
messages = []*protocol.Message{}
case event := <-c.messenger.Events():
case err := <-sub.Err():
if err == nil {
return
}
c.onError(err)
return
case event := <-events:
log.Printf("[ChatViewController::readEventsLoops] received an event: %+v", event)
switch ev := event.(type) {
switch ev := event.Interface.(type) {
case client.EventWithError:
c.onError(ev.GetError())
case client.EventWithContact:

View File

@ -64,7 +64,7 @@ func TestSendMessage(t *testing.T) {
err = vc.Select(client.Contact{Name: chatName, Type: client.ContactPublicRoom, Topic: chatName})
require.NoError(t, err)
// close reading loops
defer close(vc.cancel)
close(vc.cancel)
err = vc.Send(payload)
require.NoError(t, err)

View File

@ -16,6 +16,16 @@ const (
EventTypeError
)
// Event is used to workaround event.Feed type checking.
// Every event.Feed instance will remember first type that was used either in Send or Subscribe.
// After that value of every object will be matched against that type.
// For example if we subscribed first with interface{} - feed.etype will be changed to interface{}
// and then when client.messageFeed is posted to event.Feed it will get value and match it against interface{}.
// Feed type checking is either not accurate or it was designed to prevent subscribing with various interfaces.
type Event struct {
Interface interface{}
}
type EventWithContact interface {
GetContact() Contact
}

View File

@ -1,13 +1,16 @@
package client
import "github.com/status-im/status-console-client/protocol/v1"
import (
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-console-client/protocol/v1"
)
type DatabaseWithEvents struct {
Database
feed chan<- interface{}
feed *event.Feed
}
func NewDatabaseWithEvents(db Database, feed chan<- interface{}) DatabaseWithEvents {
func NewDatabaseWithEvents(db Database, feed *event.Feed) DatabaseWithEvents {
return DatabaseWithEvents{Database: db, feed: feed}
}
@ -17,13 +20,14 @@ func (db DatabaseWithEvents) SaveMessages(c Contact, msgs []*protocol.Message) (
return rowid, err
}
for _, m := range msgs {
db.feed <- messageEvent{
ev := messageEvent{
baseEvent: baseEvent{
Contact: c,
Type: EventTypeMessage,
},
Message: m,
}
db.feed.Send(Event{ev})
}
return rowid, err
}

View File

@ -7,22 +7,21 @@ import (
"log"
"sync"
"github.com/ethereum/go-ethereum/event"
"github.com/pkg/errors"
"github.com/status-im/status-console-client/protocol/v1"
)
func NewMessengerV2(identity *ecdsa.PrivateKey, proto protocol.Protocol, db Database) MessengerV2 {
feed := make(chan interface{})
events := &event.Feed{}
return MessengerV2{
identity: identity,
proto: proto,
db: NewDatabaseWithEvents(db, feed),
db: NewDatabaseWithEvents(db, events),
public: map[string]AsyncStream{},
private: map[string]AsyncStream{},
// FIXME(dshulyak) add sufficient buffer to this channel
// it may block stream that receives messages
events: feed,
events: events,
}
}
@ -35,7 +34,7 @@ type MessengerV2 struct {
public map[string]AsyncStream
private map[string]AsyncStream
events chan interface{}
events *event.Feed
}
func (m *MessengerV2) Start() error {
@ -238,6 +237,6 @@ func (m *MessengerV2) Leave(c Contact) error {
return nil
}
func (m *MessengerV2) Events() <-chan interface{} {
return m.events
func (m *MessengerV2) Subscribe(events chan Event) event.Subscription {
return m.events.Subscribe(events)
}

View File

@ -189,7 +189,7 @@ func (api *PublicAPI) Chat(ctx context.Context, contact client.Contact) (*rpc.Su
// Create a broadcaster instance.
// TODO: move it.
if api.broadcaster == nil {
api.broadcaster = newBroadcaster(api.service.messenger.Events())
api.broadcaster = newBroadcaster(api.service.messenger)
}
// Subscription needs to be created

View File

@ -1,16 +1,21 @@
package gethservice
import (
"github.com/ethereum/go-ethereum/event"
"github.com/status-im/status-console-client/protocol/client"
)
type publisher interface {
Subscribe(chan client.Event) event.Subscription
}
type broadcaster struct {
source <-chan interface{}
source publisher
subs map[client.Contact][]chan interface{}
cancel chan struct{}
}
func newBroadcaster(source <-chan interface{}) *broadcaster {
func newBroadcaster(source publisher) *broadcaster {
b := broadcaster{
source: source,
subs: make(map[client.Contact][]chan interface{}),
@ -23,12 +28,14 @@ func newBroadcaster(source <-chan interface{}) *broadcaster {
}
func (b *broadcaster) start(cancel chan struct{}) {
events := make(chan client.Event)
sub := b.source.Subscribe(events)
for {
select {
case item := <-b.source:
case item := <-events:
var subs []chan interface{}
switch v := item.(type) {
switch v := item.Interface.(type) {
case client.EventWithContact:
subs = b.subs[v.GetContact()]
}
@ -38,7 +45,10 @@ func (b *broadcaster) start(cancel chan struct{}) {
for _, out := range subs {
out <- item
}
case <-sub.Err():
return
case <-cancel:
sub.Unsubscribe()
return
}
}