Source file src/internal/poll/splice_linux.go

     1  // Copyright 2018 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll
     6  
     7  import (
     8  	"internal/syscall/unix"
     9  	"runtime"
    10  	"sync"
    11  	"sync/atomic"
    12  	"syscall"
    13  	"unsafe"
    14  )
    15  
    16  const (
    17  	// spliceNonblock makes calls to splice(2) non-blocking.
    18  	spliceNonblock = 0x2
    19  
    20  	// maxSpliceSize is the maximum amount of data Splice asks
    21  	// the kernel to move in a single call to splice(2).
    22  	maxSpliceSize = 4 << 20
    23  )
    24  
    25  // Splice transfers at most remain bytes of data from src to dst, using the
    26  // splice system call to minimize copies of data from and to userspace.
    27  //
    28  // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
    29  // src and dst must both be stream-oriented sockets.
    30  //
    31  // If err != nil, sc is the system call which caused the error.
    32  func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
    33  	p, sc, err := getPipe()
    34  	if err != nil {
    35  		return 0, false, sc, err
    36  	}
    37  	defer putPipe(p)
    38  	var inPipe, n int
    39  	for err == nil && remain > 0 {
    40  		max := maxSpliceSize
    41  		if int64(max) > remain {
    42  			max = int(remain)
    43  		}
    44  		inPipe, err = spliceDrain(p.wfd, src, max)
    45  		// The operation is considered handled if splice returns no
    46  		// error, or an error other than EINVAL. An EINVAL means the
    47  		// kernel does not support splice for the socket type of src.
    48  		// The failed syscall does not consume any data so it is safe
    49  		// to fall back to a generic copy.
    50  		//
    51  		// spliceDrain should never return EAGAIN, so if err != nil,
    52  		// Splice cannot continue.
    53  		//
    54  		// If inPipe == 0 && err == nil, src is at EOF, and the
    55  		// transfer is complete.
    56  		handled = handled || (err != syscall.EINVAL)
    57  		if err != nil || inPipe == 0 {
    58  			break
    59  		}
    60  		p.data += inPipe
    61  
    62  		n, err = splicePump(dst, p.rfd, inPipe)
    63  		if n > 0 {
    64  			written += int64(n)
    65  			remain -= int64(n)
    66  			p.data -= n
    67  		}
    68  	}
    69  	if err != nil {
    70  		return written, handled, "splice", err
    71  	}
    72  	return written, true, "", nil
    73  }
    74  
    75  // spliceDrain moves data from a socket to a pipe.
    76  //
    77  // Invariant: when entering spliceDrain, the pipe is empty. It is either in its
    78  // initial state, or splicePump has emptied it previously.
    79  //
    80  // Given this, spliceDrain can reasonably assume that the pipe is ready for
    81  // writing, so if splice returns EAGAIN, it must be because the socket is not
    82  // ready for reading.
    83  //
    84  // If spliceDrain returns (0, nil), src is at EOF.
    85  func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
    86  	if err := sock.readLock(); err != nil {
    87  		return 0, err
    88  	}
    89  	defer sock.readUnlock()
    90  	if err := sock.pd.prepareRead(sock.isFile); err != nil {
    91  		return 0, err
    92  	}
    93  	for {
    94  		n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
    95  		if err == syscall.EINTR {
    96  			continue
    97  		}
    98  		if err != syscall.EAGAIN {
    99  			return n, err
   100  		}
   101  		if err := sock.pd.waitRead(sock.isFile); err != nil {
   102  			return n, err
   103  		}
   104  	}
   105  }
   106  
   107  // splicePump moves all the buffered data from a pipe to a socket.
   108  //
   109  // Invariant: when entering splicePump, there are exactly inPipe
   110  // bytes of data in the pipe, from a previous call to spliceDrain.
   111  //
   112  // By analogy to the condition from spliceDrain, splicePump
   113  // only needs to poll the socket for readiness, if splice returns
   114  // EAGAIN.
   115  //
   116  // If splicePump cannot move all the data in a single call to
   117  // splice(2), it loops over the buffered data until it has written
   118  // all of it to the socket. This behavior is similar to the Write
   119  // step of an io.Copy in userspace.
   120  func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
   121  	if err := sock.writeLock(); err != nil {
   122  		return 0, err
   123  	}
   124  	defer sock.writeUnlock()
   125  	if err := sock.pd.prepareWrite(sock.isFile); err != nil {
   126  		return 0, err
   127  	}
   128  	written := 0
   129  	for inPipe > 0 {
   130  		n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
   131  		// Here, the condition n == 0 && err == nil should never be
   132  		// observed, since Splice controls the write side of the pipe.
   133  		if n > 0 {
   134  			inPipe -= n
   135  			written += n
   136  			continue
   137  		}
   138  		if err != syscall.EAGAIN {
   139  			return written, err
   140  		}
   141  		if err := sock.pd.waitWrite(sock.isFile); err != nil {
   142  			return written, err
   143  		}
   144  	}
   145  	return written, nil
   146  }
   147  
   148  // splice wraps the splice system call. Since the current implementation
   149  // only uses splice on sockets and pipes, the offset arguments are unused.
   150  // splice returns int instead of int64, because callers never ask it to
   151  // move more data in a single call than can fit in an int32.
   152  func splice(out int, in int, max int, flags int) (int, error) {
   153  	n, err := syscall.Splice(in, nil, out, nil, max, flags)
   154  	return int(n), err
   155  }
   156  
   157  type splicePipeFields struct {
   158  	rfd  int
   159  	wfd  int
   160  	data int
   161  }
   162  
   163  type splicePipe struct {
   164  	splicePipeFields
   165  
   166  	// We want to use a finalizer, so ensure that the size is
   167  	// large enough to not use the tiny allocator.
   168  	_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
   169  }
   170  
   171  // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
   172  // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
   173  // a finalizer for each pipe to close its file descriptors before the actual GC.
   174  var splicePipePool = sync.Pool{New: newPoolPipe}
   175  
   176  func newPoolPipe() any {
   177  	// Discard the error which occurred during the creation of pipe buffer,
   178  	// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
   179  	p := newPipe()
   180  	if p == nil {
   181  		return nil
   182  	}
   183  	runtime.SetFinalizer(p, destroyPipe)
   184  	return p
   185  }
   186  
   187  // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
   188  //
   189  // Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error
   190  // and system call name splice in a string as the indication.
   191  func getPipe() (*splicePipe, string, error) {
   192  	v := splicePipePool.Get()
   193  	if v == nil {
   194  		return nil, "splice", syscall.EINVAL
   195  	}
   196  	return v.(*splicePipe), "", nil
   197  }
   198  
   199  func putPipe(p *splicePipe) {
   200  	// If there is still data left in the pipe,
   201  	// then close and discard it instead of putting it back into the pool.
   202  	if p.data != 0 {
   203  		runtime.SetFinalizer(p, nil)
   204  		destroyPipe(p)
   205  		return
   206  	}
   207  	splicePipePool.Put(p)
   208  }
   209  
