mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 21:03:07 +00:00
230 lines
4.8 KiB
Go
230 lines
4.8 KiB
Go
|
|
package pubsub
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
func TestNewRpcQueue(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
if q.maxSize != maxSize {
|
||
|
|
t.Fatalf("rpc queue has wrong max size, expected %d but got %d", maxSize, q.maxSize)
|
||
|
|
}
|
||
|
|
if q.dataAvailable.L != &q.queueMu {
|
||
|
|
t.Fatalf("the dataAvailable field of rpc queue has an incorrect mutex")
|
||
|
|
}
|
||
|
|
if q.spaceAvailable.L != &q.queueMu {
|
||
|
|
t.Fatalf("the spaceAvailable field of rpc queue has an incorrect mutex")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueUrgentPush(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
rpc1 := &RPC{}
|
||
|
|
rpc2 := &RPC{}
|
||
|
|
rpc3 := &RPC{}
|
||
|
|
rpc4 := &RPC{}
|
||
|
|
q.Push(rpc1, true)
|
||
|
|
q.UrgentPush(rpc2, true)
|
||
|
|
q.Push(rpc3, true)
|
||
|
|
q.UrgentPush(rpc4, true)
|
||
|
|
pop1, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
pop2, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
pop3, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
pop4, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if pop1 != rpc2 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
if pop2 != rpc4 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
if pop3 != rpc1 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
if pop4 != rpc3 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueuePushThenPop(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
rpc1 := &RPC{}
|
||
|
|
rpc2 := &RPC{}
|
||
|
|
q.Push(rpc1, true)
|
||
|
|
q.Push(rpc2, true)
|
||
|
|
pop1, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
pop2, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if pop1 != rpc1 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
if pop2 != rpc2 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueuePopThenPush(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
rpc1 := &RPC{}
|
||
|
|
rpc2 := &RPC{}
|
||
|
|
go func() {
|
||
|
|
// Wait to make sure the main goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
q.Push(rpc1, true)
|
||
|
|
q.Push(rpc2, true)
|
||
|
|
}()
|
||
|
|
pop1, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
pop2, err := q.Pop(context.Background())
|
||
|
|
if err != nil {
|
||
|
|
t.Fatal(err)
|
||
|
|
}
|
||
|
|
if pop1 != rpc1 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
if pop2 != rpc2 {
|
||
|
|
t.Fatalf("get wrong item from rpc queue Pop")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueBlockPushWhenFull(t *testing.T) {
|
||
|
|
maxSize := 1
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
finished := make(chan struct{})
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
go func() {
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
finished <- struct{}{}
|
||
|
|
}()
|
||
|
|
// Wait to make sure the goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
select {
|
||
|
|
case <-finished:
|
||
|
|
t.Fatalf("blocking rpc queue Push is not blocked when it is full")
|
||
|
|
default:
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueNonblockPushWhenFull(t *testing.T) {
|
||
|
|
maxSize := 1
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
err := q.Push(&RPC{}, false)
|
||
|
|
if err != ErrQueueFull {
|
||
|
|
t.Fatalf("non-blocking rpc queue Push returns wrong error when it is full")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueuePushAfterClose(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
q.Close()
|
||
|
|
|
||
|
|
defer func() {
|
||
|
|
if r := recover(); r == nil {
|
||
|
|
t.Fatalf("rpc queue Push does not panick after closed")
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueuePopAfterClose(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
q.Close()
|
||
|
|
_, err := q.Pop(context.Background())
|
||
|
|
if err != ErrQueueClosed {
|
||
|
|
t.Fatalf("rpc queue Pop returns wrong error after closed")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueCloseWhilePush(t *testing.T) {
|
||
|
|
maxSize := 1
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
|
||
|
|
defer func() {
|
||
|
|
if r := recover(); r == nil {
|
||
|
|
t.Fatalf("rpc queue Push does not panick when it's closed on the fly")
|
||
|
|
}
|
||
|
|
}()
|
||
|
|
|
||
|
|
go func() {
|
||
|
|
// Wait to make sure the main goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
q.Close()
|
||
|
|
}()
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueCloseWhilePop(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
go func() {
|
||
|
|
// Wait to make sure the main goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
q.Close()
|
||
|
|
}()
|
||
|
|
_, err := q.Pop(context.Background())
|
||
|
|
if err != ErrQueueClosed {
|
||
|
|
t.Fatalf("rpc queue Pop returns wrong error when it's closed on the fly")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueuePushWhenFullThenPop(t *testing.T) {
|
||
|
|
maxSize := 1
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
go func() {
|
||
|
|
// Wait to make sure the main goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
q.Pop(context.Background())
|
||
|
|
}()
|
||
|
|
q.Push(&RPC{}, true)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestRpcQueueCancelPop(t *testing.T) {
|
||
|
|
maxSize := 32
|
||
|
|
q := newRpcQueue(maxSize)
|
||
|
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
|
go func() {
|
||
|
|
// Wait to make sure the main goroutine is blocked.
|
||
|
|
time.Sleep(1 * time.Millisecond)
|
||
|
|
cancel()
|
||
|
|
}()
|
||
|
|
_, err := q.Pop(ctx)
|
||
|
|
if err != ErrQueueCancelled {
|
||
|
|
t.Fatalf("rpc queue Pop returns wrong error when it's cancelled")
|
||
|
|
}
|
||
|
|
}
|