Source file
src/runtime/chan.go
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "internal/abi"
22 "runtime/internal/atomic"
23 "runtime/internal/math"
24 "unsafe"
25 )
26
27 const (
28 maxAlign = 8
29 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
30 debugChan = false
31 )
32
// hchan is the runtime representation of a channel: a header describing an
// optional ring buffer of elements, plus the queues of goroutines that are
// blocked sending to or receiving from the channel.
//
// NOTE(review): field order is deliberately left untouched; code outside this
// file may depend on the layout — confirm before reordering.
type hchan struct {
	qcount   uint           // number of elements currently stored in buf
	dataqsiz uint           // capacity of buf in elements; 0 for an unbuffered channel
	buf      unsafe.Pointer // storage for dataqsiz elements, set up by makechan
	elemsize uint16         // size in bytes of one element
	closed   uint32         // 0 while open; set to 1 by closechan
	elemtype *_type         // element type, used for typed copies and clears
	sendx    uint           // index in buf where the next buffered send stores
	recvx    uint           // index in buf where the next buffered receive loads
	recvq    waitq          // receivers parked on this channel
	sendq    waitq          // senders parked on this channel

	// lock is held by chansend, chanrecv and closechan while they modify
	// the channel. The non-blocking fast paths in chansend and chanrecv
	// inspect closed, qcount and the queue heads without holding it.
	//
	// The code in this file always releases lock before calling goready
	// on another goroutine.
	lock mutex
}
53
// waitq is a doubly linked FIFO queue of sudogs: enqueue appends at last and
// dequeue removes from first.
type waitq struct {
	first *sudog // head of the queue; nil when the queue is empty
	last  *sudog // tail of the queue; nil when the queue is empty
}
58
59
60 func reflect_makechan(t *chantype, size int) *hchan {
61 return makechan(t, size)
62 }
63
64 func makechan64(t *chantype, size int64) *hchan {
65 if int64(int(size)) != size {
66 panic(plainError("makechan: size out of range"))
67 }
68
69 return makechan(t, int(size))
70 }
71
72 func makechan(t *chantype, size int) *hchan {
73 elem := t.elem
74
75
76 if elem.size >= 1<<16 {
77 throw("makechan: invalid channel element type")
78 }
79 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
80 throw("makechan: bad alignment")
81 }
82
83 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
84 if overflow || mem > maxAlloc-hchanSize || size < 0 {
85 panic(plainError("makechan: size out of range"))
86 }
87
88
89
90
91
92 var c *hchan
93 switch {
94 case mem == 0:
95
96 c = (*hchan)(mallocgc(hchanSize, nil, true))
97
98 c.buf = c.raceaddr()
99 case elem.ptrdata == 0:
100
101
102 c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
103 c.buf = add(unsafe.Pointer(c), hchanSize)
104 default:
105
106 c = new(hchan)
107 c.buf = mallocgc(mem, elem, true)
108 }
109
110 c.elemsize = uint16(elem.size)
111 c.elemtype = elem
112 c.dataqsiz = uint(size)
113 lockInit(&c.lock, lockRankHchan)
114
115 if debugChan {
116 print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
117 }
118 return c
119 }
120
121
122 func chanbuf(c *hchan, i uint) unsafe.Pointer {
123 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
124 }
125
126
127
128
129
130 func full(c *hchan) bool {
131
132
133 if c.dataqsiz == 0 {
134
135 return c.recvq.first == nil
136 }
137
138 return c.qcount == c.dataqsiz
139 }
140
141
142
143 func chansend1(c *hchan, elem unsafe.Pointer) {
144 chansend(c, elem, true, getcallerpc())
145 }
146
147
// chansend sends the value pointed to by ep on channel c.
//
// If block is false the send is only attempted: chansend returns false when
// it cannot complete immediately. If block is true the calling goroutine
// parks until a receiver takes the value or the channel is closed.
// callerpc is used only for the race-detector annotation.
//
// It returns true when the value was delivered (directly to a receiver or
// into the buffer). It panics with "send on closed channel" if c is closed,
// including when c is closed while the sender is parked.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		// Blocking send on a nil channel: park with no unlock function.
		// The throw below records that control is never expected to
		// return here.
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: a non-blocking send on a channel that looks open and full
	// fails without taking the lock. closed and the state examined by full
	// are read without synchronisation, so this is only a snapshot.
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays 0 unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// A receiver is already waiting: hand the value to it directly.
		// send releases the lock through the supplied closure.
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// There is room in the buffer: copy the value into slot sendx and
		// advance sendx, wrapping at dataqsiz.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block: describe this send in a sudog, queue it on sendq and park.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog as profiled; whoever wakes us replaces any
		// nonzero releasetime with cputicks() (see send, recv, closechan).
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)

	// parkingOnChan is raised here and lowered again in chanparkcommit.
	// NOTE(review): presumably consulted by stack-shrinking code that is
	// not visible in this file — confirm.
	atomic.Store8(&gp.parkingOnChan, 1)
	// chanparkcommit releases c.lock on our behalf.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

	// Keep ep alive until after the wakeup; while parked it is referenced
	// through mysg.elem.
	// NOTE(review): presumably needed because that reference alone does
	// not keep the value alive — confirm.
	KeepAlive(ep)

	// Someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is set to true by recv and to false by closechan.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
