Source file
src/runtime/select.go
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "internal/abi"
11 "runtime/internal/atomic"
12 "unsafe"
13 )
14
15 const debugSelect = false
16
17
18
19
20 type scase struct {
21 c *hchan
22 elem unsafe.Pointer
23 }
24
25 var (
26 chansendpc = abi.FuncPCABIInternal(chansend)
27 chanrecvpc = abi.FuncPCABIInternal(chanrecv)
28 )
29
30 func selectsetpc(pc *uintptr) {
31 *pc = getcallerpc()
32 }
33
34 func sellock(scases []scase, lockorder []uint16) {
35 var c *hchan
36 for _, o := range lockorder {
37 c0 := scases[o].c
38 if c0 != c {
39 c = c0
40 lock(&c.lock)
41 }
42 }
43 }
44
45 func selunlock(scases []scase, lockorder []uint16) {
46
47
48
49
50
51
52
53
54 for i := len(lockorder) - 1; i >= 0; i-- {
55 c := scases[lockorder[i]].c
56 if i > 0 && c == scases[lockorder[i-1]].c {
57 continue
58 }
59 unlock(&c.lock)
60 }
61 }
62
63 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
64
65
66
67
68
69 gp.activeStackChans = true
70
71
72
73 atomic.Store8(&gp.parkingOnChan, 0)
74
75
76
77
78
79
80
81
82
83
84 var lastc *hchan
85 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
86 if sg.c != lastc && lastc != nil {
87
88
89
90
91
92
93 unlock(&lastc.lock)
94 }
95 lastc = sg.c
96 }
97 if lastc != nil {
98 unlock(&lastc.lock)
99 }
100 return true
101 }
102
103 func block() {
104 gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
123 if debugSelect {
124 print("select: cas0=", cas0, "\n")
125 }
126
127
128
129 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
130 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
131
132 ncases := nsends + nrecvs
133 scases := cas1[:ncases:ncases]
134 pollorder := order1[:ncases:ncases]
135 lockorder := order1[ncases:][:ncases:ncases]
136
137
138
139
140
141 var pcs []uintptr
142 if raceenabled && pc0 != nil {
143 pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
144 pcs = pc1[:ncases:ncases]
145 }
146 casePC := func(casi int) uintptr {
147 if pcs == nil {
148 return 0
149 }
150 return pcs[casi]
151 }
152
153 var t0 int64
154 if blockprofilerate > 0 {
155 t0 = cputicks()
156 }
157
158
159
160
161
162
163
164
165
166
167 norder := 0
168 for i := range scases {
169 cas := &scases[i]
170
171
172 if cas.c == nil {
173 cas.elem = nil
174 continue
175 }
176
177 j := fastrandn(uint32(norder + 1))
178 pollorder[norder] = pollorder[j]
179 pollorder[j] = uint16(i)
180 norder++
181 }
182 pollorder = pollorder[:norder]
183 lockorder = lockorder[:norder]
184
185
186
187 for i := range lockorder {
188 j := i
189
190 c := scases[pollorder[i]].c
191 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
192 k := (j - 1) / 2
193 lockorder[j] = lockorder[k]
194 j = k
195 }
196 lockorder[j] = pollorder[i]
197 }
198 for i := len(lockorder) - 1; i >= 0; i-- {
199 o := lockorder[i]
200 c := scases[o].c
201 lockorder[i] = lockorder[0]
202 j := 0
203 for {
204 k := j*2 + 1
205 if k >= i {
206 break
207 }
208 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
209 k++
210 }
211 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
212 lockorder[j] = lockorder[k]
213 j = k
214 continue
215 }
216 break
217 }
218 lockorder[j] = o
219 }
220
221 if debugSelect {
222 for i := 0; i+1 < len(lockorder); i++ {
223 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
224 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
225 throw("select: broken sort")
226 }
227 }
228 }
229
230
231 sellock(scases, lockorder)
232
233 var (
234 gp *g
235 sg *sudog
236 c *hchan
237 k *scase
238 sglist *sudog
239 sgnext *sudog
240 qp unsafe.Pointer
241 nextp **sudog
242 )
243
244
245 var casi int
246 var cas *scase
247 var caseSuccess bool
248 var caseReleaseTime int64 = -1
249 var recvOK bool
250 for _, casei := range pollorder {
251 casi = int(casei)
252 cas = &scases[casi]
253 c = cas.c
254
255 if casi >= nsends {
256 sg = c.sendq.dequeue()
257 if sg != nil {
258 goto recv
259 }
260 if c.qcount > 0 {
261 goto bufrecv
262 }
263 if c.closed != 0 {
264 goto rclose
265 }
266 } else {
267 if raceenabled {
268 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
269 }
270 if c.closed != 0 {
271 goto sclose
272 }
273 sg = c.recvq.dequeue()
274 if sg != nil {
275 goto send
276 }
277 if c.qcount < c.dataqsiz {
278 goto bufsend
279 }
280 }
281 }
282
283 if !block {
284 selunlock(scases, lockorder)
285 casi = -1
286 goto retc
287 }
288
289
290 gp = getg()
291 if gp.waiting != nil {
292 throw("gp.waiting != nil")
293 }
294 nextp = &gp.waiting
295 for _, casei := range lockorder {
296 casi = int(casei)
297 cas = &scases[casi]
298 c = cas.c
299 sg := acquireSudog()
300 sg.g = gp
301 sg.isSelect = true
302
303
304 sg.elem = cas.elem
305 sg.releasetime = 0
306 if t0 != 0 {
307 sg.releasetime = -1
308 }
309 sg.c = c
310
311 *nextp = sg
312 nextp = &sg.waitlink
313
314 if casi < nsends {
315 c.sendq.enqueue(sg)
316 } else {
317 c.recvq.enqueue(sg)
318 }
319 }
320
321
322 gp.param = nil
323
324
325
326
327 atomic.Store8(&gp.parkingOnChan, 1)
328 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
329 gp.activeStackChans = false
330
331 sellock(scases, lockorder)
332
333 gp.selectDone = 0
334 sg = (*sudog)(gp.param)
335 gp.param = nil
336
337
338
339
340
341 casi = -1
342 cas = nil
343 caseSuccess = false
344 sglist = gp.waiting
345
346 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
347 sg1.isSelect = false
348 sg1.elem = nil
349 sg1.c = nil
350 }
351 gp.waiting = nil
352
353 for _, casei := range lockorder {
354 k = &scases[casei]
355 if sg == sglist {
356
357 casi = int(casei)
358 cas = k
359 caseSuccess = sglist.success
360 if sglist.releasetime > 0 {
361 caseReleaseTime = sglist.releasetime
362 }
363 } else {
364 c = k.c
365 if int(casei) < nsends {
366 c.sendq.dequeueSudoG(sglist)
367 } else {
368 c.recvq.dequeueSudoG(sglist)
369 }
370 }
371 sgnext = sglist.waitlink
372 sglist.waitlink = nil
373 releaseSudog(sglist)
374 sglist = sgnext
375 }
376
377 if cas == nil {
378 throw("selectgo: bad wakeup")
379 }
380
381 c = cas.c
382
383 if debugSelect {
384 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
385 }
386
387 if casi < nsends {
388 if !caseSuccess {
389 goto sclose
390 }
391 } else {
392 recvOK = caseSuccess
393 }
394
395 if raceenabled {
396 if casi < nsends {
397 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
398 } else if cas.elem != nil {
399 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
400 }
401 }
402 if msanenabled {
403 if casi < nsends {
404 msanread(cas.elem, c.elemtype.size)
405 } else if cas.elem != nil {
406 msanwrite(cas.elem, c.elemtype.size)
407 }
408 }
409 if asanenabled {
410 if casi < nsends {
411 asanread(cas.elem, c.elemtype.size)
412 } else if cas.elem != nil {
413 asanwrite(cas.elem, c.elemtype.size)
414 }
415 }
416
417 selunlock(scases, lockorder)
418 goto retc
419
420 bufrecv:
421
422 if raceenabled {
423 if cas.elem != nil {
424 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
425 }
426 racenotify(c, c.recvx, nil)
427 }
428 if msanenabled && cas.elem != nil {
429 msanwrite(cas.elem, c.elemtype.size)
430 }
431 if asanenabled && cas.elem != nil {
432 asanwrite(cas.elem, c.elemtype.size)
433 }
434 recvOK = true
435 qp = chanbuf(c, c.recvx)
436 if cas.elem != nil {
437 typedmemmove(c.elemtype, cas.elem, qp)
438 }
439 typedmemclr(c.elemtype, qp)
440 c.recvx++
441 if c.recvx == c.dataqsiz {
442 c.recvx = 0
443 }
444 c.qcount--
445 selunlock(scases, lockorder)
446 goto retc
447
448 bufsend:
449
450 if raceenabled {
451 racenotify(c, c.sendx, nil)
452 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
453 }
454 if msanenabled {
455 msanread(cas.elem, c.elemtype.size)
456 }
457 if asanenabled {
458 asanread(cas.elem, c.elemtype.size)
459 }
460 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
461 c.sendx++
462 if c.sendx == c.dataqsiz {
463 c.sendx = 0
464 }
465 c.qcount++
466 selunlock(scases, lockorder)
467 goto retc
468
469 recv:
470
471 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
472 if debugSelect {
473 print("syncrecv: cas0=", cas0, " c=", c, "\n")
474 }
475 recvOK = true
476 goto retc
477
478 rclose:
479
480 selunlock(scases, lockorder)
481 recvOK = false
482 if cas.elem != nil {
483 typedmemclr(c.elemtype, cas.elem)
484 }
485 if raceenabled {
486 raceacquire(c.raceaddr())
487 }
488 goto retc
489
490 send:
491
492 if raceenabled {
493 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
494 }
495 if msanenabled {
496 msanread(cas.elem, c.elemtype.size)
497 }
498 if asanenabled {
499 asanread(cas.elem, c.elemtype.size)
500 }
501 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
502 if debugSelect {
503 print("syncsend: cas0=", cas0, " c=", c, "\n")
504 }
505 goto retc
506
507 retc:
508 if caseReleaseTime > 0 {
509 blockevent(caseReleaseTime-t0, 1)
510 }
511 return casi, recvOK
512
513 sclose:
514
515 selunlock(scases, lockorder)
516 panic(plainError("send on closed channel"))
517 }
518
519 func (c *hchan) sortkey() uintptr {
520 return uintptr(unsafe.Pointer(c))
521 }
522
523
524
525 type runtimeSelect struct {
526 dir selectDir
527 typ unsafe.Pointer
528 ch *hchan
529 val unsafe.Pointer
530 }
531
532
533 type selectDir int
534
535 const (
536 _ selectDir = iota
537 selectSend
538 selectRecv
539 selectDefault
540 )
541
542
543 func reflect_rselect(cases []runtimeSelect) (int, bool) {
544 if len(cases) == 0 {
545 block()
546 }
547 sel := make([]scase, len(cases))
548 orig := make([]int, len(cases))
549 nsends, nrecvs := 0, 0
550 dflt := -1
551 for i, rc := range cases {
552 var j int
553 switch rc.dir {
554 case selectDefault:
555 dflt = i
556 continue
557 case selectSend:
558 j = nsends
559 nsends++
560 case selectRecv:
561 nrecvs++
562 j = len(cases) - nrecvs
563 }
564
565 sel[j] = scase{c: rc.ch, elem: rc.val}
566 orig[j] = i
567 }
568
569
570 if nsends+nrecvs == 0 {
571 return dflt, false
572 }
573
574
575 if nsends+nrecvs < len(cases) {
576 copy(sel[nsends:], sel[len(cases)-nrecvs:])
577 copy(orig[nsends:], orig[len(cases)-nrecvs:])
578 }
579
580 order := make([]uint16, 2*(nsends+nrecvs))
581 var pc0 *uintptr
582 if raceenabled {
583 pcs := make([]uintptr, nsends+nrecvs)
584 for i := range pcs {
585 selectsetpc(&pcs[i])
586 }
587 pc0 = &pcs[0]
588 }
589
590 chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
591
592
593 if chosen < 0 {
594 chosen = dflt
595 } else {
596 chosen = orig[chosen]
597 }
598 return chosen, recvOK
599 }
600
601 func (q *waitq) dequeueSudoG(sgp *sudog) {
602 x := sgp.prev
603 y := sgp.next
604 if x != nil {
605 if y != nil {
606
607 x.next = y
608 y.prev = x
609 sgp.next = nil
610 sgp.prev = nil
611 return
612 }
613
614 x.next = nil
615 q.last = x
616 sgp.prev = nil
617 return
618 }
619 if y != nil {
620
621 y.prev = nil
622 q.first = y
623 sgp.next = nil
624 return
625 }
626
627
628
629 if q.first == sgp {
630 q.first = nil
631 q.last = nil
632 }
633 }
634
View as plain text