From e298e4beb32e785b64d7c34ea37331042bbb3e4e Mon Sep 17 00:00:00 2001 From: Tevin Zhang Date: Tue, 12 Feb 2019 18:39:04 +0800 Subject: [PATCH] Add ResultPipes of different implementations --- result_pipes.go | 7 +++++++ result_pipes_mu.go | 35 +++++++++++++++++++++++++++++++++++ result_pipes_sync_map.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+) create mode 100644 result_pipes.go create mode 100644 result_pipes_mu.go create mode 100644 result_pipes_sync_map.go diff --git a/result_pipes.go b/result_pipes.go new file mode 100644 index 0000000..a5489b9 --- /dev/null +++ b/result_pipes.go @@ -0,0 +1,7 @@ +package tcp + +type resultPipes interface { + popResultPipe(int) (chan error, bool) + deregisterResultPipe(int) + registerResultPipe(int, chan error) +} diff --git a/result_pipes_mu.go b/result_pipes_mu.go new file mode 100644 index 0000000..a1cd1a8 --- /dev/null +++ b/result_pipes_mu.go @@ -0,0 +1,35 @@ +package tcp + +import "sync" + +type resultPipesMU struct { + l sync.Mutex + fdResultPipes map[int]chan error +} + +func newResultPipesMU() *resultPipesMU { + return &resultPipesMU{fdResultPipes: make(map[int]chan error)} +} + +func (r *resultPipesMU) popResultPipe(fd int) (chan error, bool) { + r.l.Lock() + p, exists := r.fdResultPipes[fd] + if exists { + delete(r.fdResultPipes, fd) + } + r.l.Unlock() + return p, exists +} + +func (r *resultPipesMU) deregisterResultPipe(fd int) { + r.l.Lock() + delete(r.fdResultPipes, fd) + r.l.Unlock() +} + +func (r *resultPipesMU) registerResultPipe(fd int, pipe chan error) { + // NOTE: the pipe should have been put back if c.fdResultPipes[fd] exists. + r.l.Lock() + r.fdResultPipes[fd] = pipe + r.l.Unlock() +} diff --git a/result_pipes_sync_map.go b/result_pipes_sync_map.go new file mode 100644 index 0000000..cbac102 --- /dev/null +++ b/result_pipes_sync_map.go @@ -0,0 +1,31 @@ +package tcp + +import "sync" + +type resultPipesSyncMap struct { + sync.Map +} + +func newResultPipesSyncMap() *resultPipesSyncMap { + return &resultPipesSyncMap{} +} + +func (r *resultPipesSyncMap) popResultPipe(fd int) (chan error, bool) { + p, exist := r.Load(fd) + if exist { + r.Delete(fd) + } + if p != nil { + return p.(chan error), exist + } + return nil, exist +} + +func (r *resultPipesSyncMap) deregisterResultPipe(fd int) { + r.Delete(fd) +} + +func (r *resultPipesSyncMap) registerResultPipe(fd int, pipe chan error) { + // NOTE: the pipe should have been put back if c.fdResultPipes[fd] exists. + r.Store(fd, pipe) +}