Source file
src/io/pipe.go
1
2
3
4
5
6
7
8 package io
9
10 import (
11 "errors"
12 "sync"
13 )
14
15
// onceError holds at most one error: the first non-nil error passed to Store
// is kept, and every later Store is ignored.
type onceError struct {
	sync.Mutex // guards err
	err        error
}

// Store records err unless a non-nil error has already been recorded.
// Storing nil leaves the value empty, so a later non-nil Store still succeeds.
func (a *onceError) Store(err error) {
	a.Lock()
	if a.err == nil {
		a.err = err
	}
	a.Unlock()
}

// Load returns the recorded error, or nil when nothing has been recorded.
func (a *onceError) Load() error {
	a.Lock()
	stored := a.err
	a.Unlock()
	return stored
}
34
35
// ErrClosedPipe is the error reported for read or write operations on a
// closed pipe.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
37
38
39 type pipe struct {
40 wrMu sync.Mutex
41 wrCh chan []byte
42 rdCh chan int
43
44 once sync.Once
45 done chan struct{}
46 rerr onceError
47 werr onceError
48 }
49
50 func (p *pipe) read(b []byte) (n int, err error) {
51 select {
52 case <-p.done:
53 return 0, p.readCloseError()
54 default:
55 }
56
57 select {
58 case bw := <-p.wrCh:
59 nr := copy(b, bw)
60 p.rdCh <- nr
61 return nr, nil
62 case <-p.done:
63 return 0, p.readCloseError()
64 }
65 }
66
67 func (p *pipe) closeRead(err error) error {
68 if err == nil {
69 err = ErrClosedPipe
70 }
71 p.rerr.Store(err)
72 p.once.Do(func() { close(p.done) })
73 return nil
74 }
75
76 func (p *pipe) write(b []byte) (n int, err error) {
77 select {
78 case <-p.done:
79 return 0, p.writeCloseError()
80 default:
81 p.wrMu.Lock()
82 defer p.wrMu.Unlock()
83 }
84
85 for once := true; once || len(b) > 0; once = false {
86 select {
87 case p.wrCh <- b:
88 nw := <-p.rdCh
89 b = b[nw:]
90 n += nw
91 case <-p.done:
92 return n, p.writeCloseError()
93 }
94 }
95 return n, nil
96 }
97
98 func (p *pipe) closeWrite(err error) error {
99 if err == nil {
100 err = EOF
101 }
102 p.werr.Store(err)
103 p.once.Do(func() { close(p.done) })
104 return nil
105 }
106
107
108 func (p *pipe) readCloseError() error {
109 rerr := p.rerr.Load()
110 if werr := p.werr.Load(); rerr == nil && werr != nil {
111 return werr
112 }
113 return ErrClosedPipe
114 }
115
116
117 func (p *pipe) writeCloseError() error {
118 werr := p.werr.Load()
119 if rerr := p.rerr.Load(); werr == nil && rerr != nil {
120 return rerr
121 }
122 return ErrClosedPipe
123 }
124
125
126 type PipeReader struct {
127 p *pipe
128 }
129
130
131
132
133
134
135 func (r *PipeReader) Read(data []byte) (n int, err error) {
136 return r.p.read(data)
137 }
138
139
140
141 func (r *PipeReader) Close() error {
142 return r.CloseWithError(nil)
143 }
144
145
146
147
148
149
150 func (r *PipeReader) CloseWithError(err error) error {
151 return r.p.closeRead(err)
152 }
153
154
155 type PipeWriter struct {
156 p *pipe
157 }
158
159
160
161
162
163
164 func (w *PipeWriter) Write(data []byte) (n int, err error) {
165 return w.p.write(data)
166 }
167
168
169
170 func (w *PipeWriter) Close() error {
171 return w.CloseWithError(nil)
172 }
173
174
175
176
177
178
179
180 func (w *PipeWriter) CloseWithError(err error) error {
181 return w.p.closeWrite(err)
182 }
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199 func Pipe() (*PipeReader, *PipeWriter) {
200 p := &pipe{
201 wrCh: make(chan []byte),
202 rdCh: make(chan int),
203 done: make(chan struct{}),
204 }
205 return &PipeReader{p}, &PipeWriter{p}
206 }
207
View as plain text