Source file src/sync/waitgroup.go
1 // Copyright 2011 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package sync 6 7 import ( 8 "internal/race" 9 "sync/atomic" 10 "unsafe" 11 ) 12 13 // A WaitGroup waits for a collection of goroutines to finish. 14 // The main goroutine calls Add to set the number of 15 // goroutines to wait for. Then each of the goroutines 16 // runs and calls Done when finished. At the same time, 17 // Wait can be used to block until all goroutines have finished. 18 // 19 // A WaitGroup must not be copied after first use. 20 type WaitGroup struct { 21 noCopy noCopy 22 23 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. 24 // 64-bit atomic operations require 64-bit alignment, but 32-bit 25 // compilers only guarantee that 64-bit fields are 32-bit aligned. 26 // For this reason on 32 bit architectures we need to check in state() 27 // if state1 is aligned or not, and dynamically "swap" the field order if 28 // needed. 29 state1 uint64 30 state2 uint32 31 } 32 33 // state returns pointers to the state and sema fields stored within wg.state*. 34 func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { 35 if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { 36 // state1 is 64-bit aligned: nothing to do. 37 return &wg.state1, &wg.state2 38 } else { 39 // state1 is 32-bit aligned but not 64-bit aligned: this means that 40 // (&state1)+4 is 64-bit aligned. 41 state := (*[3]uint32)(unsafe.Pointer(&wg.state1)) 42 return (*uint64)(unsafe.Pointer(&state[1])), &state[0] 43 } 44 } 45 46 // Add adds delta, which may be negative, to the WaitGroup counter. 47 // If the counter becomes zero, all goroutines blocked on Wait are released. 48 // If the counter goes negative, Add panics. 49 // 50 // Note that calls with a positive delta that occur when the counter is zero 51 // must happen before a Wait. Calls with a negative delta, or calls with a 52 // positive delta that start when the counter is greater than zero, may happen 53 // at any time. 54 // Typically this means the calls to Add should execute before the statement 55 // creating the goroutine or other event to be waited for. 56 // If a WaitGroup is reused to wait for several independent sets of events, 57 // new Add calls must happen after all previous Wait calls have returned. 58 // See the WaitGroup example. 59 func (wg *WaitGroup) Add(delta int) { 60 statep, semap := wg.state() 61 if race.Enabled { 62 _ = *statep // trigger nil deref early 63 if delta < 0 { 64 // Synchronize decrements with Wait. 65 race.ReleaseMerge(unsafe.Pointer(wg)) 66 } 67 race.Disable() 68 defer race.Enable() 69 } 70 state := atomic.AddUint64(statep, uint64(delta)<<32) 71 v := int32(state >> 32) 72 w := uint32(state) 73 if race.Enabled && delta > 0 && v == int32(delta) { 74 // The first increment must be synchronized with Wait. 75 // Need to model this as a read, because there can be 76 // several concurrent wg.counter transitions from 0. 77 race.Read(unsafe.Pointer(semap)) 78 } 79 if v < 0 { 80 panic("sync: negative WaitGroup counter") 81 } 82 if w != 0 && delta > 0 && v == int32(delta) { 83 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 84 } 85 if v > 0 || w == 0 { 86 return 87 } 88 // This goroutine has set counter to 0 when waiters > 0. 89 // Now there can't be concurrent mutations of state: 90 // - Adds must not happen concurrently with Wait, 91 // - Wait does not increment waiters if it sees counter == 0. 92 // Still do a cheap sanity check to detect WaitGroup misuse. 93 if *statep != state { 94 panic("sync: WaitGroup misuse: Add called concurrently with Wait") 95 } 96 // Reset waiters count to 0. 97 *statep = 0 98 for ; w != 0; w-- { 99 runtime_Semrelease(semap, false, 0) 100 } 101 } 102 103 // Done decrements the WaitGroup counter by one. 104 func (wg *WaitGroup) Done() { 105 wg.Add(-1) 106 } 107 108 // Wait blocks until the WaitGroup counter is zero. 109 func (wg *WaitGroup) Wait() { 110 statep, semap := wg.state() 111 if race.Enabled { 112 _ = *statep // trigger nil deref early 113 race.Disable() 114 } 115 for { 116 state := atomic.LoadUint64(statep) 117 v := int32(state >> 32) 118 w := uint32(state) 119 if v == 0 { 120 // Counter is 0, no need to wait. 121 if race.Enabled { 122 race.Enable() 123 race.Acquire(unsafe.Pointer(wg)) 124 } 125 return 126 } 127 // Increment waiters count. 128 if atomic.CompareAndSwapUint64(statep, state, state+1) { 129 if race.Enabled && w == 0 { 130 // Wait must be synchronized with the first Add. 131 // Need to model this is as a write to race with the read in Add. 132 // As a consequence, can do the write only for the first waiter, 133 // otherwise concurrent Waits will race with each other. 134 race.Write(unsafe.Pointer(semap)) 135 } 136 runtime_Semacquire(semap) 137 if *statep != 0 { 138 panic("sync: WaitGroup is reused before previous Wait has returned") 139 } 140 if race.Enabled { 141 race.Enable() 142 race.Acquire(unsafe.Pointer(wg)) 143 } 144 return 145 } 146 } 147 } 148