fix: docs

This commit is contained in:
Richard Ramos 2021-10-09 14:18:53 -04:00
parent 4a7779dda1
commit 8f87009466
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
21 changed files with 96 additions and 59 deletions

View File

@ -1,6 +1,34 @@
# go-waku # go-waku
A Go implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v2/waku-v2). A Go implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v2/waku-v2).
<p align="left">
<a href="https://goreportcard.com/report/github.com/status-im/go-waku"><img src="https://goreportcard.com/badge/github.com/status-im/go-waku" /></a>
<a href="https://godoc.org/github.com/status-im/go-waku"><img src="http://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/golang-%3E%3D1.13.0-orange.svg?style=flat-square" /></a>
<br>
</p>
## Install
```
git clone https://github.com/status-im/go-waku
cd go-waku
make
```
## Wakunode
See the available command line options with
```
./build/waku --help
```
## Library
```
go get github.com/status-im/go-waku
```
## Examples
Examples of usage of go-waku as a library can be found in the examples folder. There is a fully featured chat example.
## Waku Protocol Support ## Waku Protocol Support
@ -28,27 +56,6 @@ A Go implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v
|[27/WAKU2-PEERS](https://rfc.vac.dev/spec/27)|✔| |[27/WAKU2-PEERS](https://rfc.vac.dev/spec/27)|✔|
|[29/WAKU2-CONFIG](https://rfc.vac.dev/spec/29)|🚧| |[29/WAKU2-CONFIG](https://rfc.vac.dev/spec/29)|🚧|
## Install
```
git clone https://github.com/status-im/go-waku
cd go-waku
make
```
## Wakunode
See the available command line options with
```
./build/waku --help
```
## Examples
Examples of usage of go-waku as a library can be found in the examples folder. There is a fully featured chat example.
## Contribution ## Contribution
Thank you for considering to help out with the source code! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes! Thank you for considering to help out with the source code! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes!
@ -59,6 +66,7 @@ To build and test this repository, you need:
- [protoc](https://grpc.io/docs/protoc-installation/) - [protoc](https://grpc.io/docs/protoc-installation/)
- [Protocol Buffers for Go with Gadgets](https://github.com/gogo/protobuf) - [Protocol Buffers for Go with Gadgets](https://github.com/gogo/protobuf)
## License ## License
Licensed and distributed under either of Licensed and distributed under either of

View File

@ -15,7 +15,6 @@ import (
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
) )
var log = logging.Logger("basic2") var log = logging.Logger("basic2")
@ -78,7 +77,7 @@ func randomHex(n int) (string, error) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var contentTopic string = "test" var contentTopic string = "test"
var version uint32 = 0 var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch() var timestamp float64 = float64(time.Now().Unix())
p := new(node.Payload) p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent) p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -13,7 +13,6 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/filter"
wpb "github.com/status-im/go-waku/waku/v2/protocol/pb" wpb "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/utils"
"golang.org/x/crypto/pbkdf2" "golang.org/x/crypto/pbkdf2"
) )
@ -52,7 +51,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
chat.C = make(filter.ContentFilterChan) chat.C = make(filter.ContentFilterChan)
filterRequest := wpb.FilterRequest{ filterRequest := wpb.FilterRequest{
ContentFilters: []*wpb.FilterRequest_ContentFilter{&wpb.FilterRequest_ContentFilter{ContentTopic: contentTopic}}, ContentFilters: []*wpb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
Topic: string(relay.GetTopic(nil)), Topic: string(relay.GetTopic(nil)),
Subscribe: true, Subscribe: true,
} }
@ -94,7 +93,7 @@ func (cr *Chat) Publish(ctx context.Context, message string) error {
} }
var version uint32 var version uint32
var timestamp float64 = utils.GetUnixEpoch() var timestamp float64 = float64(time.Now().Unix())
var keyInfo *node.KeyInfo = &node.KeyInfo{} var keyInfo *node.KeyInfo = &node.KeyInfo{}
if cr.useV1Payload { // Use WakuV1 encryption if cr.useV1Payload { // Use WakuV1 encryption

View File

@ -87,7 +87,7 @@ func NewChatUI(ctx context.Context, chat *Chat) *ChatUI {
if err != nil { if err != nil {
chatUI.displayMessage(err.Error()) chatUI.displayMessage(err.Error())
} else { } else {
chatUI.displayMessage("Peer connected succesfully") chatUI.displayMessage("Peer connected successfully")
} }
}(peer) }(peer)
return return

