Source file src/os/pipe_test.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Test broken pipes on Unix systems.
     6  //go:build !plan9 && !js
     7  
     8  package os_test
     9  
    10  import (
    11  	"bufio"
    12  	"bytes"
    13  	"fmt"
    14  	"internal/testenv"
    15  	"io"
    16  	"io/fs"
    17  	"os"
    18  	osexec "os/exec"
    19  	"os/signal"
    20  	"runtime"
    21  	"strconv"
    22  	"strings"
    23  	"sync"
    24  	"syscall"
    25  	"testing"
    26  	"time"
    27  )
    28  
    29  func TestEPIPE(t *testing.T) {
    30  	r, w, err := os.Pipe()
    31  	if err != nil {
    32  		t.Fatal(err)
    33  	}
    34  	if err := r.Close(); err != nil {
    35  		t.Fatal(err)
    36  	}
    37  
    38  	expect := syscall.EPIPE
    39  	if runtime.GOOS == "windows" {
    40  		// 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
    41  		expect = syscall.Errno(232)
    42  	}
    43  	// Every time we write to the pipe we should get an EPIPE.
    44  	for i := 0; i < 20; i++ {
    45  		_, err = w.Write([]byte("hi"))
    46  		if err == nil {
    47  			t.Fatal("unexpected success of Write to broken pipe")
    48  		}
    49  		if pe, ok := err.(*fs.PathError); ok {
    50  			err = pe.Err
    51  		}
    52  		if se, ok := err.(*os.SyscallError); ok {
    53  			err = se.Err
    54  		}
    55  		if err != expect {
    56  			t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
    57  		}
    58  	}
    59  }
    60  
    61  func TestStdPipe(t *testing.T) {
    62  	switch runtime.GOOS {
    63  	case "windows":
    64  		t.Skip("Windows doesn't support SIGPIPE")
    65  	}
    66  	testenv.MustHaveExec(t)
    67  	r, w, err := os.Pipe()
    68  	if err != nil {
    69  		t.Fatal(err)
    70  	}
    71  	if err := r.Close(); err != nil {
    72  		t.Fatal(err)
    73  	}
    74  	// Invoke the test program to run the test and write to a closed pipe.
    75  	// If sig is false:
    76  	// writing to stdout or stderr should cause an immediate SIGPIPE;
    77  	// writing to descriptor 3 should fail with EPIPE and then exit 0.
    78  	// If sig is true:
    79  	// all writes should fail with EPIPE and then exit 0.
    80  	for _, sig := range []bool{false, true} {
    81  		for dest := 1; dest < 4; dest++ {
    82  			cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
    83  			cmd.Stdout = w
    84  			cmd.Stderr = w
    85  			cmd.ExtraFiles = []*os.File{w}
    86  			cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
    87  			if sig {
    88  				cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
    89  			}
    90  			if err := cmd.Run(); err == nil {
    91  				if !sig && dest < 3 {
    92  					t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
    93  				}
    94  			} else if ee, ok := err.(*osexec.ExitError); !ok {
    95  				t.Errorf("unexpected exec error type %T: %v", err, err)
    96  			} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
    97  				t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
    98  			} else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
    99  				if sig || dest > 2 {
   100  					t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
   101  				}
   102  			} else {
   103  				t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
   104  			}
   105  		}
   106  	}
   107  
   108  	// Test redirecting stdout but not stderr.  Issue 40076.
   109  	cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
   110  	cmd.Stdout = w
   111  	var stderr bytes.Buffer
   112  	cmd.Stderr = &stderr
   113  	cmd.Env = append(os.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
   114  	if err := cmd.Run(); err == nil {
   115  		t.Errorf("unexpected success of write to closed stdout")
   116  	} else if ee, ok := err.(*osexec.ExitError); !ok {
   117  		t.Errorf("unexpected exec error type %T: %v", err, err)
   118  	} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
   119  		t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
   120  	} else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
   121  		t.Errorf("unexpected exit status %v for write to closed stdout", err)
   122  	}
   123  	if output := stderr.Bytes(); len(output) > 0 {
   124  		t.Errorf("unexpected output on stderr: %s", output)
   125  	}
   126  }
   127  
   128  // This is a helper for TestStdPipe. It's not a test in itself.
   129  func TestStdPipeHelper(t *testing.T) {
   130  	if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
   131  		signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
   132  	}
   133  	switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
   134  	case "1":
   135  		os.Stdout.Write([]byte("stdout"))
   136  	case "2":
   137  		os.Stderr.Write([]byte("stderr"))
   138  	case "3":
   139  		if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
   140  			os.Exit(3)
   141  		}
   142  	default:
   143  		t.Skip("skipping test helper")
   144  	}
   145  	// For stdout/stderr, we should have crashed with a broken pipe error.
   146  	// The caller will be looking for that exit status,
   147  	// so just exit normally here to cause a failure in the caller.
   148  	// For descriptor 3, a normal exit is expected.
   149  	os.Exit(0)
   150  }
   151  
   152  func testClosedPipeRace(t *testing.T, read bool) {
   153  	limit := 1
   154  	if !read {
   155  		// Get the amount we have to write to overload a pipe
   156  		// with no reader.
   157  		limit = 131073
   158  		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
   159  			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
   160  				limit = i + 1
   161  			}
   162  		}
   163  		t.Logf("using pipe write limit of %d", limit)
   164  	}
   165  
   166  	r, w, err := os.Pipe()
   167  	if err != nil {
   168  		t.Fatal(err)
   169  	}
   170  	defer r.Close()
   171  	defer w.Close()
   172  
   173  	// Close the read end of the pipe in a goroutine while we are
   174  	// writing to the write end, or vice-versa.
   175  	go func() {
   176  		// Give the main goroutine a chance to enter the Read or
   177  		// Write call. This is sloppy but the test will pass even
   178  		// if we close before the read/write.
   179  		time.Sleep(20 * time.Millisecond)
   180  
   181  		var err error
   182  		if read {
   183  			err = r.Close()
   184  		} else {
   185  			err = w.Close()
   186  		}
   187  		if err != nil {
   188  			t.Error(err)
   189  		}
   190  	}()
   191  
   192  	b := make([]byte, limit)
   193  	if read {
   194  		_, err = r.Read(b[:])
   195  	} else {
   196  		_, err = w.Write(b[:])
   197  	}
   198  	if err == nil {
   199  		t.Error("I/O on closed pipe unexpectedly succeeded")
   200  	} else if pe, ok := err.(*fs.PathError); !ok {
   201  		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", pe)
   202  	} else if pe.Err != fs.ErrClosed {
   203  		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
   204  	} else {
   205  		t.Logf("I/O returned expected error %q", err)
   206  	}
   207  }
   208  
   209  func TestClosedPipeRaceRead(t *testing.T) {
   210  	testClosedPipeRace(t, true)
   211  }
   212  
   213  func TestClosedPipeRaceWrite(t *testing.T) {
   214  	testClosedPipeRace(t, false)
   215  }
   216  
   217  // Issue 20915: Reading on nonblocking fd should not return "waiting
   218  // for unsupported file type." Currently it returns EAGAIN; it is
   219  // possible that in the future it will simply wait for data.
   220  func TestReadNonblockingFd(t *testing.T) {
   221  	switch runtime.GOOS {
   222  	case "windows":
   223  		t.Skip("Windows doesn't support SetNonblock")
   224  	}
   225  	if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
   226  		fd := syscallDescriptor(os.Stdin.Fd())
   227  		syscall.SetNonblock(fd, true)
   228  		defer syscall.SetNonblock(fd, false)
   229  		_, err := os.Stdin.Read(make([]byte, 1))
   230  		if err != nil {
   231  			if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
   232  				t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
   233  			}
   234  		}
   235  		os.Exit(0)
   236  	}
   237  
   238  	testenv.MustHaveExec(t)
   239  	r, w, err := os.Pipe()
   240  	if err != nil {
   241  		t.Fatal(err)
   242  	}
   243  	defer r.Close()
   244  	defer w.Close()
   245  	cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
   246  	cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
   247  	cmd.Stdin = r
   248  	output, err := cmd.CombinedOutput()
   249  	t.Logf("%s", output)
   250  	if err != nil {
   251  		t.Errorf("child process failed: %v", err)
   252  	}
   253  }
   254  
   255  func TestCloseWithBlockingReadByNewFile(t *testing.T) {
   256  	var p [2]syscallDescriptor
   257  	err := syscall.Pipe(p[:])
   258  	if err != nil {
   259  		t.Fatal(err)
   260  	}
   261  	// os.NewFile returns a blocking mode file.
   262  	testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
   263  }
   264  
   265  func TestCloseWithBlockingReadByFd(t *testing.T) {
   266  	r, w, err := os.Pipe()
   267  	if err != nil {
   268  		t.Fatal(err)
   269  	}
   270  	// Calling Fd will put the file into blocking mode.
   271  	_ = r.Fd()
   272  	testCloseWithBlockingRead(t, r, w)
   273  }
   274  
   275  // Test that we don't let a blocking read prevent a close.
   276  func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
   277  	defer r.Close()
   278  	defer w.Close()
   279  
   280  	c1, c2 := make(chan bool), make(chan bool)
   281  	var wg sync.WaitGroup
   282  
   283  	wg.Add(1)
   284  	go func(c chan bool) {
   285  		defer wg.Done()
   286  		// Give the other goroutine a chance to enter the Read
   287  		// or Write call. This is sloppy but the test will
   288  		// pass even if we close before the read/write.
   289  		time.Sleep(20 * time.Millisecond)
   290  
   291  		if err := r.Close(); err != nil {
   292  			t.Error(err)
   293  		}
   294  		close(c)
   295  	}(c1)
   296  
   297  	wg.Add(1)
   298  	go func(c chan bool) {
   299  		defer wg.Done()
   300  		var b [1]byte
   301  		_, err := r.Read(b[:])
   302  		close(c)
   303  		if err == nil {
   304  			t.Error("I/O on closed pipe unexpectedly succeeded")
   305  		}
   306  		if pe, ok := err.(*fs.PathError); ok {
   307  			err = pe.Err
   308  		}
   309  		if err != io.EOF && err != fs.ErrClosed {
   310  			t.Errorf("got %v, expected EOF or closed", err)
   311  		}
   312  	}(c2)
   313  
   314  	for c1 != nil || c2 != nil {
   315  		select {
   316  		case <-c1:
   317  			c1 = nil
   318  			// r.Close has completed, but the blocking Read
   319  			// is hanging. Close the writer to unblock it.
   320  			w.Close()
   321  		case <-c2:
   322  			c2 = nil
   323  		case <-time.After(1 * time.Second):
   324  			switch {
   325  			case c1 != nil && c2 != nil:
   326  				t.Error("timed out waiting for Read and Close")
   327  				w.Close()
   328  			case c1 != nil:
   329  				t.Error("timed out waiting for Close")
   330  			case c2 != nil:
   331  				t.Error("timed out waiting for Read")
   332  			default:
   333  				t.Error("impossible case")
   334  			}
   335  		}
   336  	}
   337  
   338  	wg.Wait()
   339  }
   340  
   341  // Issue 24164, for pipes.
   342  func TestPipeEOF(t *testing.T) {
   343  	r, w, err := os.Pipe()
   344  	if err != nil {
   345  		t.Fatal(err)
   346  	}
   347  
   348  	var wg sync.WaitGroup
   349  	wg.Add(1)
   350  	go func() {
   351  		defer wg.Done()
   352  
   353  		defer func() {
   354  			if err := w.Close(); err != nil {
   355  				t.Errorf("error closing writer: %v", err)
   356  			}
   357  		}()
   358  
   359  		for i := 0; i < 3; i++ {
   360  			time.Sleep(10 * time.Millisecond)
   361  			_, err := fmt.Fprintf(w, "line %d\n", i)
   362  			if err != nil {
   363  				t.Errorf("error writing to fifo: %v", err)
   364  				return
   365  			}
   366  		}
   367  		time.Sleep(10 * time.Millisecond)
   368  	}()
   369  
   370  	defer wg.Wait()
   371  
   372  	done := make(chan bool)
   373  	go func() {
   374  		defer close(done)
   375  
   376  		defer func() {
   377  			if err := r.Close(); err != nil {
   378  				t.Errorf("error closing reader: %v", err)
   379  			}
   380  		}()
   381  
   382  		rbuf := bufio.NewReader(r)
   383  		for {
   384  			b, err := rbuf.ReadBytes('\n')
   385  			if err == io.EOF {
   386  				break
   387  			}
   388  			if err != nil {
   389  				t.Error(err)
   390  				return
   391  			}
   392  			t.Logf("%s\n", bytes.TrimSpace(b))
   393  		}
   394  	}()
   395  
   396  	select {
   397  	case <-done:
   398  		// Test succeeded.
   399  	case <-time.After(time.Second):
   400  		t.Error("timed out waiting for read")
   401  		// Close the reader to force the read to complete.
   402  		r.Close()
   403  	}
   404  }
   405  
   406  // Issue 24481.
   407  func TestFdRace(t *testing.T) {
   408  	r, w, err := os.Pipe()
   409  	if err != nil {
   410  		t.Fatal(err)
   411  	}
   412  	defer r.Close()
   413  	defer w.Close()
   414  
   415  	var wg sync.WaitGroup
   416  	call := func() {
   417  		defer wg.Done()
   418  		w.Fd()
   419  	}
   420  
   421  	const tries = 100
   422  	for i := 0; i < tries; i++ {
   423  		wg.Add(1)
   424  		go call()
   425  	}
   426  	wg.Wait()
   427  }
   428  
   429  func TestFdReadRace(t *testing.T) {
   430  	t.Parallel()
   431  
   432  	r, w, err := os.Pipe()
   433  	if err != nil {
   434  		t.Fatal(err)
   435  	}
   436  	defer r.Close()
   437  	defer w.Close()
   438  
   439  	const count = 10
   440  
   441  	c := make(chan bool, 1)
   442  	var wg sync.WaitGroup
   443  	wg.Add(1)
   444  	go func() {
   445  		defer wg.Done()
   446  		var buf [count]byte
   447  		r.SetReadDeadline(time.Now().Add(time.Minute))
   448  		c <- true
   449  		if _, err := r.Read(buf[:]); os.IsTimeout(err) {
   450  			t.Error("read timed out")
   451  		}
   452  	}()
   453  
   454  	wg.Add(1)
   455  	go func() {
   456  		defer wg.Done()
   457  		<-c
   458  		// Give the other goroutine a chance to enter the Read.
   459  		// It doesn't matter if this occasionally fails, the test
   460  		// will still pass, it just won't test anything.
   461  		time.Sleep(10 * time.Millisecond)
   462  		r.Fd()
   463  
   464  		// The bug was that Fd would hang until Read timed out.
   465  		// If the bug is fixed, then writing to w and closing r here
   466  		// will cause the Read to exit before the timeout expires.
   467  		w.Write(make([]byte, count))
   468  		r.Close()
   469  	}()
   470  
   471  	wg.Wait()
   472  }
   473  

View as plain text