refactor: filter

- Create a channel when a subscription is created
- Add stop function for protocols
This commit is contained in:
Richard Ramos 2021-10-11 18:45:54 -04:00
parent 6f730abd38
commit dc52ba182a
8 changed files with 147 additions and 75 deletions

View File

@ -84,14 +84,17 @@ func main() {
// //
// Send FilterRequest from light node to full node // Send FilterRequest from light node to full node
filterChan := make(filter.ContentFilterChan) _, filterChan, err := lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic})
if err != nil {
panic(err)
}
go func() { go func() {
for env := range filterChan { for env := range filterChan {
log.Info("Light node received msg, ", string(env.Message().Payload)) log.Info("Light node received msg, ", string(env.Message().Payload))
} }
log.Info("Message channel closed!")
}() }()
lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}, filterChan)
go writeLoop(ctx, fullNode) go writeLoop(ctx, fullNode)
go readLoop(ctx, fullNode) go readLoop(ctx, fullNode)

View File

@ -40,7 +40,7 @@ func TestBasicSendingReceiving(t *testing.T) {
require.NoError(t, write(ctx, wakuNode, "test")) require.NoError(t, write(ctx, wakuNode, "test"))
sub, err := wakuNode.Subscribe(nil) sub, err := wakuNode.Subscribe(ctx, nil)
require.NoError(t, err) require.NoError(t, err)
value := <-sub.C value := <-sub.C

View File

@ -18,8 +18,11 @@ type Subscription struct {
// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() { func (subs *Subscription) Unsubscribe() {
subs.mutex.Lock()
defer subs.mutex.Unlock()
if !subs.closed { if !subs.closed {
close(subs.quit) close(subs.quit)
subs.closed = true
} }
} }

View File

