diff --git a/floodsub.go b/floodsub.go index 8bfa79c..ab41cee 100644 --- a/floodsub.go +++ b/floodsub.go @@ -333,7 +333,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) error { if p.validate(subs, &Message{pmsg}) { p.sendMsg <- sendReq{ from: rpc.from, - msg: &*Message{pmsg}, + msg: &Message{pmsg}, } } }() @@ -431,6 +431,8 @@ type addSubReq struct { resp chan *Subscription } +type SubOpt func(*Subscription) error + // WithValidator is an option that can be supplied to Subscribe. The argument is a function that returns whether or not a given message should be propagated further. func WithValidator(validate func(*Message) bool) func(*Subscription) error { return func(sub *Subscription) error { @@ -441,14 +443,14 @@ func WithValidator(validate func(*Message) bool) func(*Subscription) error { } // Subscribe returns a new Subscription for the given topic -func (p *PubSub) Subscribe(topic string, opts ...func(*Subscription) error) (*Subscription, error) { +func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) { td := pb.TopicDescriptor{Name: &topic} return p.SubscribeByTopicDescriptor(&td, opts...) } // SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor -func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...func(*Subscription) error) (*Subscription, error) { +func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) { if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { return nil, fmt.Errorf("auth mode not yet supported") }