Mirror of https://github.com/status-im/status-go.git
eeca435064
Update vendor

- Integrate rendezvous into status node
- Add a test with failover using rendezvous
- Use multiple servers in client
- Use discovery V5 by default and test that the node can be started with rendezvous discovery
- Fix linter
- Update rendezvous client to one with instrumented stream
- Address feedback
- Fix test with updated topic limits
- Apply several suggestions
- Change log to debug for request errors, because we continue execution
- Remove web3js after rebase
- Update rendezvous package
259 lines · 5.1 KiB · Go
package log

import (
	"fmt"
	"io"
	"sync"
	"sync/atomic"
)

// MaxWriterBuffer specifies how big the writer buffer can get before
// killing the writer.
var MaxWriterBuffer = 512 * 1024
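
// In concrete terms: 512 * 1024 = 524288 bytes, i.e. 512 KiB. Once a single
// attached writer falls more than 512 KiB behind, its bufWriter kills it
// (see bufWriter.loop below) rather than let the backlog grow without bound.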

// MirrorWriter implements a WriteCloser which syncs incoming bytes to multiple
// [buffered] WriteClosers. They can be added with AddWriter().
type MirrorWriter struct {
	// active is accessed atomically: 1 while at least one writer is attached
	active uint32

	// channel for incoming writers
	writerAdd chan *writerAdd

	// attached writers, each wrapped in a buffering bufWriter
	writers []*bufWriter

	// synchronization channel for incoming writes
	msgSync chan []byte
}

type writerSync struct {
	w  io.WriteCloser
	br chan []byte
}

// NewMirrorWriter initializes and returns a MirrorWriter.
func NewMirrorWriter() *MirrorWriter {
	mw := &MirrorWriter{
		msgSync:   make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting
		writerAdd: make(chan *writerAdd),
	}

	go mw.logRoutine()

	return mw
}
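
// A minimal usage sketch (illustrative only; the file name is hypothetical
// and error handling is elided):
//
//	mw := NewMirrorWriter()
//	f, _ := os.Create("/tmp/out.log") // any io.WriteCloser works
//	mw.AddWriter(f)
//	mw.Write([]byte("hello\n")) // fans out to f via its bufWriter
//	mw.Close()                  // closes all attached writers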

// Write broadcasts the written bytes to all Writers.
func (mw *MirrorWriter) Write(b []byte) (int, error) {
	// copy b so the caller may safely reuse its buffer after Write returns
	mycopy := make([]byte, len(b))
	copy(mycopy, b)
	mw.msgSync <- mycopy
	return len(b), nil
}

// Close closes the MirrorWriter.
func (mw *MirrorWriter) Close() error {
	// it is up to the caller to ensure that Write is not called during or
	// after Close is called.
	close(mw.msgSync)
	return nil
}

// doClose closes every attached writer; it runs once logRoutine exits.
func (mw *MirrorWriter) doClose() {
	for _, w := range mw.writers {
		w.writer.Close()
	}
}

// logRoutine is the single goroutine that owns mw.writers: it serializes
// incoming writes and writer additions, so the slice needs no locking.
func (mw *MirrorWriter) logRoutine() {
	// rebind to avoid races on nilling out struct fields
	msgSync := mw.msgSync
	writerAdd := mw.writerAdd

	defer mw.doClose()

	for {
		select {
		case b, ok := <-msgSync:
			if !ok {
				return
			}

			// write to all writers
			dropped := mw.broadcastMessage(b)

			// consolidate the slice
			if dropped {
				mw.clearDeadWriters()
			}
		case wa := <-writerAdd:
			mw.writers = append(mw.writers, newBufWriter(wa.w))

			atomic.StoreUint32(&mw.active, 1)
			close(wa.done)
		}
	}
}

// broadcastMessage sends the given message to every writer.
// If any writer is killed during the send, true is returned.
func (mw *MirrorWriter) broadcastMessage(b []byte) bool {
	var dropped bool
	for i, w := range mw.writers {
		_, err := w.Write(b)
		if err != nil {
			mw.writers[i] = nil
			dropped = true
		}
	}
	return dropped
}

// clearDeadWriters compacts mw.writers, dropping the entries nilled out by
// broadcastMessage, and marks the mirror inactive if none remain.
func (mw *MirrorWriter) clearDeadWriters() {
	writers := mw.writers
	mw.writers = nil
	for _, w := range writers {
		if w != nil {
			mw.writers = append(mw.writers, w)
		}
	}
	if len(mw.writers) == 0 {
		atomic.StoreUint32(&mw.active, 0)
	}
}

// writerAdd is the request sent to logRoutine when attaching a writer;
// done is closed once the writer has been registered.
type writerAdd struct {
	w    io.WriteCloser
	done chan struct{}
}

// AddWriter attaches a new WriteCloser to this MirrorWriter.
// The new writer will start getting any bytes written to the mirror.
func (mw *MirrorWriter) AddWriter(w io.WriteCloser) {
	wa := &writerAdd{
		w:    w,
		done: make(chan struct{}),
	}
	mw.writerAdd <- wa
	<-wa.done
}
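
// AddWriter requires an io.WriteCloser, so a plain io.Writer needs a Close
// shim. A minimal sketch of such an adapter (nopCloser is a hypothetical
// helper, not part of this package):
//
//	type nopCloser struct{ io.Writer }
//
//	func (nopCloser) Close() error { return nil }
//
//	mw.AddWriter(nopCloser{os.Stdout})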

// Active reports whether there is at least one Writer
// attached to this MirrorWriter.
func (mw *MirrorWriter) Active() (active bool) {
	return atomic.LoadUint32(&mw.active) == 1
}

// newBufWriter wraps w in a bufWriter and starts its delivery loop.
func newBufWriter(w io.WriteCloser) *bufWriter {
	bw := &bufWriter{
		writer:   w,
		incoming: make(chan []byte, 1),
	}

	go bw.loop()
	return bw
}

// bufWriter queues incoming messages and feeds them to the underlying
// writer as fast as it will accept them; if the backlog grows past
// MaxWriterBuffer, the writer is killed.
type bufWriter struct {
	writer io.WriteCloser

	incoming chan []byte

	deathLock sync.Mutex
	dead      bool
}

var errDeadWriter = fmt.Errorf("writer is dead")

func (bw *bufWriter) Write(b []byte) (int, error) {
	bw.deathLock.Lock()
	dead := bw.dead
	bw.deathLock.Unlock()
	if dead {
		// lazily close the incoming channel so loop can stop draining
		if bw.incoming != nil {
			close(bw.incoming)
			bw.incoming = nil
		}
		return 0, errDeadWriter
	}

	bw.incoming <- b
	return len(b), nil
}

// die marks the writer dead and closes the underlying WriteCloser;
// subsequent Writes fail with errDeadWriter.
func (bw *bufWriter) die() {
	bw.deathLock.Lock()
	bw.dead = true
	bw.writer.Close()
	bw.deathLock.Unlock()
}

func (bw *bufWriter) loop() {
	bufsize := 0
	bufBase := make([][]byte, 0, 16) // some initial memory
	buffered := bufBase
	nextCh := make(chan []byte)

	var nextMsg []byte

	go func() {
		for b := range nextCh {
			_, err := bw.writer.Write(b)
			if err != nil {
				// TODO: need a way to notify there was an error here
				// wouldn't want to log here as it could cause an infinite loop
				bw.die()
				return
			}
		}
	}()

	// collect and buffer messages
	incoming := bw.incoming
	for {
		if nextMsg == nil || nextCh == nil {
			// nextCh == nil implies we are 'dead' and draining the incoming channel
			// until the caller notices and closes it for us
			select {
			case b, ok := <-incoming:
				if !ok {
					return
				}
				nextMsg = b
			}
		}

		select {
		case b, ok := <-incoming:
			if !ok {
				return
			}
			bufsize += len(b)
			buffered = append(buffered, b)
			if bufsize > MaxWriterBuffer {
				// if we have too many messages buffered, kill the writer
				bw.die()
				if nextCh != nil {
					close(nextCh)
				}
				nextCh = nil
				// explicitly keep going here to drain incoming
			}
		case nextCh <- nextMsg:
			nextMsg = nil
			if len(buffered) > 0 {
				nextMsg = buffered[0]
				buffered = buffered[1:]
				bufsize -= len(nextMsg)
			}

			if len(buffered) == 0 {
				// reset slice position
				buffered = bufBase[:0]
			}
		}
	}
}
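
// Goroutine topology, summarized for orientation: each MirrorWriter runs one
// logRoutine; each attached writer adds two more goroutines, bufWriter.loop
// (which buffers) and its inner delivery goroutine, which is the only one
// that calls the underlying writer's Write method.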