Source file
src/os/pipe_test.go
1
2
3
4
5
6
7
8 package os_test
9
10 import (
11 "bufio"
12 "bytes"
13 "fmt"
14 "internal/testenv"
15 "io"
16 "io/fs"
17 "os"
18 osexec "os/exec"
19 "os/signal"
20 "runtime"
21 "strconv"
22 "strings"
23 "sync"
24 "syscall"
25 "testing"
26 "time"
27 )
28
// TestEPIPE checks that every write to a pipe whose read end has been closed
// fails with the platform's broken-pipe error, not only the first write.
func TestEPIPE(t *testing.T) {
	r, w, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	// Close the write end when the test finishes so its descriptor is not
	// left open for the remainder of the test binary's run.
	defer w.Close()
	if err := r.Close(); err != nil {
		t.Fatal(err)
	}

	expect := syscall.EPIPE
	if runtime.GOOS == "windows" {
		// NOTE(review): 232 is presumably ERROR_NO_DATA, the code Windows
		// reports when the pipe is being closed; confirm against the
		// Windows system error code table.
		expect = syscall.Errno(232)
	}

	// Repeat the write to make sure the error is stable across attempts.
	for i := 0; i < 20; i++ {
		_, err = w.Write([]byte("hi"))
		if err == nil {
			t.Fatal("unexpected success of Write to broken pipe")
		}
		// Strip the wrappers the os package may add so the raw errno can be
		// compared directly.
		if pe, ok := err.(*fs.PathError); ok {
			err = pe.Err
		}
		if se, ok := err.(*os.SyscallError); ok {
			err = se.Err
		}
		if err != expect {
			t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
		}
	}
}
60
61 func TestStdPipe(t *testing.T) {
62 switch runtime.GOOS {
63 case "windows":
64 t.Skip("Windows doesn't support SIGPIPE")
65 }
66 testenv.MustHaveExec(t)
67 r, w, err := os.Pipe()
68 if err != nil {
69 t.Fatal(err)
70 }
71 if err := r.Close(); err != nil {
72 t.Fatal(err)
73 }
74
75
76
77
78
79
80 for _, sig := range []bool{false, true} {
81 for dest := 1; dest < 4; dest++ {
82 cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
83 cmd.Stdout = w
84 cmd.Stderr = w
85 cmd.ExtraFiles = []*os.File{w}
86 cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
87 if sig {
88 cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
89 }
90 if err := cmd.Run(); err == nil {
91 if !sig && dest < 3 {
92 t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
93 }
94 } else if ee, ok := err.(*osexec.ExitError); !ok {
95 t.Errorf("unexpected exec error type %T: %v", err, err)
96 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
97 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
98 } else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
99 if sig || dest > 2 {
100 t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
101 }
102 } else {
103 t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
104 }
105 }
106 }
107
108
109 cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
110 cmd.Stdout = w
111 var stderr bytes.Buffer
112 cmd.Stderr = &stderr
113 cmd.Env = append(os.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
114 if err := cmd.Run(); err == nil {
115 t.Errorf("unexpected success of write to closed stdout")
116 } else if ee, ok := err.(*osexec.ExitError); !ok {
117 t.Errorf("unexpected exec error type %T: %v", err, err)
118 } else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
119 t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
120 } else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
121 t.Errorf("unexpected exit status %v for write to closed stdout", err)
122 }
123 if output := stderr.Bytes(); len(output) > 0 {
124 t.Errorf("unexpected output on stderr: %s", output)
125 }
126 }
127
128
// TestStdPipeHelper is the child-process half of TestStdPipe. It is driven by
// environment variables: GO_TEST_STD_PIPE_HELPER selects which descriptor to
// write to, and GO_TEST_STD_PIPE_HELPER_SIGNAL, when non-empty, registers a
// SIGPIPE notification before writing. Without a selector the test is skipped.
func TestStdPipeHelper(t *testing.T) {
	if wantHandler := os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL"); wantHandler != "" {
		// The channel is never read; registering it is the point.
		sigc := make(chan os.Signal, 1)
		signal.Notify(sigc, syscall.SIGPIPE)
	}

	target := os.Getenv("GO_TEST_STD_PIPE_HELPER")
	if target == "1" {
		os.Stdout.Write([]byte("stdout"))
	} else if target == "2" {
		os.Stderr.Write([]byte("stderr"))
	} else if target == "3" {
		extra := os.NewFile(3, "3")
		_, werr := extra.Write([]byte("3"))
		if werr == nil {
			// A successful write is reported to the parent as exit status 3.
			os.Exit(3)
		}
	} else {
		t.Skip("skipping test helper")
	}

	// Reaching this point means the write failed without killing the process
	// (or its error was ignored); report that to the parent as a clean exit.
	os.Exit(0)
}
151
// testClosedPipeRace starts a blocking Read (read == true) or Write
// (read == false) on a pipe and closes that same end from another goroutine
// shortly afterwards. The blocked call must fail with an *fs.PathError whose
// Err is fs.ErrClosed.
func testClosedPipeRace(t *testing.T, read bool) {
	limit := 1
	if !read {
		// Pick a write size one byte larger than a 128 KiB fallback, or one
		// byte larger than the value in /proc/sys/fs/pipe-max-size when that
		// file is readable — presumably so the write cannot complete while
		// nothing is reading from the pipe.
		limit = 131073
		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
				limit = i + 1
			}
		}
		t.Logf("using pipe write limit of %d", limit)
	}

	r, w, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()
	defer w.Close()

	// Wait for the closer goroutine before returning so that it can never
	// call t.Error after the test has completed. Registered last, so it runs
	// before the deferred Close calls above.
	closerDone := make(chan struct{})
	defer func() { <-closerDone }()

	// Close the pipe end that the main goroutine is about to block on.
	go func() {
		defer close(closerDone)

		// Give the main goroutine a chance to enter the Read or Write call.
		// If the sleep is too short the test still passes; it merely does
		// not exercise the blocked-I/O path.
		time.Sleep(20 * time.Millisecond)

		var err error
		if read {
			err = r.Close()
		} else {
			err = w.Close()
		}
		if err != nil {
			t.Error(err)
		}
	}()

	b := make([]byte, limit)
	if read {
		_, err = r.Read(b[:])
	} else {
		_, err = w.Write(b[:])
	}
	if err == nil {
		t.Error("I/O on closed pipe unexpectedly succeeded")
	} else if pe, ok := err.(*fs.PathError); !ok {
		// Report the dynamic type of the error actually returned; pe is a
		// nil *fs.PathError here and would always print the same type.
		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", err)
	} else if pe.Err != fs.ErrClosed {
		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
	} else {
		t.Logf("I/O returned expected error %q", err)
	}
}
208
209 func TestClosedPipeRaceRead(t *testing.T) {
210 testClosedPipeRace(t, true)
211 }
212
213 func TestClosedPipeRaceWrite(t *testing.T) {
214 testClosedPipeRace(t, false)
215 }
216
217
218
219
220 func TestReadNonblockingFd(t *testing.T) {
221 switch runtime.GOOS {
222 case "windows":
223 t.Skip("Windows doesn't support SetNonblock")
224 }
225 if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
226 fd := syscallDescriptor(os.Stdin.Fd())
227 syscall.SetNonblock(fd, true)
228 defer syscall.SetNonblock(fd, false)
229 _, err := os.Stdin.Read(make([]byte, 1))
230 if err != nil {
231 if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
232 t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
233 }
234 }
235 os.Exit(0)
236 }
237
238 testenv.MustHaveExec(t)
239 r, w, err := os.Pipe()
240 if err != nil {
241 t.Fatal(err)
242 }
243 defer r.Close()
244 defer w.Close()
245 cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
246 cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
247 cmd.Stdin = r
248 output, err := cmd.CombinedOutput()
249 t.Logf("%s", output)
250 if err != nil {
251 t.Errorf("child process failed: %v", err)
252 }
253 }
254
255 func TestCloseWithBlockingReadByNewFile(t *testing.T) {
256 var p [2]syscallDescriptor
257 err := syscall.Pipe(p[:])
258 if err != nil {
259 t.Fatal(err)
260 }
261
262 testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
263 }
264
265 func TestCloseWithBlockingReadByFd(t *testing.T) {
266 r, w, err := os.Pipe()
267 if err != nil {
268 t.Fatal(err)
269 }
270
271 _ = r.Fd()
272 testCloseWithBlockingRead(t, r, w)
273 }
274
275
// testCloseWithBlockingRead checks that closing the read end r of a pipe while
// another goroutine is blocked in r.Read makes both the Close and the Read
// return. The Read must fail with io.EOF or fs.ErrClosed (optionally wrapped
// in an *fs.PathError). Both r and w are closed by the time it returns.
func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
	defer r.Close()
	defer w.Close()

	// c1 is closed once Close has returned, c2 once Read has returned.
	c1, c2 := make(chan bool), make(chan bool)
	var wg sync.WaitGroup

	wg.Add(1)
	go func(c chan bool) {
		defer wg.Done()

		// Give the other goroutine a chance to enter the Read. If the sleep
		// is too short the test still passes; it just does not exercise the
		// blocked-read path.
		time.Sleep(20 * time.Millisecond)

		if err := r.Close(); err != nil {
			t.Error(err)
		}
		close(c)
	}(c1)

	wg.Add(1)
	go func(c chan bool) {
		defer wg.Done()
		var b [1]byte
		_, err := r.Read(b[:])
		close(c)
		if err == nil {
			t.Error("I/O on closed pipe unexpectedly succeeded")
			// Stop here: falling through would report a second, misleading
			// "got <nil>" failure for the same problem.
			return
		}
		if pe, ok := err.(*fs.PathError); ok {
			err = pe.Err
		}
		if err != io.EOF && err != fs.ErrClosed {
			t.Errorf("got %v, expected EOF or closed", err)
		}
	}(c2)

	// Wait for both goroutines to signal. A channel is set to nil once it
	// has fired so that its select case can no longer be chosen.
	for c1 != nil || c2 != nil {
		select {
		case <-c1:
			c1 = nil
			// Close has returned; closing the write end as well gives a
			// still-blocked Read the chance to finish with EOF.
			w.Close()
		case <-c2:
			c2 = nil
		case <-time.After(1 * time.Second):
			switch {
			case c1 != nil && c2 != nil:
				t.Error("timed out waiting for Read and Close")
				w.Close()
			case c1 != nil:
				t.Error("timed out waiting for Close")
			case c2 != nil:
				t.Error("timed out waiting for Read")
			default:
				t.Error("impossible case")
			}
		}
	}

	wg.Wait()
}
340
341
// TestPipeEOF checks that a reader draining a pipe line by line sees io.EOF
// once the writer has written three lines and closed its end, and that this
// happens within one second.
func TestPipeEOF(t *testing.T) {
	rd, wr, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}

	const pause = 10 * time.Millisecond

	// Writer: emit three lines with a pause before each, pause once more,
	// then close the write end so the reader can observe EOF.
	var writer sync.WaitGroup
	writer.Add(1)
	produce := func() {
		defer writer.Done()

		defer func() {
			if cerr := wr.Close(); cerr != nil {
				t.Errorf("error closing writer: %v", cerr)
			}
		}()

		for n := 0; n < 3; n++ {
			time.Sleep(pause)
			if _, werr := fmt.Fprintf(wr, "line %d\n", n); werr != nil {
				t.Errorf("error writing to fifo: %v", werr)
				return
			}
		}
		time.Sleep(pause)
	}
	go produce()

	// Do not return until the writer goroutine has finished.
	defer writer.Wait()

	// Reader: log each line until EOF, then close the read end.
	finished := make(chan bool)
	consume := func() {
		defer close(finished)

		defer func() {
			if cerr := rd.Close(); cerr != nil {
				t.Errorf("error closing reader: %v", cerr)
			}
		}()

		lines := bufio.NewReader(rd)
		for {
			line, rerr := lines.ReadBytes('\n')
			switch {
			case rerr == io.EOF:
				// Normal termination: the writer closed its end.
				return
			case rerr != nil:
				t.Error(rerr)
				return
			}
			t.Logf("%s\n", bytes.TrimSpace(line))
		}
	}
	go consume()

	select {
	case <-finished:
		// The reader saw EOF (or reported its own error) in time.
	case <-time.After(time.Second):
		t.Error("timed out waiting for read")
		// Close the read end to unblock the stuck reader goroutine.
		rd.Close()
	}
}
405
406
// TestFdRace calls Fd on the same pipe file from 100 goroutines at once and
// waits for all of them to return. It makes no assertions of its own; it is
// presumably meant to be run under the race detector.
func TestFdRace(t *testing.T) {
	rd, wr, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer rd.Close()
	defer wr.Close()

	const tries = 100

	var pending sync.WaitGroup
	pending.Add(tries)
	for n := tries; n > 0; n-- {
		go func() {
			defer pending.Done()
			wr.Fd()
		}()
	}
	pending.Wait()
}
428
// TestFdReadRace checks that calling Fd on a pipe's read end while another
// goroutine is blocked in Read on it does not stall: after Fd returns, a write
// followed by closing the read end must let the Read finish before its
// one-minute deadline expires.
func TestFdReadRace(t *testing.T) {
	t.Parallel()

	rd, wr, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer rd.Close()
	defer wr.Close()

	const count = 10

	// reading is signalled just before the reader goroutine calls Read.
	reading := make(chan bool, 1)
	var wg sync.WaitGroup
	wg.Add(2)

	// Reader: set a generous deadline, announce itself, then block in Read.
	// Only a timeout counts as failure; any other Read result is accepted.
	go func() {
		defer wg.Done()
		var buf [count]byte
		rd.SetReadDeadline(time.Now().Add(time.Minute))
		reading <- true
		_, rerr := rd.Read(buf[:])
		if os.IsTimeout(rerr) {
			t.Error("read timed out")
		}
	}()

	// Disturber: once the reader is about to read, call Fd and then release
	// the reader by writing data and closing the read end.
	go func() {
		defer wg.Done()
		<-reading

		// Give the reader a chance to actually enter Read. If the sleep is
		// too short the test still passes; it just exercises less.
		time.Sleep(10 * time.Millisecond)
		rd.Fd()

		// Either the data or the close should make the pending Read return
		// well before the deadline.
		wr.Write(make([]byte, count))
		rd.Close()
	}()

	wg.Wait()
}
473
View as plain text