286
287
288
289
290
291
292
// send completes a send on c by handing the value at ep directly to the
// waiting receiver sg, which the caller has already removed from c.recvq.
//
// The caller holds c.lock; send releases it by calling unlockf before making
// the receiver runnable. skip is forwarded to goready as skip+1.
// NOTE(review): skip is presumably a frame-skip count for tracing — confirm.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Race-detector bookkeeping only: behave as if the value passed
			// through buffer slot recvx. chanrecv queues a receiver only
			// when qcount is 0, so sendx and recvx are advanced together.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx
		}
	}
	if sg.elem != nil {
		// The receiver asked for the value: copy it straight into the
		// receiver's destination.
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Release the channel lock before readying the receiver.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	// success == true tells the woken receiver that a value was delivered
	// (closechan wakes waiters with success == false).
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
323
324
325
326
327
328
329
330
331
332
333
// sendDirect copies one value of type t from src into the destination
// recorded in sg.elem.
//
// sg.elem is read once into dst, and typeBitsBulkBarrier is called for the
// destination range before the raw memmove. Keep this statement order.
// NOTE(review): dst presumably points into another goroutine's stack, which
// is why the barrier is issued explicitly and why nothing may intervene
// between reading sg.elem and using it — confirm against typeBitsBulkBarrier
// and the stack-copying code.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}
346
// recvDirect copies one value of type t from the source recorded in sg.elem
// into dst. It is the mirror image of sendDirect.
//
// sg.elem is read once into src, and typeBitsBulkBarrier is called for the
// destination range before the raw memmove. Keep this statement order.
// NOTE(review): src presumably points into the parked sender's stack —
// confirm.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}
355
// closechan closes channel c and wakes every goroutine parked on it.
//
// It panics with "close of nil channel" if c is nil and with
// "close of closed channel" if c is already closed. Woken waiters see
// sudog.success == false.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	// Waiters are collected here while the lock is held and made runnable
	// only after it has been released.
	var glist gList

	// Release all receivers: each receive destination is cleared with
	// typedmemclr so the receiver observes a zero value.
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// Release all senders: chansend panics when it wakes up with
	// success == false on a closed channel.
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all collected goroutines now that the channel lock is dropped.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}
426
427
428
429 func empty(c *hchan) bool {
430
431 if c.dataqsiz == 0 {
432 return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
433 }
434 return atomic.Loaduint(&c.qcount) == 0
435 }
436
437
438
439 func chanrecv1(c *hchan, elem unsafe.Pointer) {
440 chanrecv(c, elem, true)
441 }
442
443
444 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
445 _, received = chanrecv(c, elem, true)
446 return
447 }
448
449
450
451
452
453
454
// chanrecv receives a value from channel c and stores it at ep. ep may be
// nil, in which case the received value is discarded.
//
// If block is false and nothing can be received immediately, it returns
// (false, false). If c is closed and drained, it clears *ep and returns
// (true, false). Otherwise it stores a value at ep and returns (true, true).
// With block set, the calling goroutine parks until a sender arrives or the
// channel is closed.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		// Blocking receive on a nil channel: park with no unlock function.
		// The throw below records that control is never expected to
		// return here.
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: try to fail a non-blocking receive without taking the lock.
	if !block && empty(c) {
		// The channel looked empty. If it is also still open, report
		// "not selected". Nothing in this file ever resets closed to 0, so
		// an open channel observed here was also open at the earlier
		// emptiness check.
		if atomic.Load(&c.closed) == 0 {
			return
		}
		// The channel is closed. A value may have arrived between the
		// first emptiness check and the closed check, so look again before
		// treating the channel as closed and drained.
		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays 0 unless block profiling is enabled.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// Closed and drained: the receiver gets a zero value and received is
	// false.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// A sender is waiting. recv either takes the value directly from
		// it (unbuffered) or takes the head of the buffer and moves the
		// sender's value into the buffer. recv releases the lock through
		// the supplied closure.
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from the buffer: copy out slot recvx, clear the
		// slot, and advance recvx, wrapping at dataqsiz.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// No sender available: describe this receive in a sudog, queue it on
	// recvq and park.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog as profiled; whoever wakes us replaces any
		// nonzero releasetime with cputicks() (see send, recv, closechan).
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)

	// parkingOnChan is raised here and lowered again in chanparkcommit.
	// NOTE(review): presumably consulted by stack-shrinking code that is
	// not visible in this file — confirm.
	atomic.Store8(&gp.parkingOnChan, 1)
	// chanparkcommit releases c.lock on our behalf.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// Someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// success is set to true by send and to false by closechan.
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
594
595
596
597
598
599
600
601
602
603
604
605
606
607
// recv completes a receive on c when a sender sg is waiting. The caller has
// already removed sg from c.sendq.
//
// For an unbuffered channel the value is copied from the sender straight to
// ep. For a buffered channel the receiver takes the element at the head of
// the buffer and the sender's value is stored in that same slot. ep may be
// nil, in which case the received value is discarded.
//
// The caller holds c.lock; recv releases it by calling unlockf before making
// the sender runnable. skip is forwarded to goready as skip+1.
// NOTE(review): skip is presumably a frame-skip count for tracing — confirm.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// Copy the value directly from the sender.
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// chansend queues a sender only when the buffer has no free slot,
		// so the buffer is full here. Take the element at the head of the
		// queue and put the sender's element at the tail; with a full
		// buffer both map to the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// Copy the head element from the buffer to the receiver.
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Copy the sender's element into the slot just vacated.
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// The buffer stays full, so sendx moves in step with recvx.
		c.sendx = c.recvx
	}
	sg.elem = nil
	gp := sg.g
	// Release the channel lock before readying the sender.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	// success == true tells the woken sender that its value was taken
	// (closechan wakes waiters with success == false).
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
649
// chanparkcommit is the function chansend and chanrecv hand to gopark when a
// goroutine blocks on a channel. chanLock is the address of that channel's
// lock, which the parking goroutine still holds on entry.
//
// It always returns true.
// NOTE(review): presumably true tells gopark to go ahead with parking —
// confirm against gopark.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// Mark gp as having sudogs that reference its stack. chansend and
	// chanrecv reset this flag after waking up.
	// NOTE(review): presumably consulted by stack-copying code — confirm.
	gp.activeStackChans = true

	// Lower the flag that chansend/chanrecv raised just before gopark.
	// This must stay after activeStackChans is set and before the unlock.
	atomic.Store8(&gp.parkingOnChan, 0)

	// Release the channel lock last. Once it is released another goroutine
	// can dequeue gp's sudog and ready gp, so nothing may be done with gp
	// after this point.
	unlock((*mutex)(chanLock))
	return true
}
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
688 return chansend(c, elem, false, getcallerpc())
689 }
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
709 return chanrecv(c, elem, false)
710 }
711
712
713 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
714 return chansend(c, elem, !nb, getcallerpc())
715 }
716
717
718 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
719 return chanrecv(c, elem, !nb)
720 }
721
722
723 func reflect_chanlen(c *hchan) int {
724 if c == nil {
725 return 0
726 }
727 return int(c.qcount)
728 }
729
730
731 func reflectlite_chanlen(c *hchan) int {
732 if c == nil {
733 return 0
734 }
735 return int(c.qcount)
736 }
737
738
739 func reflect_chancap(c *hchan) int {
740 if c == nil {
741 return 0
742 }
743 return int(c.dataqsiz)
744 }
745
746
// reflect_chanclose closes c by calling closechan, and so panics in the same
// cases closechan does (nil channel, already-closed channel).
//
// NOTE(review): the name suggests this is the entry point used by package
// reflect; the linkage is not visible in this block — confirm.
func reflect_chanclose(c *hchan) {
	closechan(c)
}
750
751 func (q *waitq) enqueue(sgp *sudog) {
752 sgp.next = nil
753 x := q.last
754 if x == nil {
755 sgp.prev = nil
756 q.first = sgp
757 q.last = sgp
758 return
759 }
760 sgp.prev = x
761 x.next = sgp
762 q.last = sgp
763 }
764
// dequeue removes and returns the sudog at the head of q, or nil if the queue
// is empty.
//
// Sudogs queued by a select (isSelect set) are returned only if this call
// wins the compare-and-swap on the owning goroutine's selectDone flag;
// otherwise they are dropped from the queue and the next entry is tried.
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			// sgp was the only element.
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			// Clear the forward link so sgp no longer points into the
			// queue.
			sgp.next = nil
		}

		// A goroutine waiting in a select may be signalled through several
		// channels. The CAS on selectDone (0 -> 1) lets exactly one
		// dequeuer claim it; a loser skips this sudog, which has already
		// been unlinked above.
		// NOTE(review): presumably the select implementation sets
		// isSelect and resets selectDone — not visible in this file.
		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
			continue
		}

		return sgp
	}
}
796
797 func (c *hchan) raceaddr() unsafe.Pointer {
798
799
800
801
802
803 return unsafe.Pointer(&c.buf)
804 }
805
806 func racesync(c *hchan, sg *sudog) {
807 racerelease(chanbuf(c, 0))
808 raceacquireg(sg.g, chanbuf(c, 0))
809 racereleaseg(sg.g, chanbuf(c, 0))
810 raceacquire(chanbuf(c, 0))
811 }
812
813
814
815
816 func racenotify(c *hchan, idx uint, sg *sudog) {
817
818
819
820
821
822
823
824 qp := chanbuf(c, idx)
825
826
827
828
829
830
831 if c.elemsize == 0 {
832 if sg == nil {
833 raceacquire(qp)
834 racerelease(qp)
835 } else {
836 raceacquireg(sg.g, qp)
837 racereleaseg(sg.g, qp)
838 }
839 } else {
840 if sg == nil {
841 racereleaseacquire(qp)
842 } else {
843 racereleaseacquireg(sg.g, qp)
844 }
845 }
846 }
847
View as plain text