go-waku/dbutils/insert.go

151 lines
3.3 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"math/rand"
"time"
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
const secondsMonth = int64(30 * time.Hour * 24)
func genRandomBytes(size int) (blk []byte, err error) {
blk = make([]byte, size)
_, err = rand.Read(blk)
return
}
func genRandomTimestamp(t30daysAgo int64) int64 {
return rand.Int63n(secondsMonth) + t30daysAgo
}
func genRandomContentTopic(n int) string {
topics := []string{"topic1", "topic2", "topic3", "topic4", "topic5"}
i := n % 5
return protocol.NewContentTopic("test", 1, topics[i], "plaintext").String()
}
func newdb(path string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, err
}
return db, nil
}
func createTable(db *sql.DB) error {
sqlStmt := `
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS message (
id BLOB,
receiverTimestamp INTEGER NOT NULL,
senderTimestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL DEFAULT 0,
CONSTRAINT messageIndex PRIMARY KEY (id, pubsubTopic)
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS message_senderTimestamp ON message(senderTimestamp);
CREATE INDEX IF NOT EXISTS message_receiverTimestamp ON message(receiverTimestamp);
`
_, err := db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}
func main() {
Ns := []int{10_000, 100_000, 1_000_000}
for _, N := range Ns {
dbName := fmt.Sprintf("store%d.db", N)
fmt.Println("Inserting ", N, " records in ", dbName)
db, err := newdb(dbName)
if err != nil {
panic(err)
}
defer db.Close()
query := "INSERT INTO message (id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version) VALUES (?, ?, ?, ?, ?, ?, ?)"
err = createTable(db)
if err != nil {
panic(err)
}
trx, err := db.BeginTx(context.Background(), nil)
if err != nil {
panic(err)
}
stmt, err := trx.Prepare(query)
if err != nil {
panic(err)
}
t30daysAgo := time.Now().UnixNano() - secondsMonth
pubsubTopic := protocol.DefaultPubsubTopic().String()
for i := 1; i <= N; i++ {
if i%1000 == 0 && i > 1 && i < N {
err := trx.Commit()
if err != nil {
panic(err)
}
trx, err = db.BeginTx(context.Background(), nil)
if err != nil {
panic(err)
}
stmt, err = trx.Prepare(query)
if err != nil {
panic(err)
}
}
if i%(N/10) == 0 && i > 1 {
fmt.Println(i, "...")
}
randPayload, err := genRandomBytes(100)
if err != nil {
panic(err)
}
msg := pb.WakuMessage{
Version: 0,
ContentTopic: genRandomContentTopic(i),
Timestamp: genRandomTimestamp(t30daysAgo),
Payload: randPayload,
}
envelope := protocol.NewEnvelope(&msg, msg.Timestamp, pubsubTopic)
dbKey := persistence.NewDBKey(uint64(msg.Timestamp), uint64(time.Now().UnixNano()), pubsubTopic, envelope.Index().Digest)
_, err = stmt.Exec(dbKey.Bytes(), msg.Timestamp, msg.Timestamp, msg.ContentTopic, pubsubTopic, msg.Payload, msg.Version)
if err != nil {
panic(err)
}
}
err = trx.Commit()
if err != nil {
panic(err)
}
}
}