@ -46,6 +46,7 @@ type WakuNode struct {
lightPush *lightpush.WakuLightPush lightPush *lightpush.WakuLightPush
rendezvous *rendezvous.RendezvousService rendezvous *rendezvous.RendezvousService
ping *ping.PingService ping *ping.PingService
store *store.WakuStore
subscriptions map[relay.Topic][]*Subscription subscriptions map[relay.Topic][]*Subscription
subscriptionsMutex sync.Mutex subscriptionsMutex sync.Mutex
@ -184,13 +185,30 @@ func (w *WakuNode) Stop() {
w.rendezvous.Stop() w.rendezvous.Stop()
} }
for _, topic := range w.relay.Topics() { if w.relay != nil {
for _, sub := range w.subscriptions[topic] { for _, topic := range w.relay.Topics() {
sub.Unsubscribe() for _, sub := range w.subscriptions[topic] {
sub.Unsubscribe()
}
} }
w.subscriptions = nil
} }
w.subscriptions = nil if w.filter != nil {
w.filter.Stop()
for _, filter := range w.filters {
close(filter.Chan)
}
w.filters = nil
}
if w.lightPush != nil {
w.lightPush.Stop()
}
if w.store != nil {
w.store.Stop()
}
w.host.Close() w.host.Close()
} }
@ -264,7 +282,8 @@ func (w *WakuNode) mountRendezvous() error {
} }
func (w *WakuNode) startStore() { func (w *WakuNode) startStore() {
w.opts.store.Start(w.ctx, w.host) w.store = w.opts.store
w.store.Start(w.ctx, w.host)
if w.opts.shouldResume { if w.opts.shouldResume {
// TODO: extract this to a function and run it when you go offline // TODO: extract this to a function and run it when you go offline
@ -315,7 +334,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
} }
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
if w.opts.store == nil { if w.store == nil {
return nil, errors.New("WakuStore is not set") return nil, errors.New("WakuStore is not set")
} }
@ -328,7 +347,7 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime
query.StartTime = startTime query.StartTime = startTime
query.EndTime = endTime query.EndTime = endTime
query.PagingInfo = new(pb.PagingInfo) query.PagingInfo = new(pb.PagingInfo)
result, err := w.opts.store.Query(ctx, query, opts...) result, err := w.store.Query(ctx, query, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -336,11 +355,11 @@ func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime
} }
func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {
if w.opts.store == nil { if w.store == nil {
return errors.New("WakuStore is not set") return errors.New("WakuStore is not set")
} }
result, err := w.opts.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList) result, err := w.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList)
if err != nil { if err != nil {
return err return err
} }
@ -439,34 +458,62 @@ func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription
// Wrapper around WakuFilter.Subscribe // Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters // that adds a Filter object to node.filters
// TODO: what's up with this channel?.......................... is it closed eventually? func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string) (filterID string, ch chan *protocol.Envelope, err error) {
func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string, ch filter.ContentFilterChan) error {
if node.filter == nil { if node.filter == nil {
return errors.New("WakuFilter is not set") err = errors.New("WakuFilter is not set")
return
} }
// TODO: should be possible to pass the peerID as option or autoselect peer.
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs // ContentFilterChan takes MessagePush structs
var peerID *peer.ID
id, peerID, err := node.filter.Subscribe(ctx, topic, contentTopics) filterID, peerID, err = node.filter.Subscribe(ctx, topic, contentTopics)
if id == "" || err != nil { if filterID == "" || err != nil {
// Failed to subscribe // Failed to subscribe
log.Error("remote subscription to filter failed") log.Error("remote subscription to filter failed")
return err return
} }
ch = make(chan *protocol.Envelope, 1024) // To avoid blocking
// Register handler for filter, whether remote subscription succeeded or not // Register handler for filter, whether remote subscription succeeded or not
node.filters[id] = filter.Filter{ node.filters[filterID] = filter.Filter{
PeerID: *peerID, PeerID: *peerID,
Topic: topic, Topic: topic,
ContentFilters: contentTopics, ContentFilters: contentTopics,
Chan: ch, Chan: ch,
} }
return
}
// UnsubscribeFilterByID removes a subscription to a filter node completely
// using the filterID returned when the subscription was created
func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
var f filter.Filter
var ok bool
if f, ok = node.filters[filterID]; !ok {
return errors.New("filter not found")
}
err := node.filter.Unsubscribe(ctx, f.Topic, f.ContentFilters, f.PeerID)
if err != nil {
return err
}
close(f.Chan)
delete(node.filters, filterID)
return nil return nil
} }
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) { // Unsubscribe filter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) error {
// Remove local filter // Remove local filter
var idsToRemove []string var idsToRemove []string
for id, f := range node.filters { for id, f := range node.filters {
@ -499,11 +546,14 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, conte
for _, rId := range idsToRemove { for _, rId := range idsToRemove {
for id := range node.filters { for id := range node.filters {
if id == rId { if id == rId {
close(node.filters[id].Chan)
delete(node.filters, id) delete(node.filters, id)
break break
} }
} }
} }
return nil
} }
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {

View File

@ -22,13 +22,11 @@ import (
var log = logging.Logger("wakufilter") var log = logging.Logger("wakufilter")
type ( type (
ContentFilterChan chan *protocol.Envelope
Filter struct { Filter struct {
PeerID peer.ID PeerID peer.ID
Topic string Topic string
ContentFilters []string ContentFilters []string
Chan ContentFilterChan Chan chan *protocol.Envelope
} }
// @TODO MAYBE MORE INFO? // @TODO MAYBE MORE INFO?
@ -234,7 +232,7 @@ func (wf *WakuFilter) FilterListener() {
// Having a FilterRequest struct, // Having a FilterRequest struct,
// select a peer with filter support, dial it, // select a peer with filter support, dial it,
// and submit FilterRequest wrapped in FilterRPC // and submit FilterRequest wrapped in FilterRPC
func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (string, *peer.ID, error) { func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (requestID string, peer *peer.ID, err error) {
var contentFilters []*pb.FilterRequest_ContentFilter var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentTopics { for _, ct := range contentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
@ -246,61 +244,67 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics
ContentFilters: contentFilters, ContentFilters: contentFilters,
} }
peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) peer, err = utils.SelectPeer(wf.h, string(FilterID_v20beta1))
if err == nil { if err != nil {
conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) return
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
log.Info("sending filterRPC: ", filterRPC)
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
return "", nil, err
}
return string(id), peer, nil
} else {
log.Error("failed to connect to remote peer")
return "", nil, err
}
} }
log.Info("error selecting peer: ", err) var conn network.Stream
return "", nil, err conn, err = wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
if err != nil {
return
}
defer conn.Close()
// This is the only successful path to subscription
requestID = hex.EncodeToString(protocol.GenerateRequestId())
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request}
log.Info("sending filterRPC: ", filterRPC)
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
return
}
return
} }
func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) { func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) error {
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if conn != nil {
defer conn.Close()
// This is the only successful path to subscription if err != nil {
id := protocol.GenerateRequestId() return err
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := pb.FilterRequest{
Subscribe: false,
Topic: topic,
ContentFilters: contentFilters,
}
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
err = writer.WriteMsg(filterRPC)
if err != nil {
log.Error("failed to write message", err)
}
} else {
log.Error("failed to connect to remote peer", err)
} }
defer conn.Close()
// This is the only successful path to subscription
id := protocol.GenerateRequestId()
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := pb.FilterRequest{
Subscribe: false,
Topic: topic,
ContentFilters: contentFilters,
}
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
err = writer.WriteMsg(filterRPC)
if err != nil {
return err
}
return nil
}
func (wf *WakuFilter) Stop() {
wf.h.RemoveStreamHandler(FilterID_v20beta1)
} }

View File

@ -203,3 +203,7 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
return pushResponseRPC.Response, nil return pushResponseRPC.Response, nil
} }
func (w *WakuLightPush) Stop() {
w.h.RemoveStreamHandler(LightPushID_v20beta1)
}

View File

@ -175,3 +175,7 @@ func GetTopic(topic *Topic) Topic {
} }
return t return t
} }
func (w *WakuRelay) Stop() {
w.host.RemoveStreamHandler(WakuRelayID_v200)
}

View File

@ -622,3 +622,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
// TODO: queryWithAccounting // TODO: queryWithAccounting
func (w *WakuStore) Stop() {
w.h.RemoveStreamHandler(StoreID_v20beta3)
}