View File

@ -85,7 +85,7 @@ func main() {
// Send FilterRequest from light node to full node // Send FilterRequest from light node to full node
filterRequest := pb.FilterRequest{ filterRequest := pb.FilterRequest{
ContentFilters: []*pb.FilterRequest_ContentFilter{&pb.FilterRequest_ContentFilter{ContentTopic: contentTopic}}, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
Topic: string(pubSubTopic), Topic: string(pubSubTopic),
Subscribe: true, Subscribe: true,
} }

View File

@ -17,9 +17,7 @@ import (
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/relay"
//"github.com/status-im/go-waku/waku/v2/protocol/store" //"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils"
) )
var log = logging.Logger("peer_events") var log = logging.Logger("peer_events")
@ -184,7 +182,7 @@ func randomHex(n int) (string, error) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
var version uint32 = 0 var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch() var timestamp float64 = float64(time.Now().Unix())
p := new(node.Payload) p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent) p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -6,13 +6,13 @@ import (
"encoding/hex" "encoding/hex"
"net" "net"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
) )
func TestBasicSendingReceiving(t *testing.T) { func TestBasicSendingReceiving(t *testing.T) {
@ -61,7 +61,7 @@ func randomHex(n int) (string, error) {
func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error { func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) error {
var contentTopic string = "test" var contentTopic string = "test"
var version uint32 = 0 var version uint32 = 0
var timestamp float64 = utils.GetUnixEpoch() var timestamp float64 = float64(time.Now().Unix())
p := new(node.Payload) p := new(node.Payload)
p.Data = []byte(wakuNode.ID() + ": " + msgContent) p.Data = []byte(wakuNode.ID() + ": " + msgContent)

View File

@ -20,6 +20,7 @@ type Server struct {
server *http.Server server *http.Server
} }
// NewMetricsServer creates a prometheus server on a particular interface and port
func NewMetricsServer(address string, port int) *Server { func NewMetricsServer(address string, port int) *Server {
_ = runmetrics.Enable(runmetrics.RunMetricOptions{ _ = runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true, EnableCPU: true,
@ -60,11 +61,12 @@ func NewMetricsServer(address string, port int) *Server {
return &p return &p
} }
// Listen starts the HTTP server in the background. // Start executes the HTTP server in the background.
func (p *Server) Start() { func (p *Server) Start() {
log.Info("Server stopped ", p.server.ListenAndServe()) log.Info("Server stopped ", p.server.ListenAndServe())
} }
// Stop shuts down the prometheus server
func (p *Server) Stop(ctx context.Context) { func (p *Server) Stop(ctx context.Context) {
err := p.server.Shutdown(ctx) err := p.server.Shutdown(ctx)
if err != nil { if err != nil {

View File

@ -52,6 +52,7 @@ func failOnErr(err error, msg string) {
} }
} }
// Execute starts a go-waku node with settings determined by the Options parameter
func Execute(options Options) { func Execute(options Options) {
if options.GenerateKey { if options.GenerateKey {
if err := writePrivateKeyToFile(options.KeyFile, options.Overwrite); err != nil { if err := writePrivateKeyToFile(options.KeyFile, options.Overwrite); err != nil {
@ -132,7 +133,7 @@ func Execute(options Options) {
} }
if options.Store.Enable { if options.Store.Enable {
nodeOpts = append(nodeOpts, node.WithWakuStore(true, true)) nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume))
if options.UseDB { if options.UseDB {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
failOnErr(err, "DBStore") failOnErr(err, "DBStore")

View File

@ -20,30 +20,46 @@ type FilterOptions struct {
Nodes []string `long:"filter-node" description:"Multiaddr of a peer to request content filtering of messages. Option may be repeated"` Nodes []string `long:"filter-node" description:"Multiaddr of a peer to request content filtering of messages. Option may be repeated"`
} }
// LightpushOptions are settings used to enable the lightpush protocol. This is
// a lightweight protocol used to avoid having to run the relay protocol which
// is more resource intensive. With this protocol a message is pushed to a peer
// that supports both the lightpush protocol and relay protocol. That peer will
// broadcast the message and return a confirmation that the message was
// broadcasted
type LightpushOptions struct { type LightpushOptions struct {
Enable bool `long:"lightpush" description:"Enable lightpush protocol"` Enable bool `long:"lightpush" description:"Enable lightpush protocol"`
Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer to request lightpush of published messages. Option may be repeated"` Nodes []string `long:"lightpush-node" description:"Multiaddr of a peer to request lightpush of published messages. Option may be repeated"`
} }
// StoreOptions are settings used for enabling the store protocol, used to
// retrieve message history from other nodes as well as acting as a store
// node and provide message history to nodes that ask for it.
type StoreOptions struct { type StoreOptions struct {
Enable bool `long:"store" description:"Enable store protocol"` Enable bool `long:"store" description:"Enable store protocol"`
Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"` ShouldResume bool `long:"resume" description:"fix the gap in message history"`
Nodes []string `long:"store-node" description:"Multiaddr of a peer to request stored messages. Option may be repeated"`
} }
// DNSDiscoveryOptions are settings used for enabling DNS-based discovery
// protocol that stores merkle trees in DNS records which contain connection
// information for nodes. It's very useful for bootstrapping a p2p network.
type DNSDiscoveryOptions struct { type DNSDiscoveryOptions struct {
Enable bool `long:"dns-discovery" description:"Enable DNS discovery"` Enable bool `long:"dns-discovery" description:"Enable DNS discovery"`
URL string `long:"dns-discovery-url" description:"URL for DNS node list in format 'enrtree://<key>@<fqdn>'"` URL string `long:"dns-discovery-url" description:"URL for DNS node list in format 'enrtree://<key>@<fqdn>'"`
Nameserver string `long:"dns-discovery-nameserver" description:"DNS nameserver IP to query (empty to use system's default)"` Nameserver string `long:"dns-discovery-nameserver" description:"DNS nameserver IP to query (empty to use system's default)"`
} }
// MetricsOptions are settings used to start a prometheus server for obtaining
// useful node metrics to monitor the health of behavior of the go-waku node.
type MetricsOptions struct { type MetricsOptions struct {
Enable bool `long:"metrics" description:"Enable the metrics server"` Enable bool `long:"metrics" description:"Enable the metrics server"`
Address string `long:"metrics-address" description:"Listening address of the metrics server" default:"127.0.0.1"` Address string `long:"metrics-address" description:"Listening address of the metrics server" default:"127.0.0.1"`
Port int `long:"metrics-port" description:"Listening HTTP port of the metrics server" default:"8008"` Port int `long:"metrics-port" description:"Listening HTTP port of the metrics server" default:"8008"`
} }
// Options contains all the available features and settings that can be
// configured via flags when executing go-waku as a service.
type Options struct { type Options struct {
// Example of optional value
Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"9000"` Port int `short:"p" long:"port" description:"Libp2p TCP listening port (0 for random)" default:"9000"`
EnableWS bool `long:"ws" description:"Enable websockets support"` EnableWS bool `long:"ws" description:"Enable websockets support"`
WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"9001"` WSPort int `long:"ws-port" description:"Libp2p TCP listening port for websocket connection (0 for random)" default:"9001"`

View File

@ -4,7 +4,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
"github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence"
) )

View File

@ -14,6 +14,7 @@ type DBStore struct {
db *sql.DB db *sql.DB
} }
// DBOption is an optional setting that can be used to configure the DBStore
type DBOption func(*DBStore) error type DBOption func(*DBStore) error
// WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore. // WithDB is a DBOption that lets you use any custom *sql.DB with a DBStore.

View File

@ -4,7 +4,8 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
) )
// Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120 which was released under MIT license // Adapted from https://github.com/dustin/go-broadcast/commit/f664265f5a662fb4d1df7f3533b1e8d0e0277120
// by Dustin Sallings (c) 2013, which was released under MIT license
type broadcaster struct { type broadcaster struct {
input chan *protocol.Envelope input chan *protocol.Envelope

View File

@ -6,9 +6,9 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
) )
// Subscription to a pubsub topic // Subscription handles the subscrition to a particular pubsub topic
type Subscription struct { type Subscription struct {
// Channel for receiving messages // C is channel used for receiving envelopes
C chan *protocol.Envelope C chan *protocol.Envelope
closed bool closed bool
@ -16,14 +16,14 @@ type Subscription struct {
quit chan struct{} quit chan struct{}
} }
// Unsubscribe from a pubsub topic. Will close the message channel // Unsubscribe will close a subscription from a pubsub topic. Will close the message channel
func (subs *Subscription) Unsubscribe() { func (subs *Subscription) Unsubscribe() {
if !subs.closed { if !subs.closed {
close(subs.quit) close(subs.quit)
} }
} }
// Determine whether a Subscription is open or not // IsClosed determine whether a Subscription is still open for receiving messages
func (subs *Subscription) IsClosed() bool { func (subs *Subscription) IsClosed() bool {
subs.mutex.Lock() subs.mutex.Lock()
defer subs.mutex.Unlock() defer subs.mutex.Unlock()

View File

@ -9,6 +9,9 @@ type Envelope struct {
hash []byte hash []byte
} }
// NewEnvelope creates a new Envelope that contains a WakuMessage
// It's used as a way to know to which Pubsub topic belongs a WakuMessage
// as well as generating a hash based on the bytes that compose the message
func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope { func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope {
data, _ := msg.Marshal() data, _ := msg.Marshal()
return &Envelope{ return &Envelope{
@ -19,18 +22,22 @@ func NewEnvelope(msg *pb.WakuMessage, pubSubTopic string) *Envelope {
} }
} }
// Message returns the WakuMessage associated to an Envelope
func (e *Envelope) Message() *pb.WakuMessage { func (e *Envelope) Message() *pb.WakuMessage {
return e.msg return e.msg
} }
// PubsubTopic returns the topic on which a WakuMessage was received
func (e *Envelope) PubsubTopic() string { func (e *Envelope) PubsubTopic() string {
return e.pubsubTopic return e.pubsubTopic
} }
// Hash returns a 32 byte hash calculated from the WakuMessage bytes
func (e *Envelope) Hash() []byte { func (e *Envelope) Hash() []byte {
return e.hash return e.hash
} }
// Size returns the byte size of the WakuMessage
func (e *Envelope) Size() int { func (e *Envelope) Size() int {
return e.size return e.size
} }

View File

@ -5,6 +5,7 @@ import (
proto "github.com/golang/protobuf/proto" proto "github.com/golang/protobuf/proto"
) )
// Hash calculates the hash of a waku message
func (msg *WakuMessage) Hash() ([]byte, error) { func (msg *WakuMessage) Hash() ([]byte, error) {
out, err := proto.Marshal(msg) out, err := proto.Marshal(msg)
if err != nil { if err != nil {
@ -14,6 +15,7 @@ func (msg *WakuMessage) Hash() ([]byte, error) {
return Hash(out), nil return Hash(out), nil
} }
// Hash calculates a hash from a byte slice using keccak256 for the hashing algorithm
func Hash(data []byte) []byte { func Hash(data []byte) []byte {
return gcrypto.Keccak256(data) return gcrypto.Keccak256(data)
} }

View File

@ -19,6 +19,8 @@ var brHmacDrbgPool = sync.Pool{New: func() interface{} {
return hmacdrbg.NewHmacDrbg(256, seed, nil) return hmacdrbg.NewHmacDrbg(256, seed, nil)
}} }}
// GenerateRequestId generates a random 32 byte slice that can be used for
// creating requests inf the filter, store and lightpush protocols
func GenerateRequestId() []byte { func GenerateRequestId() []byte {
rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg) rng := brHmacDrbgPool.Get().(*hmacdrbg.HmacDrbg)
defer brHmacDrbgPool.Put(rng) defer brHmacDrbgPool.Put(rng)

View File

@ -78,7 +78,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMess
initQuery = true // an empty cursor means it is an initial query initQuery = true // an empty cursor means it is an initial query
switch dir { switch dir {
case pb.PagingInfo_FORWARD: case pb.PagingInfo_FORWARD:
cursor = list[0].index // perform paging from the begining of the list cursor = list[0].index // perform paging from the beginning of the list
case pb.PagingInfo_BACKWARD: case pb.PagingInfo_BACKWARD:
cursor = list[len(list)-1].index // perform paging from the end of the list cursor = list[len(list)-1].index // perform paging from the end of the list
} }
@ -113,7 +113,7 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMess
retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages) retrievedPageSize = minOf(int(pageSize), MaxPageSize, remainingMessages)
s = foundIndex - retrievedPageSize s = foundIndex - retrievedPageSize
e = foundIndex - 1 e = foundIndex - 1
newCursor = msgList[s].index // the new cursor points to the begining of the page newCursor = msgList[s].index // the new cursor points to the beginning of the page
} }
// retrieve the messages // retrieve the messages
@ -360,7 +360,7 @@ func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) {
digest := sha256.Sum256(data) digest := sha256.Sum256(data)
return &pb.Index{ return &pb.Index{
Digest: digest[:], Digest: digest[:],
ReceiverTime: utils.GetUnixEpoch(), ReceiverTime: float64(time.Now().Unix()),
SenderTime: msg.Timestamp, SenderTime: msg.Timestamp,
}, nil }, nil
} }
@ -572,11 +572,11 @@ func (store *WakuStore) findLastSeen() float64 {
// if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. // if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, error) { func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
currentTime := float64(time.Now().UnixNano()) currentTime := float64(time.Now().UnixNano())
lastSeenTime := store.findLastSeen() lastSeenTime := store.findLastSeen()
log.Info("resume ", int64(currentTime)) log.Info("resuming message history")
var offset float64 = 200000 var offset float64 = 200000
currentTime = currentTime + offset currentTime = currentTime + offset
@ -594,7 +594,7 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err
var response *pb.HistoryResponse var response *pb.HistoryResponse
if len(peerList) > 0 { if len(peerList) > 0 {
var err error var err error
response, err = store.queryLoop(store.ctx, rpc, peerList) response, err = store.queryLoop(ctx, rpc, peerList)
if err != nil { if err != nil {
log.Error("failed to resume history", err) log.Error("failed to resume history", err)
return -1, ErrFailedToResumeHistory return -1, ErrFailedToResumeHistory
@ -607,13 +607,15 @@ func (store *WakuStore) Resume(pubsubTopic string, peerList []peer.ID) (int, err
return -1, ErrNoPeersAvailable return -1, ErrNoPeersAvailable
} }
response, err = store.queryFrom(store.ctx, rpc, *p, protocol.GenerateRequestId()) response, err = store.queryFrom(ctx, rpc, *p, protocol.GenerateRequestId())
if err != nil { if err != nil {
log.Error("failed to resume history", err) log.Error("failed to resume history", err)
return -1, ErrFailedToResumeHistory return -1, ErrFailedToResumeHistory
} }
} }
log.Info(fmt.Sprintf("obtained %d messages...", len(response.Messages)))
for _, msg := range response.Messages { for _, msg := range response.Messages {
store.storeMessage(pubsubTopic, msg) store.storeMessage(pubsubTopic, msg)
} }

View File

@ -2,12 +2,16 @@ package protocol
import "strings" import "strings"
// FulltextMatch is the default matching function used for checking if a peer
// supports a protocol or not
func FulltextMatch(expectedProtocol string) func(string) bool { func FulltextMatch(expectedProtocol string) func(string) bool {
return func(receivedProtocol string) bool { return func(receivedProtocol string) bool {
return receivedProtocol == expectedProtocol return receivedProtocol == expectedProtocol
} }
} }
// PrefixTextMatch is a matching function used for checking if a peer's
// supported protocols begin with a particular prefix
func PrefixTextMatch(prefix string) func(string) bool { func PrefixTextMatch(prefix string) func(string) bool {
return func(receivedProtocol string) bool { return func(receivedProtocol string) bool {
return strings.HasPrefix(receivedProtocol, prefix) return strings.HasPrefix(receivedProtocol, prefix)

View File

@ -10,6 +10,8 @@ import (
var log = logging.Logger("utils") var log = logging.Logger("utils")
// SelectPeer is used to return a peer that supports a given protocol.
// It currently selects the first peer returned by the peerstore
func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers. // Ideally depending on the query and our set of peers we take a subset of ideal peers.
@ -38,5 +40,5 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
return &peers[0], nil return &peers[0], nil
} }
return nil, errors.New("No suitable peers found") return nil, errors.New("no suitable peers found")
} }

View File

@ -1,7 +0,0 @@
package utils
import "time"
func GetUnixEpoch() float64 {
return float64(time.Now().UnixNano()) / float64(time.Second)
}