1
2
3
4
5 package poll
6
7 import (
8 "errors"
9 "io"
10 "sync"
11 "sync/atomic"
12 "time"
13 )
14
// atomicBool is a boolean flag backed by an int32 so that it can be read and
// written with the sync/atomic primitives. Zero means false; the setters only
// ever store 0 or 1.
type atomicBool int32

// isSet atomically loads the flag and reports whether it is non-zero.
func (b *atomicBool) isSet() bool {
	v := atomic.LoadInt32((*int32)(b))
	return v != 0
}

// setFalse atomically stores 0 (false) into the flag.
func (b *atomicBool) setFalse() {
	atomic.StoreInt32((*int32)(b), 0)
}

// setTrue atomically stores 1 (true) into the flag.
func (b *atomicBool) setTrue() {
	atomic.StoreInt32((*int32)(b), 1)
}
20
21 type FD struct {
22
23 fdmu fdMutex
24
25 Destroy func()
26
27
28 rmu sync.Mutex
29 wmu sync.Mutex
30 raio *asyncIO
31 waio *asyncIO
32 rtimer *time.Timer
33 wtimer *time.Timer
34 rtimedout atomicBool
35 wtimedout atomicBool
36
37
38
39
40
41 isFile bool
42 }
43
44
45
46
47 func (fd *FD) destroy() error {
48 if fd.Destroy != nil {
49 fd.Destroy()
50 }
51 return nil
52 }
53
54
55
56 func (fd *FD) Close() error {
57 if !fd.fdmu.increfAndClose() {
58 return errClosing(fd.isFile)
59 }
60 return nil
61 }
62
63
64 func (fd *FD) Read(fn func([]byte) (int, error), b []byte) (int, error) {
65 if err := fd.readLock(); err != nil {
66 return 0, err
67 }
68 defer fd.readUnlock()
69 if len(b) == 0 {
70 return 0, nil
71 }
72 fd.rmu.Lock()
73 if fd.rtimedout.isSet() {
74 fd.rmu.Unlock()
75 return 0, ErrDeadlineExceeded
76 }
77 fd.raio = newAsyncIO(fn, b)
78 fd.rmu.Unlock()
79 n, err := fd.raio.Wait()
80 fd.raio = nil
81 if isHangup(err) {
82 err = io.EOF
83 }
84 if isInterrupted(err) {
85 err = ErrDeadlineExceeded
86 }
87 return n, err
88 }
89
90
91 func (fd *FD) Write(fn func([]byte) (int, error), b []byte) (int, error) {
92 if err := fd.writeLock(); err != nil {
93 return 0, err
94 }
95 defer fd.writeUnlock()
96 fd.wmu.Lock()
97 if fd.wtimedout.isSet() {
98 fd.wmu.Unlock()
99 return 0, ErrDeadlineExceeded
100 }
101 fd.waio = newAsyncIO(fn, b)
102 fd.wmu.Unlock()
103 n, err := fd.waio.Wait()
104 fd.waio = nil
105 if isInterrupted(err) {
106 err = ErrDeadlineExceeded
107 }
108 return n, err
109 }
110
111
112 func (fd *FD) SetDeadline(t time.Time) error {
113 return setDeadlineImpl(fd, t, 'r'+'w')
114 }
115
116
117 func (fd *FD) SetReadDeadline(t time.Time) error {
118 return setDeadlineImpl(fd, t, 'r')
119 }
120
121
122 func (fd *FD) SetWriteDeadline(t time.Time) error {
123 return setDeadlineImpl(fd, t, 'w')
124 }
125
126 func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
127 d := t.Sub(time.Now())
128 if mode == 'r' || mode == 'r'+'w' {
129 fd.rmu.Lock()
130 defer fd.rmu.Unlock()
131 fd.rtimedout.setFalse()
132 }
133 if mode == 'w' || mode == 'r'+'w' {
134 fd.wmu.Lock()
135 defer fd.wmu.Unlock()
136 fd.wtimedout.setFalse()
137 }
138 if t.IsZero() || d < 0 {
139
140 if mode == 'r' || mode == 'r'+'w' {
141 if fd.rtimer != nil {
142 fd.rtimer.Stop()
143 }
144 fd.rtimer = nil
145 }
146 if mode == 'w' || mode == 'r'+'w' {
147 if fd.wtimer != nil {
148 fd.wtimer.Stop()
149 }
150 fd.wtimer = nil
151 }
152 } else {
153
154 if mode == 'r' || mode == 'r'+'w' {
155 fd.rtimer = time.AfterFunc(d, func() {
156 fd.rmu.Lock()
157 fd.rtimedout.setTrue()
158 if fd.raio != nil {
159 fd.raio.Cancel()
160 }
161 fd.rmu.Unlock()
162 })
163 }
164 if mode == 'w' || mode == 'r'+'w' {
165 fd.wtimer = time.AfterFunc(d, func() {
166 fd.wmu.Lock()
167 fd.wtimedout.setTrue()
168 if fd.waio != nil {
169 fd.waio.Cancel()
170 }
171 fd.wmu.Unlock()
172 })
173 }
174 }
175 if !t.IsZero() && d < 0 {
176
177 if mode == 'r' || mode == 'r'+'w' {
178 fd.rtimedout.setTrue()
179 if fd.raio != nil {
180 fd.raio.Cancel()
181 }
182 }
183 if mode == 'w' || mode == 'r'+'w' {
184 fd.wtimedout.setTrue()
185 if fd.waio != nil {
186 fd.waio.Cancel()
187 }
188 }
189 }
190 return nil
191 }
192
193
194
195
196 func (fd *FD) ReadLock() error {
197 return fd.readLock()
198 }
199
200
201 func (fd *FD) ReadUnlock() {
202 fd.readUnlock()
203 }
204
205 func isHangup(err error) bool {
206 return err != nil && stringsHasSuffix(err.Error(), "Hangup")
207 }
208
209 func isInterrupted(err error) bool {
210 return err != nil && stringsHasSuffix(err.Error(), "interrupted")
211 }
212
213
214
// IsPollDescriptor reports whether fd is a descriptor used by the poller.
// This implementation unconditionally returns false and ignores fd.
func IsPollDescriptor(fd uintptr) bool {
	const usedByPoller = false
	return usedByPoller
}
218
219
220
221 func (fd *FD) RawControl(f func(uintptr)) error {
222 return errors.New("not implemented")
223 }
224
225
226 func (fd *FD) RawRead(f func(uintptr) bool) error {
227 return errors.New("not implemented")
228 }
229
230
231 func (fd *FD) RawWrite(f func(uintptr) bool) error {
232 return errors.New("not implemented")
233 }
234
View as plain text