1
2
3
4
5 package poll
6
7 import (
8 "internal/syscall/unix"
9 "runtime"
10 "sync"
11 "sync/atomic"
12 "syscall"
13 "unsafe"
14 )
15
16 const (
17
18 spliceNonblock = 0x2
19
20
21
22 maxSpliceSize = 4 << 20
23 )
24
25
26
27
28
29
30
31
32 func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
33 p, sc, err := getPipe()
34 if err != nil {
35 return 0, false, sc, err
36 }
37 defer putPipe(p)
38 var inPipe, n int
39 for err == nil && remain > 0 {
40 max := maxSpliceSize
41 if int64(max) > remain {
42 max = int(remain)
43 }
44 inPipe, err = spliceDrain(p.wfd, src, max)
45
46
47
48
49
50
51
52
53
54
55
56 handled = handled || (err != syscall.EINVAL)
57 if err != nil || inPipe == 0 {
58 break
59 }
60 p.data += inPipe
61
62 n, err = splicePump(dst, p.rfd, inPipe)
63 if n > 0 {
64 written += int64(n)
65 remain -= int64(n)
66 p.data -= n
67 }
68 }
69 if err != nil {
70 return written, handled, "splice", err
71 }
72 return written, true, "", nil
73 }
74
75
76
77
78
79
80
81
82
83
84
85 func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
86 if err := sock.readLock(); err != nil {
87 return 0, err
88 }
89 defer sock.readUnlock()
90 if err := sock.pd.prepareRead(sock.isFile); err != nil {
91 return 0, err
92 }
93 for {
94 n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
95 if err == syscall.EINTR {
96 continue
97 }
98 if err != syscall.EAGAIN {
99 return n, err
100 }
101 if err := sock.pd.waitRead(sock.isFile); err != nil {
102 return n, err
103 }
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120 func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
121 if err := sock.writeLock(); err != nil {
122 return 0, err
123 }
124 defer sock.writeUnlock()
125 if err := sock.pd.prepareWrite(sock.isFile); err != nil {
126 return 0, err
127 }
128 written := 0
129 for inPipe > 0 {
130 n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
131
132
133 if n > 0 {
134 inPipe -= n
135 written += n
136 continue
137 }
138 if err != syscall.EAGAIN {
139 return written, err
140 }
141 if err := sock.pd.waitWrite(sock.isFile); err != nil {
142 return written, err
143 }
144 }
145 return written, nil
146 }
147
148
149
150
151
152 func splice(out int, in int, max int, flags int) (int, error) {
153 n, err := syscall.Splice(in, nil, out, nil, max, flags)
154 return int(n), err
155 }
156
157 type splicePipeFields struct {
158 rfd int
159 wfd int
160 data int
161 }
162
163 type splicePipe struct {
164 splicePipeFields
165
166
167
168 _ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
169 }
170
171
172
173
174 var splicePipePool = sync.Pool{New: newPoolPipe}
175
176 func newPoolPipe() any {
177
178
179 p := newPipe()
180 if p == nil {
181 return nil
182 }
183 runtime.SetFinalizer(p, destroyPipe)
184 return p
185 }
186
187
188
189
190
191 func getPipe() (*splicePipe, string, error) {
192 v := splicePipePool.Get()
193 if v == nil {
194 return nil, "splice", syscall.EINVAL
195 }
196 return v.(*splicePipe), "", nil
197 }
198
199 func putPipe(p *splicePipe) {
200
201
202 if p.data != 0 {
203 runtime.SetFinalizer(p, nil)
204 destroyPipe(p)
205 return
206 }
207 splicePipePool.Put(p)
208 }
209
210 var disableSplice unsafe.Pointer
211
212
213 func newPipe() (sp *splicePipe) {
214 p := (*bool)(atomic.LoadPointer(&disableSplice))
215 if p != nil && *p {
216 return nil
217 }
218
219 var fds [2]int
220
221
222
223
224 const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
225 if err := syscall.Pipe2(fds[:], flags); err != nil {
226 return nil
227 }
228
229 sp = &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
230
231 if p == nil {
232 p = new(bool)
233 defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
234
235
236 if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
237 *p = true
238 destroyPipe(sp)
239 return nil
240 }
241 }
242
243 return
244 }
245
246
247 func destroyPipe(p *splicePipe) {
248 CloseFunc(p.rfd)
249 CloseFunc(p.wfd)
250 }
251
View as plain text