Source file src/io/pipe.go

     1  // Copyright 2009 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Pipe adapter to connect code expecting an io.Reader
     6  // with code expecting an io.Writer.
     7  
     8  package io
     9  
    10  import (
    11  	"errors"
    12  	"sync"
    13  )
    14  
    15  // onceError is an object that will only store an error once.
    16  type onceError struct {
    17  	sync.Mutex // guards following
    18  	err        error
    19  }
    20  
    21  func (a *onceError) Store(err error) {
    22  	a.Lock()
    23  	defer a.Unlock()
    24  	if a.err != nil {
    25  		return
    26  	}
    27  	a.err = err
    28  }
    29  func (a *onceError) Load() error {
    30  	a.Lock()
    31  	defer a.Unlock()
    32  	return a.err
    33  }
    34  
    35  // ErrClosedPipe is the error used for read or write operations on a closed pipe.
    36  var ErrClosedPipe = errors.New("io: read/write on closed pipe")
    37  
    38  // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
    39  type pipe struct {
    40  	wrMu sync.Mutex // Serializes Write operations
    41  	wrCh chan []byte
    42  	rdCh chan int
    43  
    44  	once sync.Once // Protects closing done
    45  	done chan struct{}
    46  	rerr onceError
    47  	werr onceError
    48  }
    49  
    50  func (p *pipe) read(b []byte) (n int, err error) {
    51  	select {
    52  	case <-p.done:
    53  		return 0, p.readCloseError()
    54  	default:
    55  	}
    56  
    57  	select {
    58  	case bw := <-p.wrCh:
    59  		nr := copy(b, bw)
    60  		p.rdCh <- nr
    61  		return nr, nil
    62  	case <-p.done:
    63  		return 0, p.readCloseError()
    64  	}
    65  }
    66  
    67  func (p *pipe) closeRead(err error) error {
    68  	if err == nil {
    69  		err = ErrClosedPipe
    70  	}
    71  	p.rerr.Store(err)
    72  	p.once.Do(func() { close(p.done) })
    73  	return nil
    74  }
    75  
    76  func (p *pipe) write(b []byte) (n int, err error) {
    77  	select {
    78  	case <-p.done:
    79  		return 0, p.writeCloseError()
    80  	default:
    81  		p.wrMu.Lock()
    82  		defer p.wrMu.Unlock()
    83  	}
    84  
    85  	for once := true; once || len(b) > 0; once = false {
    86  		select {
    87  		case p.wrCh <- b:
    88  			nw := <-p.rdCh
    89  			b = b[nw:]
    90  			n += nw
    91  		case <-p.done:
    92  			return n, p.writeCloseError()
    93  		}
    94  	}
    95  	return n, nil
    96  }
    97  
    98  func (p *pipe) closeWrite(err error) error {
    99  	if err == nil {
   100  		err = EOF
   101  	}
   102  	p.werr.Store(err)
   103  	p.once.Do(func() { close(p.done) })
   104  	return nil
   105  }
   106  
   107  // readCloseError is considered internal to the pipe type.
   108  func (p *pipe) readCloseError() error {
   109  	rerr := p.rerr.Load()
   110  	if werr := p.werr.Load(); rerr == nil && werr != nil {
   111  		return werr
   112  	}
   113  	return ErrClosedPipe
   114  }
   115  
   116  // writeCloseError is considered internal to the pipe type.
   117  func (p *pipe) writeCloseError() error {
   118  	werr := p.werr.Load()
   119  	if rerr := p.rerr.Load(); werr == nil && rerr != nil {
   120  		return rerr
   121  	}
   122  	return ErrClosedPipe
   123  }
   124  
   125  // A PipeReader is the read half of a pipe.
   126  type PipeReader struct {
   127  	p *pipe
   128  }
   129  
   130  // Read implements the standard Read interface:
   131  // it reads data from the pipe, blocking until a writer
   132  // arrives or the write end is closed.
   133  // If the write end is closed with an error, that error is
   134  // returned as err; otherwise err is EOF.
   135  func (r *PipeReader) Read(data []byte) (n int, err error) {
   136  	return r.p.read(data)
   137  }
   138  
   139  // Close closes the reader; subsequent writes to the
   140  // write half of the pipe will return the error ErrClosedPipe.
   141  func (r *PipeReader) Close() error {
   142  	return r.CloseWithError(nil)
   143  }
   144  
   145  // CloseWithError closes the reader; subsequent writes
   146  // to the write half of the pipe will return the error err.
   147  //
   148  // CloseWithError never overwrites the previous error if it exists
   149  // and always returns nil.
   150  func (r *PipeReader) CloseWithError(err error) error {
   151  	return r.p.closeRead(err)
   152  }
   153  
   154  // A PipeWriter is the write half of a pipe.
   155  type PipeWriter struct {
   156  	p *pipe
   157  }
   158  
   159  // Write implements the standard Write interface:
   160  // it writes data to the pipe, blocking until one or more readers
   161  // have consumed all the data or the read end is closed.
   162  // If the read end is closed with an error, that err is
   163  // returned as err; otherwise err is ErrClosedPipe.
   164  func (w *PipeWriter) Write(data []byte) (n int, err error) {
   165  	return w.p.write(data)
   166  }
   167  
   168  // Close closes the writer; subsequent reads from the
   169  // read half of the pipe will return no bytes and EOF.
   170  func (w *PipeWriter) Close() error {
   171  	return w.CloseWithError(nil)
   172  }
   173  
   174  // CloseWithError closes the writer; subsequent reads from the
   175  // read half of the pipe will return no bytes and the error err,
   176  // or EOF if err is nil.
   177  //
   178  // CloseWithError never overwrites the previous error if it exists
   179  // and always returns nil.
   180  func (w *PipeWriter) CloseWithError(err error) error {
   181  	return w.p.closeWrite(err)
   182  }
   183  
   184  // Pipe creates a synchronous in-memory pipe.
   185  // It can be used to connect code expecting an io.Reader
   186  // with code expecting an io.Writer.
   187  //
   188  // Reads and Writes on the pipe are matched one to one
   189  // except when multiple Reads are needed to consume a single Write.
   190  // That is, each Write to the PipeWriter blocks until it has satisfied
   191  // one or more Reads from the PipeReader that fully consume
   192  // the written data.
   193  // The data is copied directly from the Write to the corresponding
   194  // Read (or Reads); there is no internal buffering.
   195  //
   196  // It is safe to call Read and Write in parallel with each other or with Close.
   197  // Parallel calls to Read and parallel calls to Write are also safe:
   198  // the individual calls will be gated sequentially.
   199  func Pipe() (*PipeReader, *PipeWriter) {
   200  	p := &pipe{
   201  		wrCh: make(chan []byte),
   202  		rdCh: make(chan int),
   203  		done: make(chan struct{}),
   204  	}
   205  	return &PipeReader{p}, &PipeWriter{p}
   206  }
   207  

View as plain text