go-waku/waku/v2/protocol/relay/waku_relay.go
2023-05-04 13:25:45 -04:00

417 lines
11 KiB
Go
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.uber.org/zap"
proto "google.golang.org/protobuf/proto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/hash"
"github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
// WakuRelayID_v200 is the libp2p protocol ID of the Waku relay protocol, version 2.0.0.
const WakuRelayID_v200 protocol.ID = "/vac/waku/relay/2.0.0"

// DefaultWakuTopic is the string form of the default Waku pubsub topic. It is
// the topic used by Publish, Subscribe and EnoughPeersToPublish.
var DefaultWakuTopic = waku_proto.DefaultPubsubTopic().String()
// WakuRelay implements the Waku relay protocol on top of a libp2p gossipsub
// instance. It tracks the joined pubsub topics, the underlying pubsub
// subscriptions, and the client-facing Subscription objects handed out by
// SubscribeToTopic.
type WakuRelay struct {
// host is the libp2p host, injected through SetHost and handed to gossipsub in Start.
host host.Host
// opts are the gossipsub options assembled in NewWakuRelay.
opts []pubsub.Option
// pubsub is the gossipsub instance; nil until Start or SetPubSub has been called.
pubsub *pubsub.PubSub
// timesource provides the timestamp attached to received envelopes.
timesource timesource.Timesource
log *zap.Logger
// bcaster receives every decoded envelope; it is nil-checked before each use.
bcaster v2.Broadcaster
// minPeersToPublish is the minimum peer count on a topic required by PublishToTopic.
minPeersToPublish int
// TODO: convert to concurrent maps
// topicsMutex is held by Topics, IsSubscribed and upsertTopic while they access the two maps below.
topicsMutex sync.Mutex
// wakuRelayTopics maps a topic name to its joined pubsub topic handle.
wakuRelayTopics map[string]*pubsub.Topic
// relaySubs maps a topic name to the underlying pubsub subscription.
relaySubs map[string]*pubsub.Subscription
// TODO: convert to concurrent maps
// subscriptions maps a topic name to the client subscriptions created by SubscribeToTopic.
subscriptions map[string][]*Subscription
// subscriptionsMutex is held by SubscribeToTopic and Stop while they access subscriptions.
subscriptionsMutex sync.Mutex
// ctx and cancel are created in Start; cancel is invoked by Stop.
ctx context.Context
cancel context.CancelFunc
// wg counts the running subscribeToTopic goroutines; Start and Stop wait on it.
wg sync.WaitGroup
}
// msgIdFn computes the pubsub message ID as the raw SHA-256 digest of the
// message data, converted to a string. It is installed via
// pubsub.WithMessageIdFn in NewWakuRelay.
func msgIdFn(pmsg *pubsub_pb.Message) string {
	digest := hash.SHA256(pmsg.Data)
	return string(digest)
}
// NewWakuRelay returns a new instance of a WakuRelay struct.
//
// bcaster may be nil, in which case received envelopes are not broadcast.
// minPeersToPublish is the minimum number of peers that must be present on a
// topic before PublishToTopic will publish. opts are extra gossipsub options;
// the options WakuRelay requires are appended after them.
//
// The relay is not usable until SetHost and Start have been called.
func NewWakuRelay(bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
	w := new(WakuRelay)
	w.timesource = timesource
	w.wakuRelayTopics = make(map[string]*pubsub.Topic)
	w.relaySubs = make(map[string]*pubsub.Subscription)
	w.subscriptions = make(map[string][]*Subscription)
	w.bcaster = bcaster
	w.minPeersToPublish = minPeersToPublish
	w.log = log.Named("relay")

	// Copy the caller's options into a fresh slice before appending. Appending
	// directly to the variadic parameter could write into the caller's backing
	// array when it has spare capacity.
	relayOpts := make([]pubsub.Option, 0, len(opts)+4)
	relayOpts = append(relayOpts, opts...)

	// default options required by WakuRelay
	relayOpts = append(relayOpts,
		pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
		pubsub.WithNoAuthor(),
		pubsub.WithMessageIdFn(msgIdFn),
		pubsub.WithGossipSubProtocols(
			[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
			// Feature test: mesh is reported for gossipsub v1.1 and v1.0, peer
			// exchange only for v1.1, everything else is unsupported.
			// NOTE(review): WakuRelayID_v200 is advertised above but reports no
			// mesh/PX support here; confirm this is intended.
			func(feat pubsub.GossipSubFeature, id protocol.ID) bool {
				switch feat {
				case pubsub.GossipSubFeatureMesh:
					return id == pubsub.GossipSubID_v11 || id == pubsub.GossipSubID_v10
				case pubsub.GossipSubFeaturePX:
					return id == pubsub.GossipSubID_v11
				default:
					return false
				}
			},
		),
	)

	w.opts = relayOpts
	return w
}
// SetHost sets the libp2p host used to mount or consume the protocol. Start
// passes this host to gossipsub, so it must be set before Start is called.
func (w *WakuRelay) SetHost(h host.Host) {
w.host = h
}
// Start creates the gossipsub instance on the configured host and marks the
// relay as started. It first waits for any subscription goroutines still
// tracked by the wait group to finish.
//
// The relay's context is derived from ctx and is cancelled by Stop. If
// gossipsub cannot be created, the derived context is released and no relay
// state is modified, so a later Stop reports "not started" instead of tearing
// down a half-initialised relay.
func (w *WakuRelay) Start(ctx context.Context) error {
	w.wg.Wait()

	ctx, cancel := context.WithCancel(ctx)

	ps, err := pubsub.NewGossipSub(ctx, w.host, w.opts...)
	if err != nil {
		cancel() // release the derived context; nothing was started
		return err
	}

	w.ctx = ctx // TODO: create worker for creating subscriptions instead of storing context
	w.cancel = cancel
	w.pubsub = ps

	w.log.Info("Relay protocol started")
	return nil
}
// PubSub returns the implementation of the pubsub system. The result is nil
// until Start has succeeded or SetPubSub has been called.
func (w *WakuRelay) PubSub() *pubsub.PubSub {
return w.pubsub
}
// Topics returns the names of all pubsub topics that currently have a relay
// subscription. The order follows map iteration and is therefore unspecified.
// The result is nil when there are no subscriptions.
func (w *WakuRelay) Topics() []string {
	w.topicsMutex.Lock()
	defer w.topicsMutex.Unlock()

	var names []string
	for name := range w.relaySubs {
		names = append(names, name)
	}
	return names
}
// IsSubscribed reports whether a relay subscription exists for topic.
func (w *WakuRelay) IsSubscribed(topic string) bool {
	w.topicsMutex.Lock()
	defer w.topicsMutex.Unlock()

	_, found := w.relaySubs[topic]
	return found
}
// SetPubSub is used to set an implementation of the pubsub system, replacing
// whatever instance was stored before (including one created by Start).
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
w.pubsub = pubSub
}
// upsertTopic returns the pubsub topic handle for topic, joining the topic and
// caching the handle if the node has not joined it yet. topicsMutex is held
// for the whole call, so callers must not already hold it.
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
	w.topicsMutex.Lock()
	defer w.topicsMutex.Unlock()

	if joined, found := w.wakuRelayTopics[topic]; found {
		return joined, nil
	}

	// Not joined yet: join now and remember the handle.
	joined, err := w.pubsub.Join(topic)
	if err != nil {
		return nil, err
	}
	w.wakuRelayTopics[topic] = joined
	return joined, nil
}
/*
func (w *WakuRelay) validatorFactory(pubsubTopic string) func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
return func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
msg := new(pb.WakuMessage)
err := proto.Unmarshal(message.Data, msg)
return err == nil
}
}
*/
// subscribe returns the underlying pubsub subscription for topic, creating it
// (and joining the topic) when none exists yet.
//
// relaySubs is read and written under topicsMutex, the same lock Topics and
// IsSubscribed take to read it. The lock is released while the topic is
// joined because upsertTopic acquires topicsMutex itself. If another
// goroutine registers a subscription for the same topic in that window, the
// redundant subscription created here is cancelled and the registered one is
// returned.
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
	w.topicsMutex.Lock()
	sub, ok := w.relaySubs[topic]
	w.topicsMutex.Unlock()
	if ok {
		return sub, nil
	}

	pubSubTopic, err := w.upsertTopic(topic)
	if err != nil {
		return nil, err
	}

	/*
		// TODO: Add a function to validate the WakuMessage integrity
		// Rejects messages that are not WakuMessage
		err = w.pubsub.RegisterTopicValidator(topic, w.validatorFactory(topic))
		if err != nil {
			return nil, err
		}
	*/

	sub, err = pubSubTopic.Subscribe()
	if err != nil {
		return nil, err
	}

	w.topicsMutex.Lock()
	defer w.topicsMutex.Unlock()

	if existing, raced := w.relaySubs[topic]; raced {
		// Lost the race against a concurrent subscribe for the same topic.
		sub.Cancel()
		return existing, nil
	}

	w.relaySubs[topic] = sub
	w.log.Info("subscribing to topic", zap.String("topic", sub.Topic()))

	return sub, nil
}
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic.
//
// It fails when no pubsub instance has been set, when message is nil, or when
// fewer than the configured minimum number of peers are on the topic.
// Otherwise the topic is joined if necessary, the message is
// protobuf-encoded and published, and the message hash for that topic is
// returned.
func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
	// Preconditions, checked in order.
	switch {
	case w.pubsub == nil:
		return nil, errors.New("PubSub hasn't been set")
	case message == nil:
		return nil, errors.New("message can't be null")
	case !w.EnoughPeersToPublishToTopic(topic):
		return nil, errors.New("not enough peers to publish")
	}

	joinedTopic, err := w.upsertTopic(topic)
	if err != nil {
		return nil, err
	}

	payload, err := proto.Marshal(message)
	if err != nil {
		return nil, err
	}

	if err := joinedTopic.Publish(ctx, payload); err != nil {
		return nil, err
	}

	msgHash := message.Hash(topic)
	w.log.Debug("waku.relay published", zap.String("hash", hex.EncodeToString(msgHash)))

	return msgHash, nil
}
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic
// (DefaultWakuTopic). It delegates to PublishToTopic and returns its result
// unchanged.
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
return w.PublishToTopic(ctx, message, DefaultWakuTopic)
}
// Stop unmounts the relay protocol and stops all subscriptions.
//
// It is a no-op when the relay has not been started. Otherwise it removes the
// relay stream handler from the host, cancels the relay context, waits for
// the subscription goroutines to exit, and unsubscribes every client
// subscription of the topics reported by Topics.
func (w *WakuRelay) Stop() {
	if w.cancel == nil {
		return // Not started
	}

	w.host.RemoveStreamHandler(WakuRelayID_v200)
	w.cancel()
	w.wg.Wait()

	w.subscriptionsMutex.Lock()
	defer w.subscriptionsMutex.Unlock()

	for _, topic := range w.Topics() {
		for _, sub := range w.subscriptions[topic] {
			sub.Unsubscribe()
		}
	}

	// Reset to an empty map rather than nil: SubscribeToTopic assigns into
	// this map and would panic on a nil map if called after Stop.
	w.subscriptions = make(map[string][]*Subscription)
}
// EnoughPeersToPublish returns whether there are enough peers connected in the
// default waku pubsub topic (DefaultWakuTopic). It delegates to
// EnoughPeersToPublishToTopic.
func (w *WakuRelay) EnoughPeersToPublish() bool {
return w.EnoughPeersToPublishToTopic(DefaultWakuTopic)
}
// EnoughPeersToPublishToTopic returns whether the number of peers connected in
// a pubsub topic is at least the configured minimum.
//
// It returns false when no pubsub instance has been set yet, since nothing
// can be published in that state. This avoids a nil dereference when the
// method is called before Start or SetPubSub.
func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
	ps := w.PubSub()
	if ps == nil {
		return false
	}
	return len(ps.ListPeers(topic)) >= w.minPeersToPublish
}
// SubscribeToTopic returns a Subscription to receive messages from a pubsub topic.
//
// The underlying pubsub subscription is obtained (or created) first. A new
// client Subscription is then recorded for the topic, registered with the
// broadcaster when one is configured, and served by a goroutine tracked in
// the relay's wait group. That goroutine stops when ctx is done.
func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) {
	relaySub, err := w.subscribe(topic)
	if err != nil {
		return nil, err
	}

	// Client-facing subscription handed back to the caller.
	clientSub := new(Subscription)
	clientSub.quit = make(chan struct{})
	clientSub.C = make(chan *waku_proto.Envelope, 1024) // To avoid blocking
	clientSub.closed = false

	w.subscriptionsMutex.Lock()
	defer w.subscriptionsMutex.Unlock()

	w.subscriptions[topic] = append(w.subscriptions[topic], clientSub)

	if w.bcaster != nil {
		w.bcaster.Register(&topic, clientSub.C)
	}

	w.wg.Add(1)
	go w.subscribeToTopic(ctx, topic, clientSub, relaySub)

	return clientSub, nil
}
// Subscribe returns a Subscription to receive messages from the default waku
// pubsub topic (DefaultWakuTopic). It delegates to SubscribeToTopic.
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
return w.SubscribeToTopic(ctx, DefaultWakuTopic)
}
// Unsubscribe closes a subscription to a pubsub topic.
//
// Every client subscription on the topic is unsubscribed and forgotten, the
// underlying pubsub subscription is cancelled, and the topic handle is
// closed. An error is returned when the relay is not subscribed to topic or
// when closing the topic handle fails; in the latter case the handle stays
// cached. ctx is currently unused.
//
// The shared maps are accessed under their mutexes. The two mutexes are
// taken one after the other, never nested, because Stop acquires
// subscriptionsMutex before topicsMutex.
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
	w.topicsMutex.Lock()
	sub, ok := w.relaySubs[topic]
	w.topicsMutex.Unlock()
	if !ok {
		return fmt.Errorf("not subscribed to topic")
	}
	w.log.Info("unsubscribing from topic", zap.String("topic", sub.Topic()))

	w.subscriptionsMutex.Lock()
	for _, clientSub := range w.subscriptions[topic] {
		clientSub.Unsubscribe()
	}
	// Drop the entries so stale client subscriptions do not accumulate.
	delete(w.subscriptions, topic)
	w.subscriptionsMutex.Unlock()

	w.topicsMutex.Lock()
	defer w.topicsMutex.Unlock()

	sub.Cancel()
	delete(w.relaySubs, topic)

	pubSubTopic, ok := w.wakuRelayTopics[topic]
	if !ok {
		// A concurrent Unsubscribe already closed and removed the handle.
		return nil
	}
	if err := pubSubTopic.Close(); err != nil {
		return err
	}
	delete(w.wakuRelayTopics, topic)

	return nil
}
// nextMessage starts a goroutine that pulls messages from sub and forwards
// them on the returned channel (buffered, capacity 1024).
//
// When sub.Next fails, including on cancellation of ctx, the goroutine
// cancels sub, closes the channel, unsubscribes every client subscription
// registered for the topic, and exits. Receivers observe the end of the
// stream as a nil message from the closed channel.
func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <-chan *pubsub.Message {
	msgChannel := make(chan *pubsub.Message, 1024)

	go func() {
		// Safety net only: the loop now exits explicitly instead of relying on
		// a recovered "send on closed channel" panic.
		defer func() {
			if r := recover(); r != nil {
				w.log.Debug("recovered msgChannel")
			}
		}()

		for {
			msg, err := sub.Next(ctx)
			if err != nil {
				if !errors.Is(err, context.Canceled) {
					w.log.Error("getting message from subscription", zap.Error(err))
				}

				sub.Cancel()
				close(msgChannel)

				// subscriptions is shared with SubscribeToTopic and Stop, which
				// access it under subscriptionsMutex.
				w.subscriptionsMutex.Lock()
				for _, subscription := range w.subscriptions[sub.Topic()] {
					subscription.Unsubscribe()
				}
				w.subscriptionsMutex.Unlock()

				// Stop here: the channel is closed, so another send would panic.
				return
			}

			msgChannel <- msg
		}
	}()

	return msgChannel
}
// subscribeToTopic is the worker goroutine behind one client Subscription. It
// is started by SubscribeToTopic and marks the wait group done on exit.
//
// It reads messages of the underlying pubsub subscription sub (via
// nextMessage), decodes them as WakuMessage, records metrics, and submits the
// resulting envelope to the broadcaster when one is configured. It returns
// when userCtx or the relay context is done, or when the message channel is
// closed.
//
// When the subscription's quit channel fires, the subscription is
// unregistered from the broadcaster and its channel C is closed. The worker
// keeps running afterwards so messages pulled from sub still reach the
// broadcaster.
func (w *WakuRelay) subscribeToTopic(userCtx context.Context, pubsubTopic string, subscription *Subscription, sub *pubsub.Subscription) {
	defer w.wg.Done()

	ctx, err := tag.New(w.ctx, tag.Insert(metrics.KeyType, "relay"))
	if err != nil {
		w.log.Error("creating tag map", zap.Error(err))
		return
	}

	subChannel := w.nextMessage(w.ctx, sub)

	// Local copy so the quit case can be disabled once it has been handled.
	quit := subscription.quit

	for {
		select {
		case <-userCtx.Done():
			return
		case <-ctx.Done():
			return
		case <-quit:
			func(topic string) {
				subscription.Lock()
				defer subscription.Unlock()

				if subscription.closed {
					return
				}
				subscription.closed = true
				if w.bcaster != nil {
					<-w.bcaster.WaitUnregister(&topic, subscription.C) // Remove from broadcast list
				}

				close(subscription.C)
			}(pubsubTopic)

			// A nil channel never becomes ready. Without this, a closed quit
			// channel (presumably how Subscription.Unsubscribe signals; it is
			// defined elsewhere) would fire on every iteration and spin the loop.
			quit = nil
			// TODO: if there are no more relay subscriptions, close the pubsub subscription
		case msg := <-subChannel:
			if msg == nil {
				// Channel closed by nextMessage: the pubsub subscription ended.
				return
			}

			wakuMessage := &pb.WakuMessage{}
			if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil {
				// Skip the malformed message instead of terminating the worker:
				// one bad payload from the network must not stop delivery.
				w.log.Error("decoding message", zap.Error(err))
				continue
			}

			msgSizeInKb := len(wakuMessage.Payload) / 1000
			stats.Record(ctx, metrics.Messages.M(1), metrics.MessageSize.M(int64(msgSizeInKb)))

			envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)
			w.log.Debug("waku.relay received", logging.HexString("hash", envelope.Hash()))

			if w.bcaster != nil {
				w.bcaster.Submit(envelope)
			}
		}
	}
}