2024-05-16 19:12:10 -04:00

136 lines
2.9 KiB
Go

package main
import (
"database/sql"
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"github.com/google/uuid"
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"google.golang.org/protobuf/proto"
)
type dbInfo struct {
id string
db *sql.DB
file string
stmt *sql.Stmt
inserted bool
}
type dbMap map[string]*dbInfo
var databases dbMap
func newDBInfo(file string) {
id := uuid.New().String()
db, err := sql.Open("sqlite3", file+"?_journal=WAL")
if err != nil {
panic(err.Error())
}
stmt, err := db.Prepare("INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) VALUES(?,?,?,?,?,?,?,?)")
if err != nil {
panic(err.Error())
}
databases[id] = &dbInfo{
id: id,
file: file,
db: db,
stmt: stmt,
}
}
func (d dbMap) values() []*dbInfo {
var result []*dbInfo
for _, x := range d {
result = append(result, x)
}
return result
}
func main() {
if len(os.Args) == 1 {
fmt.Println("Use: populatedb [path_to_db1 path_to_db2 ...]")
return
}
databases = make(dbMap)
for _, dbPath := range os.Args[1:] {
newDBInfo(dbPath)
}
pubsubTopics := []string{
"/waku/2/rs/1/1",
"/waku/2/rs/1/2",
"/waku/2/rs/1/3",
}
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
timeInterval := 5 * time.Second
t := time.NewTicker(timeInterval)
minuteTimer := time.NewTimer(0)
fmt.Print(" \t\t\t\t\t\t")
i := 0
for range databases {
i++
fmt.Printf("db_%d\t", i)
}
fmt.Println()
for {
select {
case currTime := <-minuteTimer.C:
fmt.Println(currTime)
minuteTimer.Reset(1 * time.Minute)
case <-t.C:
insertCnt := rand.Intn(len(databases)) + 1 // Insert in at least one store node
dbArr := databases.values()
rand.Shuffle(len(dbArr), func(i, j int) { dbArr[i], dbArr[j] = dbArr[j], dbArr[i] })
for _, x := range dbArr {
x.inserted = false
}
now := time.Now().UnixNano()
msg := &pb.WakuMessage{
Timestamp: proto.Int64(now),
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: "test",
}
envelope := protocol.NewEnvelope(msg, now, pubsubTopics[rand.Intn(len(pubsubTopics))])
hash := envelope.Hash()
for i := 0; i < insertCnt; i++ {
dbArr[i].inserted = true
_, err := dbArr[i].stmt.Exec([]byte(envelope.PubsubTopic()), []byte(msg.ContentTopic), msg.Payload, msg.GetVersion(), msg.GetTimestamp(), envelope.Index().Digest, hash.Bytes(), now)
if err != nil {
panic(err.Error())
}
}
fmt.Printf("%s\t%s\t%v\t", hash.String(), envelope.PubsubTopic(), msg.GetTimestamp())
for _, x := range databases {
fmt.Printf("%v\t", x.inserted)
}
fmt.Println()
case <-ch:
return
}
}
}