diff --git a/README.md b/README.md index 7b249125..47c01d3d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,34 @@ # go-waku A Go implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v2/waku-v2). +

+ + + +
+

+ +## Install +``` +git clone https://github.com/status-im/go-waku +cd go-waku +make +``` + +## Wakunode +See the available command line options with +``` +./build/waku --help +``` + +## Library +``` +go get github.com/status-im/go-waku +``` + +## Examples +Examples of usage of go-waku as a library can be found in the examples folder. There is a fully featured chat example. + ## Waku Protocol Support @@ -28,27 +56,6 @@ A Go implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v |[27/WAKU2-PEERS](https://rfc.vac.dev/spec/27)|✔| |[29/WAKU2-CONFIG](https://rfc.vac.dev/spec/29)|🚧| - - - -## Install -``` -git clone https://github.com/status-im/go-waku -cd go-waku -make -``` - -## Wakunode -See the available command line options with -``` -./build/waku --help -``` - - -## Examples -Examples of usage of go-waku as a library can be found in the examples folder. There is a fully featured chat example. - - ## Contribution Thank you for considering to help out with the source code! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes! @@ -59,6 +66,7 @@ To build and test this repository, you need: - [protoc](https://grpc.io/docs/protoc-installation/) - [Protocol Buffers for Go with Gadgets](https://github.com/gogo/protobuf) + ## License Licensed and distributed under either of diff --git a/examples/basic2/main.go b/examples/basic2/main.go index cfcf150b..c45ccfb0 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -15,7 +15,6 @@ import ( logging "github.com/ipfs/go-log" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("basic2") @@ -78,7 +77,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var contentTopic string = "test" var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp float64 = float64(time.Now().Unix()) p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 8bcfdf3b..87577c02 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -13,7 +13,6 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/filter" wpb "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" - "github.com/status-im/go-waku/waku/v2/utils" "golang.org/x/crypto/pbkdf2" ) @@ -52,7 +51,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic chat.C = make(filter.ContentFilterChan) filterRequest := wpb.FilterRequest{ - ContentFilters: []*wpb.FilterRequest_ContentFilter{&wpb.FilterRequest_ContentFilter{ContentTopic: contentTopic}}, + ContentFilters: []*wpb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, Topic: string(relay.GetTopic(nil)), Subscribe: true, } @@ -94,7 +93,7 @@ func (cr *Chat) Publish(ctx context.Context, message string) error { } var version uint32 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp float64 = float64(time.Now().Unix()) var keyInfo *node.KeyInfo = &node.KeyInfo{} if cr.useV1Payload { // Use WakuV1 encryption diff --git a/examples/chat2/ui.go b/examples/chat2/ui.go index ca2e6604..ee9f18d2 100644 --- a/examples/chat2/ui.go +++ b/examples/chat2/ui.go @@ -87,7 +87,7 @@ func NewChatUI(ctx context.Context, chat *Chat) *ChatUI { if err != nil { chatUI.displayMessage(err.Error()) } else { - chatUI.displayMessage("Peer connected succesfully") + chatUI.displayMessage("Peer connected successfully") } }(peer) return diff --git a/examples/filter2/main.go b/examples/filter2/main.go index d774b882..2d1a54bd 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -85,7 +85,7 @@ func main() { // Send FilterRequest from light node to full node filterRequest := pb.FilterRequest{ - ContentFilters: []*pb.FilterRequest_ContentFilter{&pb.FilterRequest_ContentFilter{ContentTopic: contentTopic}}, + ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, Topic: string(pubSubTopic), Subscribe: true, } diff --git a/examples/peer_events/main.go b/examples/peer_events/main.go index 6e4c1336..acbf32e7 100644 --- a/examples/peer_events/main.go +++ b/examples/peer_events/main.go @@ -17,9 +17,7 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" - //"github.com/status-im/go-waku/waku/v2/protocol/store" - "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("peer_events") @@ -184,7 +182,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp float64 = float64(time.Now().Unix()) p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/tests/connection_test.go b/tests/connection_test.go index 9c1036a4..54531c20 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -6,13 +6,13 @@ import ( "encoding/hex" "net" "testing" + "time" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/utils" ) func TestBasicSendingReceiving(t *testing.T) { @@ -61,7 +61,7 @@ func randomHex(n int) (string, error) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error { var contentTopic string = "test" var version uint32 = 0 - var timestamp float64 = utils.GetUnixEpoch() + var timestamp float64 = float64(time.Now().Unix()) p := new(node.Payload) p.Data = []byte(wakuNode.ID() + ": " + msgContent) diff --git a/waku/metrics/http.go b/waku/metrics/http.go index 613bb23a..37b565ea 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -20,6 +20,7 @@ type Server struct { server *http.Server } +// NewMetricsServer creates a prometheus server on a particular interface and port func NewMetricsServer(address string, port int) *Server { _ = runmetrics.Enable(runmetrics.RunMetricOptions{ EnableCPU: true, @@ -60,11 +61,12 @@ func NewMetricsServer(address string, port int) *Server { return &p } -// Listen starts the HTTP server in the background. +// Start executes the HTTP server in the background. func (p *Server) Start() { log.Info("Server stopped ", p.server.ListenAndServe()) } +// Stop shuts down the prometheus server func (p *Server) Stop(ctx context.Context) { err := p.server.Shutdown(ctx) if err != nil { diff --git a/waku/node.go b/waku/node.go index 38787837..d1c7c750 100644 --- a/waku/node.go +++ b/waku/node.go @@ -52,6 +52,7 @@ func failOnErr(err error, msg string) { } } +// Execute starts a go-waku node with settings determined by the Options parameter func Execute(options Options) { if options.GenerateKey { if err := writePrivateKeyToFile(options.KeyFile, options.Overwrite); err != nil { @@ -132,7 +133,7 @@ func Execute(options Options) { } if options.Store.Enable { - nodeOpts = append(nodeOpts, node.WithWakuStore(true, true)) + nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume)) if options.UseDB { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) failOnErr(err, "DBStore") diff --git a/waku/options.go b/waku/options.go index e848f4f0..7709db93 100644 --- a/waku/options.go +++ b/waku/options.go @@ -20,30 +20,46 @@ type FilterOptions struct { Nodes []string `long:"filter-node" description:"Multiaddr of a peer to request content filtering of messages. Option may be repeated"` } +// LightpushOptions are settings used to enable the lightpush protocol. This is +// a lightweight protocol used to avoid having to run the relay protocol which +// is more resource intensive. With this protocol a message is pushed to a peer +// that supports both the lightpush protocol and relay protocol. That peer will +// broadcast the message and return a confirmation that the message was +// broadcasted type LightpushOptions struct { Enable bool `long:"lightpush" description:"Enable lightpush protocol"` Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer to request lightpush of published messages. Option may be repeated"` } +// StoreOptions are settings used for enabling the store protocol, used to +// retrieve message history from other nodes as well as acting as a store +// node and provide message history to nodes that ask for it. type StoreOptions struct { - Enable bool `long:"store" description:"Enable store protocol"` - Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"` + Enable bool `long:"store" description:"Enable store protocol"` + ShouldResume bool `long:"resume" description:"fix the gap in message history"` + Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"` } +// DNSDiscoveryOptions are settings used for enabling DNS-based discovery +// protocol that stores merkle trees in DNS records which contain connection +// information for nodes. It's very useful for bootstrapping a p2p network. type DNSDiscoveryOptions struct { Enable bool `long:"dns-discovery" description:"Enable DNS discovery"` URL string `long:"dns-discovery-url" description:"URL for DNS node list in format 'enrtree://@'"` Nameserver string `long:"dns-discovery-nameserver" description:"DNS nameserver IP to query (empty to use system's default)"` } +// MetricsOptions are settings used to start a prometheus server for obtaining +// useful node metrics to monitor the health of behavior of the go-waku node. type MetricsOptions struct { Enable bool `long:"metrics" description:"Enable the metrics server"` Address string `long:"metrics-address" description:"Listening address of the metrics server" default:"127.0.0.1"` Port int `long:"metrics-port" description:"Listening HTTP port of the metrics server" default:"8008"` } +// Options contains all the available features and settings that can be +// configured via flags when executing go-waku as a service. type Options struct { - // Example of optional value Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"9000"` EnableWS bool `long:"ws" description:"Enable websockets support"` WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"9001"` diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index f2c3139f..638e448c 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -4,7 +4,7 @@ import ( "database/sql" "fmt" - _ "github.com/mattn/go-sqlite3" + _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver "github.com/status-im/go-waku/waku/persistence" ) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index c8d907ff..77446e0c 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -14,6 +14,7 @@ type DBStore struct { db *sql.DB } +// DBOption is an optional setting that can be used to configure the DBStore type DBOption func(*DBStore) error // WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore. diff --git a/waku/v2/node/broadcast.go b/waku/v2/node/broadcast.go index 27ef1f70..a6cfac9c 100644 --- a/waku/v2/node/broadcast.go +++ b/waku/v2/node/broadcast.go @@ -4,7 +4,8 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" ) -// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license +// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 +// by Dustin Sallings (c) 2013, which was released under MIT license type broadcaster struct { input chan *protocol.Envelope diff --git a/waku/v2/node/subscription.go b/waku/v2/node/subscription.go index 27adcf95..7f07968f 100644 --- a/waku/v2/node/subscription.go +++ b/waku/v2/node/subscription.go @@ -6,9 +6,9 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" ) -// Subscription to a pubsub topic +// Subscription handles the subscrition to a particular pubsub topic type Subscription struct { - // Channel for receiving messages + // C is channel used for receiving envelopes C chan *protocol.Envelope closed bool @@ -16,14 +16,14 @@ type Subscription struct { quit chan struct{} } -// Unsubscribe from a pubsub topic. Will close the message channel +// Unsubscribe will close a subscription from a pubsub topic. Will close the message channel func (subs *Subscription) Unsubscribe() { if !subs.closed { close(subs.quit) } } -// Determine whether a Subscription is open or not +// IsClosed determine whether a Subscription is still open for receiving messages func (subs *Subscription) IsClosed() bool { subs.mutex.Lock() defer subs.mutex.Unlock() diff --git a/waku/v2/protocol/envelope.go b/waku/v2/protocol/envelope.go index 52fc51a1..93298fde 100644 --- a/waku/v2/protocol/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -9,6 +9,9 @@ type Envelope struct { hash []byte } +// NewEnvelope creates a new Envelope that contains a WakuMessage +// It's used as a way to know to which Pubsub topic belongs a WakuMessage +// as well as generating a hash based on the bytes that compose the message func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope { data, _ := msg.Marshal() return &Envelope{ @@ -19,18 +22,22 @@ func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope { } } +// Message returns the WakuMessage associated to an Envelope func (e *Envelope) Message() *pb.WakuMessage { return e.msg } +// PubsubTopic returns the topic on which a WakuMessage was received func (e *Envelope) PubsubTopic() string { return e.pubsubTopic } +// Hash returns a 32 byte hash calculated from the WakuMessage bytes func (e *Envelope) Hash() []byte { return e.hash } +// Size returns the byte size of the WakuMessage func (e *Envelope) Size() int { return e.size } diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go index 8652be32..e1a4afdc 100644 --- a/waku/v2/protocol/pb/utils.go +++ b/waku/v2/protocol/pb/utils.go @@ -5,6 +5,7 @@ import ( proto "github.com/golang/protobuf/proto" ) +// Hash calculates the hash of a waku message func (msg *WakuMessage) Hash() ([]byte, error) { out, err := proto.Marshal(msg) if err != nil { @@ -14,6 +15,7 @@ func (msg *WakuMessage) Hash() ([]byte, error) { return Hash(out), nil } +// Hash calculates a hash from a byte slice using keccak256 for the hashing algorithm func Hash(data []byte) []byte { return gcrypto.Keccak256(data) } diff --git a/waku/v2/protocol/requestId.go b/waku/v2/protocol/requestId.go index 102cb1c9..3a626af5 100644 --- a/waku/v2/protocol/requestId.go +++ b/waku/v2/protocol/requestId.go @@ -19,6 +19,8 @@ var brHmacDrbgPool = sync.Pool{New: func() interface{} { return hmacdrbg.NewHmacDrbg(256, seed, nil) }} +// GenerateRequestId generates a random 32 byte slice that can be used for +// creating requests inf the filter, store and lightpush protocols func GenerateRequestId() []byte { rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) defer brHmacDrbgPool.Put(rng) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index a17fa6a9..be184d29 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -78,7 +78,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMess initQuery = true // an empty cursor means it is an initial query switch dir { case pb.PagingInfo_FORWARD: - cursor = list[0].index // perform paging from the begining of the list + cursor = list[0].index // perform paging from the beginning of the list case pb.PagingInfo_BACKWARD: cursor = list[len(list)-1].index // perform paging from the end of the list } @@ -113,7 +113,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMess retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) s = foundIndex - retrievedPageSize e = foundIndex - 1 - newCursor = msgList[s].index // the new cursor points to the begining of the page + newCursor = msgList[s].index // the new cursor points to the beginning of the page } // retrieve the messages @@ -360,7 +360,7 @@ func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) { digest := sha256.Sum256(data) return &pb.Index{ Digest: digest[:], - ReceiverTime: utils.GetUnixEpoch(), + ReceiverTime: float64(time.Now().Unix()), SenderTime: msg.Timestamp, }, nil } @@ -572,11 +572,11 @@ func (store *WakuStore) findLastSeen() float64 { // if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string -func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, error) { +func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) { currentTime := float64(time.Now().UnixNano()) lastSeenTime := store.findLastSeen() - log.Info("resume ", int64(currentTime)) + log.Info("resuming message history") var offset float64 = 200000 currentTime = currentTime + offset @@ -594,7 +594,7 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err var response *pb.HistoryResponse if len(peerList) > 0 { var err error - response, err = store.queryLoop(store.ctx, rpc, peerList) + response, err = store.queryLoop(ctx, rpc, peerList) if err != nil { log.Error("failed to resume history", err) return -1, ErrFailedToResumeHistory @@ -607,13 +607,15 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err return -1, ErrNoPeersAvailable } - response, err = store.queryFrom(store.ctx, rpc, *p, protocol.GenerateRequestId()) + response, err = store.queryFrom(ctx, rpc, *p, protocol.GenerateRequestId()) if err != nil { log.Error("failed to resume history", err) return -1, ErrFailedToResumeHistory } } + log.Info(fmt.Sprintf("obtained %d messages...", len(response.Messages))) + for _, msg := range response.Messages { store.storeMessage(pubsubTopic, msg) } diff --git a/waku/v2/protocol/utils.go b/waku/v2/protocol/utils.go index 8f538275..0bd561ba 100644 --- a/waku/v2/protocol/utils.go +++ b/waku/v2/protocol/utils.go @@ -2,12 +2,16 @@ package protocol import "strings" +// FulltextMatch is the default matching function used for checking if a peer +// supports a protocol or not func FulltextMatch(expectedProtocol string) func(string) bool { return func(receivedProtocol string) bool { return receivedProtocol == expectedProtocol } } +// PrefixTextMatch is a matching function used for checking if a peer's +// supported protocols begin with a particular prefix func PrefixTextMatch(prefix string) func(string) bool { return func(receivedProtocol string) bool { return strings.HasPrefix(receivedProtocol, prefix) diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 1553d03e..71081505 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -10,6 +10,8 @@ import ( var log = logging.Logger("utils") +// SelectPeer is used to return a peer that supports a given protocol. +// It currently selects the first peer returned by the peerstore func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. @@ -38,5 +40,5 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { return &peers[0], nil } - return nil, errors.New("No suitable peers found") + return nil, errors.New("no suitable peers found") } diff --git a/waku/v2/utils/time.go b/waku/v2/utils/time.go deleted file mode 100644 index 2d7b0aba..00000000 --- a/waku/v2/utils/time.go +++ /dev/null @@ -1,7 +0,0 @@ -package utils - -import "time" - -func GetUnixEpoch() float64 { - return float64(time.Now().UnixNano()) / float64(time.Second) -}