2021-04-28 20:10:44 +00:00
|
|
|
package relay
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-07-29 22:08:53 +00:00
|
|
|
"crypto/sha256"
|
2021-04-28 20:10:44 +00:00
|
|
|
"errors"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
proto "github.com/golang/protobuf/proto"
|
|
|
|
logging "github.com/ipfs/go-log"
|
|
|
|
"github.com/libp2p/go-libp2p-core/host"
|
2021-07-29 22:08:53 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
2021-04-28 20:10:44 +00:00
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
2021-04-28 20:10:44 +00:00
|
|
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
2021-07-29 22:08:53 +00:00
|
|
|
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
2021-04-28 20:10:44 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var log = logging.Logger("wakurelay")
|
|
|
|
|
|
|
|
type Topic string
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
|
2021-04-28 20:10:44 +00:00
|
|
|
const DefaultWakuTopic Topic = "/waku/2/default-waku/proto"
|
|
|
|
|
|
|
|
type WakuRelay struct {
|
|
|
|
host host.Host
|
2021-07-29 22:08:53 +00:00
|
|
|
pubsub *pubsub.PubSub
|
2021-04-28 20:10:44 +00:00
|
|
|
|
|
|
|
topics map[Topic]bool
|
|
|
|
topicsMutex sync.Mutex
|
2021-07-29 22:08:53 +00:00
|
|
|
wakuRelayTopics map[Topic]*pubsub.Topic
|
|
|
|
relaySubs map[Topic]*pubsub.Subscription
|
|
|
|
}
|
|
|
|
|
|
|
|
// Once https://github.com/status-im/nim-waku/issues/420 is fixed, implement a custom messageIdFn
|
|
|
|
func msgIdFn(pmsg *pubsub_pb.Message) string {
|
|
|
|
hash := sha256.Sum256(pmsg.Data)
|
|
|
|
return string(hash[:])
|
2021-04-28 20:10:44 +00:00
|
|
|
}
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
func NewWakuRelay(ctx context.Context, h host.Host, opts ...pubsub.Option) (*WakuRelay, error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
w := new(WakuRelay)
|
|
|
|
w.host = h
|
|
|
|
w.topics = make(map[Topic]bool)
|
2021-07-29 22:08:53 +00:00
|
|
|
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
|
|
|
|
w.relaySubs = make(map[Topic]*pubsub.Subscription)
|
|
|
|
|
|
|
|
// default options required by WakuRelay
|
|
|
|
opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
|
|
|
|
opts = append(opts, pubsub.WithNoAuthor())
|
|
|
|
opts = append(opts, pubsub.WithMessageIdFn(msgIdFn))
|
|
|
|
|
|
|
|
opts = append(opts, pubsub.WithGossipSubProtocols(
|
|
|
|
[]protocol.ID{pubsub.GossipSubID_v11, pubsub.GossipSubID_v10, pubsub.FloodSubID, WakuRelayID_v200},
|
2021-10-08 13:50:56 +00:00
|
|
|
func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
|
2021-07-29 22:08:53 +00:00
|
|
|
switch feat {
|
2021-10-08 13:50:56 +00:00
|
|
|
case pubsub.GossipSubFeatureMesh:
|
|
|
|
return proto == pubsub.GossipSubID_v11 || proto == pubsub.GossipSubID_v10
|
|
|
|
case pubsub.GossipSubFeaturePX:
|
|
|
|
return proto == pubsub.GossipSubID_v11
|
2021-07-29 22:08:53 +00:00
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
2021-10-08 13:50:56 +00:00
|
|
|
},
|
2021-07-29 22:08:53 +00:00
|
|
|
))
|
2021-04-28 20:10:44 +00:00
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
ps, err := pubsub.NewGossipSub(ctx, h, opts...)
|
2021-04-28 20:10:44 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
w.pubsub = ps
|
|
|
|
|
|
|
|
log.Info("Relay protocol started")
|
|
|
|
|
|
|
|
return w, nil
|
|
|
|
}
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
func (w *WakuRelay) PubSub() *pubsub.PubSub {
|
2021-04-28 20:10:44 +00:00
|
|
|
return w.pubsub
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WakuRelay) Topics() []Topic {
|
|
|
|
defer w.topicsMutex.Unlock()
|
|
|
|
w.topicsMutex.Lock()
|
|
|
|
|
|
|
|
var result []Topic
|
2021-07-29 12:40:54 +00:00
|
|
|
for topic := range w.topics {
|
2021-04-28 20:10:44 +00:00
|
|
|
result = append(result, topic)
|
|
|
|
}
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
|
2021-04-28 20:10:44 +00:00
|
|
|
w.pubsub = pubSub
|
|
|
|
}
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
defer w.topicsMutex.Unlock()
|
|
|
|
w.topicsMutex.Lock()
|
|
|
|
|
|
|
|
w.topics[topic] = true
|
|
|
|
pubSubTopic, ok := w.wakuRelayTopics[topic]
|
|
|
|
if !ok { // Joins topic if node hasn't joined yet
|
|
|
|
newTopic, err := w.pubsub.Join(string(topic))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
w.wakuRelayTopics[topic] = newTopic
|
|
|
|
pubSubTopic = newTopic
|
|
|
|
}
|
|
|
|
return pubSubTopic, nil
|
|
|
|
}
|
|
|
|
|
2021-07-29 22:08:53 +00:00
|
|
|
func (w *WakuRelay) Subscribe(topic Topic) (subs *pubsub.Subscription, isNew bool, err error) {
|
2021-04-28 20:10:44 +00:00
|
|
|
sub, ok := w.relaySubs[topic]
|
|
|
|
if !ok {
|
|
|
|
pubSubTopic, err := w.upsertTopic(topic)
|
|
|
|
if err != nil {
|
|
|
|
return nil, false, err
|
|
|
|
}
|
|
|
|
|
|
|
|
sub, err = pubSubTopic.Subscribe()
|
|
|
|
if err != nil {
|
|
|
|
return nil, false, err
|
|
|
|
}
|
|
|
|
w.relaySubs[topic] = sub
|
|
|
|
|
|
|
|
log.Info("Subscribing to topic ", topic)
|
|
|
|
}
|
|
|
|
|
|
|
|
isNew = !ok // ok will be true if subscription already exists
|
|
|
|
return sub, isNew, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
|
|
|
|
// Publish a `WakuMessage` to a PubSub topic.
|
|
|
|
if w.pubsub == nil {
|
2021-07-29 12:40:54 +00:00
|
|
|
return nil, errors.New("PubSub hasn't been set")
|
2021-04-28 20:10:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if message == nil {
|
|
|
|
return nil, errors.New("message can't be null")
|
|
|
|
}
|
|
|
|
|
|
|
|
pubSubTopic, err := w.upsertTopic(GetTopic(topic))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
out, err := proto.Marshal(message)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = pubSubTopic.Publish(ctx, out)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
hash := pb.Hash(out)
|
|
|
|
|
|
|
|
return hash, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetTopic(topic *Topic) Topic {
|
|
|
|
var t Topic = DefaultWakuTopic
|
|
|
|
if topic != nil {
|
|
|
|
t = *topic
|
|
|
|
}
|
|
|
|
return t
|
|
|
|
}
|
2021-10-11 22:45:54 +00:00
|
|
|
|
|
|
|
func (w *WakuRelay) Stop() {
|
|
|
|
w.host.RemoveStreamHandler(WakuRelayID_v200)
|
|
|
|
}
|