// disableSplice is accessed atomically and holds a *bool. It is nil until
// newPipe has run its one-time F_GETPIPE_SZ probe; afterwards it points to a
// bool that is true when the probe failed, in which case newPipe stops
// creating pipes.
var disableSplice unsafe.Pointer
   211  
   212  // newPipe sets up a pipe for a splice operation.
   213  func newPipe() (sp *splicePipe) {
   214  	p := (*bool)(atomic.LoadPointer(&disableSplice))
   215  	if p != nil && *p {
   216  		return nil
   217  	}
   218  
   219  	var fds [2]int
   220  	// pipe2 was added in 2.6.27 and our minimum requirement is 2.6.23, so it
   221  	// might not be implemented. Falling back to pipe is possible, but prior to
   222  	// 2.6.29 splice returns -EAGAIN instead of 0 when the connection is
   223  	// closed.
   224  	const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
   225  	if err := syscall.Pipe2(fds[:], flags); err != nil {
   226  		return nil
   227  	}
   228  
   229  	sp = &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
   230  
   231  	if p == nil {
   232  		p = new(bool)
   233  		defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
   234  
   235  		// F_GETPIPE_SZ was added in 2.6.35, which does not have the -EAGAIN bug.
   236  		if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
   237  			*p = true
   238  			destroyPipe(sp)
   239  			return nil
   240  		}
   241  	}
   242  
   243  	return
   244  }
   245  
   246  // destroyPipe destroys a pipe.
   247  func destroyPipe(p *splicePipe) {
   248  	CloseFunc(p.rfd)
   249  	CloseFunc(p.wfd)
   250  }
   251  

View as plain text