refactor: change filter function parameters to not use protobuffers

This commit is contained in:
Richard Ramos 2021-10-10 18:44:07 -04:00
parent 8a28978f83
commit 13aee0b1e0
9 changed files with 87 additions and 97 deletions

View File

@ -53,7 +53,7 @@ func main() {
} }
go writeLoop(ctx, wakuNode) go writeLoop(ctx, wakuNode)
go readLoop(wakuNode) go readLoop(ctx, wakuNode)
// Wait for a SIGINT or SIGTERM signal // Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
@ -105,8 +105,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
} }
} }
func readLoop(wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Subscribe(nil) sub, err := wakuNode.Subscribe(ctx, nil)
if err != nil { if err != nil {
log.Error("Could not subscribe: ", err) log.Error("Could not subscribe: ", err)
return return

View File

@ -23,7 +23,6 @@ type Chat struct {
// Messages is a channel of messages received from other peers in the chat room // Messages is a channel of messages received from other peers in the chat room
Messages chan *pb.Chat2Message Messages chan *pb.Chat2Message
sub *node.Subscription
C chan *protocol.Envelope C chan *protocol.Envelope
node *node.WakuNode node *node.WakuNode
@ -49,20 +48,12 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
if useLightPush { if useLightPush {
chat.C = make(filter.ContentFilterChan) chat.C = make(filter.ContentFilterChan)
n.SubscribeFilter(ctx, string(relay.GetTopic(nil)), []string{contentTopic}, chat.C)
filterRequest := wpb.FilterRequest{
ContentFilters: []*wpb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
Topic: string(relay.GetTopic(nil)),
Subscribe: true,
}
n.SubscribeFilter(ctx, filterRequest, chat.C)
} else { } else {
sub, err := n.Subscribe(nil) sub, err := n.Subscribe(ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chat.C = sub.C chat.C = sub.C
} }

View File

@ -84,12 +84,6 @@ func main() {
// //
// Send FilterRequest from light node to full node // Send FilterRequest from light node to full node
filterRequest := pb.FilterRequest{
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
Topic: string(pubSubTopic),
Subscribe: true,
}
filterChan := make(filter.ContentFilterChan) filterChan := make(filter.ContentFilterChan)
go func() { go func() {
@ -97,16 +91,15 @@ func main() {
log.Info("Light node received msg, ", string(env.Message().Payload)) log.Info("Light node received msg, ", string(env.Message().Payload))
} }
}() }()
lightNode.SubscribeFilter(ctx, filterRequest, filterChan) lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}, filterChan)
go writeLoop(ctx, fullNode) go writeLoop(ctx, fullNode)
go readLoop(fullNode) go readLoop(ctx, fullNode)
go func() { go func() {
// Unsubscribe filter after 5 seconds // Unsubscribe filter after 5 seconds
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
filterRequest.Subscribe = false lightNode.UnsubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic})
lightNode.UnsubscribeFilter(ctx, filterRequest)
}() }()
// Wait for a SIGINT or SIGTERM signal // Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
@ -117,7 +110,6 @@ func main() {
// shut the nodes down // shut the nodes down
fullNode.Stop() fullNode.Stop()
lightNode.Stop() lightNode.Stop()
} }
func randomHex(n int) (string, error) { func randomHex(n int) (string, error) {
@ -136,7 +128,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
p.Data = []byte(wakuNode.ID() + ": " + msgContent) p.Data = []byte(wakuNode.ID() + ": " + msgContent)
p.Key = &node.KeyInfo{Kind: node.None} p.Key = &node.KeyInfo{Kind: node.None}
payload, err := p.Encode(version) payload, _ := p.Encode(version)
msg := &pb.WakuMessage{ msg := &pb.WakuMessage{
Payload: payload, Payload: payload,
@ -145,7 +137,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
Timestamp: timestamp, Timestamp: timestamp,
} }
_, err = wakuNode.Publish(ctx, msg, nil) _, err := wakuNode.Publish(ctx, msg, nil)
if err != nil { if err != nil {
log.Error("Error sending a message: ", err) log.Error("Error sending a message: ", err)
} }
@ -158,8 +150,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
} }
} }
func readLoop(wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Subscribe(&pubSubTopic) sub, err := wakuNode.Subscribe(ctx, &pubSubTopic)
if err != nil { if err != nil {
log.Error("Could not subscribe: ", err) log.Error("Could not subscribe: ", err)
return return

2
examples/peer_events/build/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*
!.gitignore

View File

@ -211,8 +211,8 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
} }
} }
func readLoop(wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
sub, err := wakuNode.Subscribe(&pubSubTopic) sub, err := wakuNode.Subscribe(ctx, &pubSubTopic)
if err != nil { if err != nil {
log.Error("Could not subscribe: ", err) log.Error("Could not subscribe: ", err)
return return

View File

@ -170,7 +170,7 @@ func Execute(options Options) {
for _, t := range options.Relay.Topics { for _, t := range options.Relay.Topics {
nodeTopic := relay.Topic(t) nodeTopic := relay.Topic(t)
_, err := wakuNode.Subscribe(&nodeTopic) _, err := wakuNode.Subscribe(ctx, &nodeTopic)
failOnErr(err, "Error subscring to topic") failOnErr(err, "Error subscring to topic")
} }

View File

@ -17,7 +17,7 @@ type RelayOptions struct {
type FilterOptions struct { type FilterOptions struct {
Enable bool `long:"filter" description:"Enable filter protocol"` Enable bool `long:"filter" description:"Enable filter protocol"`
Nodes []string `long:"filter-node" description:"Multiaddr of a peer to request content filtering of messages. Option may be repeated"` Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"`
} }
// LightpushOptions are settings used to enable the lightpush protocol. This is // LightpushOptions are settings used to enable the lightpush protocol. This is
@ -28,7 +28,7 @@ type FilterOptions struct {
// broadcasted // broadcasted
type LightpushOptions struct { type LightpushOptions struct {
Enable bool `long:"lightpush" description:"Enable lightpush protocol"` Enable bool `long:"lightpush" description:"Enable lightpush protocol"`
Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer to request lightpush of published messages. Option may be repeated"` Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer that supports lightpush protocol. Option may be repeated"`
} }
// StoreOptions are settings used for enabling the store protocol, used to // StoreOptions are settings used for enabling the store protocol, used to
@ -37,7 +37,7 @@ type LightpushOptions struct {
type StoreOptions struct { type StoreOptions struct {
Enable bool `long:"store" description:"Enable store protocol"` Enable bool `long:"store" description:"Enable store protocol"`
ShouldResume bool `long:"resume" description:"fix the gap in message history"` ShouldResume bool `long:"resume" description:"fix the gap in message history"`
Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"` Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"`
} }
// DNSDiscoveryOptions are settings used for enabling DNS-based discovery // DNSDiscoveryOptions are settings used for enabling DNS-based discovery

View File

@ -225,7 +225,7 @@ func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) e
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)
if shouldRelayMessages { if shouldRelayMessages {
_, err := w.Subscribe(nil) _, err := w.Subscribe(w.ctx, nil)
if err != nil { if err != nil {
return err return err
} }
@ -350,7 +350,7 @@ func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {
return nil return nil
} }
func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) { func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) {
// Subscribes to a PubSub topic. // Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage. // NOTE The data field SHOULD be decoded as a WakuMessage.
if node.relay == nil { if node.relay == nil {
@ -407,7 +407,7 @@ func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) {
close(subscription.C) close(subscription.C)
subscription.mutex.Unlock() subscription.mutex.Unlock()
case <-nextMsgTicker.C: case <-nextMsgTicker.C:
msg, err := sub.Next(node.ctx) msg, err := sub.Next(ctx)
if err != nil { if err != nil {
subscription.mutex.Lock() subscription.mutex.Lock()
for _, subscription := range node.subscriptions[t] { for _, subscription := range node.subscriptions[t] {
@ -437,57 +437,50 @@ func (node *WakuNode) Subscribe(topic *relay.Topic) (*Subscription, error) {
// Wrapper around WakuFilter.Subscribe // Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters // that adds a Filter object to node.filters
func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequest, ch filter.ContentFilterChan) error { // TODO: what's up with this channel?.......................... is it closed eventually?
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received. func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string, ch filter.ContentFilterChan) error {
// ContentFilterChan takes MessagePush structs
// Status: Implemented.
// Sanity check for well-formed subscribe FilterRequest
//doAssert(request.subscribe, "invalid subscribe request")
log.Info("SubscribeFilter, request: ", request)
var id string
var err error
if node.filter == nil { if node.filter == nil {
return errors.New("WakuFilter is not set") return errors.New("WakuFilter is not set")
} }
id, err = node.filter.Subscribe(ctx, request) // Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs
id, peerID, err := node.filter.Subscribe(ctx, topic, contentTopics)
if id == "" || err != nil { if id == "" || err != nil {
// Failed to subscribe // Failed to subscribe
log.Error("remote subscription to filter failed", request) log.Error("remote subscription to filter failed")
//waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
return err return err
} }
// Register handler for filter, whether remote subscription succeeded or not // Register handler for filter, whether remote subscription succeeded or not
node.filters[id] = filter.Filter{ node.filters[id] = filter.Filter{
Topic: request.Topic, PeerID: *peerID,
ContentFilters: request.ContentFilters, Topic: topic,
ContentFilters: contentTopics,
Chan: ch, Chan: ch,
} }
return nil return nil
} }
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, request pb.FilterRequest) { func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) {
log.Info("UnsubscribeFilter, request: ", request)
// Send message to full node in order to unsubscribe
node.filter.Unsubscribe(ctx, request)
// Remove local filter // Remove local filter
var idsToRemove []string var idsToRemove []string
for id, f := range node.filters { for id, f := range node.filters {
if f.Topic != topic {
continue
}
// Send message to full node in order to unsubscribe
node.filter.Unsubscribe(ctx, topic, contentTopics, f.PeerID)
// Iterate filter entries to remove matching content topics // Iterate filter entries to remove matching content topics
// make sure we delete the content filter // make sure we delete the content filter
// if no more topics are left // if no more topics are left
for _, cfToDelete := range request.ContentFilters { for _, cfToDelete := range contentTopics {
for i, cf := range f.ContentFilters { for i, cf := range f.ContentFilters {
if cf.ContentTopic == cfToDelete.ContentTopic { if cf == cfToDelete {
l := len(f.ContentFilters) - 1 l := len(f.ContentFilters) - 1
f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l]
f.ContentFilters = f.ContentFilters[:l] f.ContentFilters = f.ContentFilters[:l]

View File

@ -25,10 +25,12 @@ type (
ContentFilterChan chan *protocol.Envelope ContentFilterChan chan *protocol.Envelope
Filter struct { Filter struct {
PeerID peer.ID
Topic string Topic string
ContentFilters []*pb.FilterRequest_ContentFilter ContentFilters []string
Chan ContentFilterChan Chan ContentFilterChan
} }
// @TODO MAYBE MORE INFO? // @TODO MAYBE MORE INFO?
Filters map[string]Filter Filters map[string]Filter
@ -69,8 +71,8 @@ func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
// TODO: In case of no topics we should either trigger here for all messages, // TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place. // or we should not allow such filter to exist in the first place.
for _, contentFilter := range filter.ContentFilters { for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentFilter.ContentTopic { if msg.ContentTopic == contentTopic {
filter.Chan <- envelope filter.Chan <- envelope
break break
} }
@ -232,7 +234,18 @@ func (wf *WakuFilter) FilterListener() {
// Having a FilterRequest struct, // Having a FilterRequest struct,
// select a peer with filter support, dial it, // select a peer with filter support, dial it,
// and submit FilterRequest wrapped in FilterRPC // and submit FilterRequest wrapped in FilterRPC
func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (string, error) { //.async, gcsafe.} { func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (string, *peer.ID, error) {
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := pb.FilterRequest{
Subscribe: true,
Topic: topic,
ContentFilters: contentFilters,
}
peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1)) peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1))
if err == nil { if err == nil {
conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1) conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
@ -249,34 +262,38 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, request pb.FilterRequest) (
err = writer.WriteMsg(filterRPC) err = writer.WriteMsg(filterRPC)
if err != nil { if err != nil {
log.Error("failed to write message", err) log.Error("failed to write message", err)
return "", err return "", nil, err
} }
return string(id), nil return string(id), peer, nil
} else { } else {
// @TODO more sophisticated error handling here
log.Error("failed to connect to remote peer") log.Error("failed to connect to remote peer")
//waku_filter_errors.inc(labelValues = [dialFailure]) return "", nil, err
return "", err
} }
} else { }
log.Info("error selecting peer: ", err) log.Info("error selecting peer: ", err)
return "", nil, err
} }
return "", nil func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) {
} conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest) {
// @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
peer, err := utils.SelectPeer(wf.h, string(FilterID_v20beta1))
if err == nil {
conn, err := wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
if conn != nil { if conn != nil {
defer conn.Close() defer conn.Close()
// This is the only successful path to subscription // This is the only successful path to subscription
id := protocol.GenerateRequestId() id := protocol.GenerateRequestId()
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := pb.FilterRequest{
Subscribe: false,
Topic: topic,
ContentFilters: contentFilters,
}
writer := protoio.NewDelimitedWriter(conn) writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request} filterRPC := &pb.FilterRPC{RequestId: hex.EncodeToString(id), Request: &request}
err = writer.WriteMsg(filterRPC) err = writer.WriteMsg(filterRPC)
@ -284,11 +301,6 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, request pb.FilterRequest)
log.Error("failed to write message", err) log.Error("failed to write message", err)
} }
} else { } else {
// @TODO more sophisticated error handling here
log.Error("failed to connect to remote peer", err) log.Error("failed to connect to remote peer", err)
//waku_filter_errors.inc(labelValues = [dialFailure])
}
} else {
log.Info("Error selecting peer: ", err)
} }
} }