Source file src/sync/poolqueue.go
1 // Copyright 2019 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package sync 6 7 import ( 8 "sync/atomic" 9 "unsafe" 10 ) 11 12 // poolDequeue is a lock-free fixed-size single-producer, 13 // multi-consumer queue. The single producer can both push and pop 14 // from the head, and consumers can pop from the tail. 15 // 16 // It has the added feature that it nils out unused slots to avoid 17 // unnecessary retention of objects. This is important for sync.Pool, 18 // but not typically a property considered in the literature. 19 type poolDequeue struct { 20 // headTail packs together a 32-bit head index and a 32-bit 21 // tail index. Both are indexes into vals modulo len(vals)-1. 22 // 23 // tail = index of oldest data in queue 24 // head = index of next slot to fill 25 // 26 // Slots in the range [tail, head) are owned by consumers. 27 // A consumer continues to own a slot outside this range until 28 // it nils the slot, at which point ownership passes to the 29 // producer. 30 // 31 // The head index is stored in the most-significant bits so 32 // that we can atomically add to it and the overflow is 33 // harmless. 34 headTail uint64 35 36 // vals is a ring buffer of interface{} values stored in this 37 // dequeue. The size of this must be a power of 2. 38 // 39 // vals[i].typ is nil if the slot is empty and non-nil 40 // otherwise. A slot is still in use until *both* the tail 41 // index has moved beyond it and typ has been set to nil. This 42 // is set to nil atomically by the consumer and read 43 // atomically by the producer. 44 vals []eface 45 } 46 47 type eface struct { 48 typ, val unsafe.Pointer 49 } 50 51 const dequeueBits = 32 52 53 // dequeueLimit is the maximum size of a poolDequeue. 54 // 55 // This must be at most (1<<dequeueBits)/2 because detecting fullness 56 // depends on wrapping around the ring buffer without wrapping around 57 // the index. We divide by 4 so this fits in an int on 32-bit. 58 const dequeueLimit = (1 << dequeueBits) / 4 59 60 // dequeueNil is used in poolDequeue to represent interface{}(nil). 61 // Since we use nil to represent empty slots, we need a sentinel value 62 // to represent nil. 63 type dequeueNil *struct{} 64 65 func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { 66 const mask = 1<<dequeueBits - 1 67 head = uint32((ptrs >> dequeueBits) & mask) 68 tail = uint32(ptrs & mask) 69 return 70 } 71 72 func (d *poolDequeue) pack(head, tail uint32) uint64 { 73 const mask = 1<<dequeueBits - 1 74 return (uint64(head) << dequeueBits) | 75 uint64(tail&mask) 76 } 77 78 // pushHead adds val at the head of the queue. It returns false if the 79 // queue is full. It must only be called by a single producer. 80 func (d *poolDequeue) pushHead(val any) bool { 81 ptrs := atomic.LoadUint64(&d.headTail) 82 head, tail := d.unpack(ptrs) 83 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { 84 // Queue is full. 85 return false 86 } 87 slot := &d.vals[head&uint32(len(d.vals)-1)] 88 89 // Check if the head slot has been released by popTail. 90 typ := atomic.LoadPointer(&slot.typ) 91 if typ != nil { 92 // Another goroutine is still cleaning up the tail, so 93 // the queue is actually still full. 94 return false 95 } 96 97 // The head slot is free, so we own it. 98 if val == nil { 99 val = dequeueNil(nil) 100 } 101 *(*any)(unsafe.Pointer(slot)) = val 102 103 // Increment head. This passes ownership of slot to popTail 104 // and acts as a store barrier for writing the slot. 105 atomic.AddUint64(&d.headTail, 1<<dequeueBits) 106 return true 107 } 108 109 // popHead removes and returns the element at the head of the queue. 110 // It returns false if the queue is empty. It must only be called by a 111 // single producer. 112 func (d *poolDequeue) popHead() (any, bool) { 113 var slot *eface 114 for { 115 ptrs := atomic.LoadUint64(&d.headTail) 116 head, tail := d.unpack(ptrs) 117 if tail == head { 118 // Queue is empty. 119 return nil, false 120 } 121 122 // Confirm tail and decrement head. We do this before 123 // reading the value to take back ownership of this 124 // slot. 125 head-- 126 ptrs2 := d.pack(head, tail) 127 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { 128 // We successfully took back slot. 129 slot = &d.vals[head&uint32(len(d.vals)-1)] 130 break 131 } 132 } 133 134 val := *(*any)(unsafe.Pointer(slot)) 135 if val == dequeueNil(nil) { 136 val = nil 137 } 138 // Zero the slot. Unlike popTail, this isn't racing with 139 // pushHead, so we don't need to be careful here. 140 *slot = eface{} 141 return val, true 142 } 143 144 // popTail removes and returns the element at the tail of the queue. 145 // It returns false if the queue is empty. It may be called by any 146 // number of consumers. 147 func (d *poolDequeue) popTail() (any, bool) { 148 var slot *eface 149 for { 150 ptrs := atomic.LoadUint64(&d.headTail) 151 head, tail := d.unpack(ptrs) 152 if tail == head { 153 // Queue is empty. 154 return nil, false 155 } 156 157 // Confirm head and tail (for our speculative check 158 // above) and increment tail. If this succeeds, then 159 // we own the slot at tail. 160 ptrs2 := d.pack(head, tail+1) 161 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { 162 // Success. 163 slot = &d.vals[tail&uint32(len(d.vals)-1)] 164 break 165 } 166 } 167 168 // We now own slot. 169 val := *(*any)(unsafe.Pointer(slot)) 170 if val == dequeueNil(nil) { 171 val = nil 172 } 173 174 // Tell pushHead that we're done with this slot. Zeroing the 175 // slot is also important so we don't leave behind references 176 // that could keep this object live longer than necessary. 177 // 178 // We write to val first and then publish that we're done with 179 // this slot by atomically writing to typ. 180 slot.val = nil 181 atomic.StorePointer(&slot.typ, nil) 182 // At this point pushHead owns the slot. 183 184 return val, true 185 } 186 187 // poolChain is a dynamically-sized version of poolDequeue. 188 // 189 // This is implemented as a doubly-linked list queue of poolDequeues 190 // where each dequeue is double the size of the previous one. Once a 191 // dequeue fills up, this allocates a new one and only ever pushes to 192 // the latest dequeue. Pops happen from the other end of the list and 193 // once a dequeue is exhausted, it gets removed from the list. 194 type poolChain struct { 195 // head is the poolDequeue to push to. This is only accessed 196 // by the producer, so doesn't need to be synchronized. 197 head *poolChainElt 198 199 // tail is the poolDequeue to popTail from. This is accessed 200 // by consumers, so reads and writes must be atomic. 201 tail *poolChainElt 202 } 203 204 type poolChainElt struct { 205 poolDequeue 206 207 // next and prev link to the adjacent poolChainElts in this 208 // poolChain. 209 // 210 // next is written atomically by the producer and read 211 // atomically by the consumer. It only transitions from nil to 212 // non-nil. 213 // 214 // prev is written atomically by the consumer and read 215 // atomically by the producer. It only transitions from 216 // non-nil to nil. 217 next, prev *poolChainElt 218 } 219 220 func storePoolChainElt(pp **poolChainElt, v *poolChainElt) { 221 atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) 222 } 223 224 func loadPoolChainElt(pp **poolChainElt) *poolChainElt { 225 return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) 226 } 227 228 func (c *poolChain) pushHead(val any) { 229 d := c.head 230 if d == nil { 231 // Initialize the chain. 232 const initSize = 8 // Must be a power of 2 233 d = new(poolChainElt) 234 d.vals = make([]eface, initSize) 235 c.head = d 236 storePoolChainElt(&c.tail, d) 237 } 238 239 if d.pushHead(val) { 240 return 241 } 242 243 // The current dequeue is full. Allocate a new one of twice 244 // the size. 245 newSize := len(d.vals) * 2 246 if newSize >= dequeueLimit { 247 // Can't make it any bigger. 248 newSize = dequeueLimit 249 } 250 251 d2 := &poolChainElt{prev: d} 252 d2.vals = make([]eface, newSize) 253 c.head = d2 254 storePoolChainElt(&d.next, d2) 255 d2.pushHead(val) 256 } 257 258 func (c *poolChain) popHead() (any, bool) { 259 d := c.head 260 for d != nil { 261 if val, ok := d.popHead(); ok { 262 return val, ok 263 } 264 // There may still be unconsumed elements in the 265 // previous dequeue, so try backing up. 266 d = loadPoolChainElt(&d.prev) 267 } 268 return nil, false 269 } 270 271 func (c *poolChain) popTail() (any, bool) { 272 d := loadPoolChainElt(&c.tail) 273 if d == nil { 274 return nil, false 275 } 276 277 for { 278 // It's important that we load the next pointer 279 // *before* popping the tail. In general, d may be 280 // transiently empty, but if next is non-nil before 281 // the pop and the pop fails, then d is permanently 282 // empty, which is the only condition under which it's 283 // safe to drop d from the chain. 284 d2 := loadPoolChainElt(&d.next) 285 286 if val, ok := d.popTail(); ok { 287 return val, ok 288 } 289 290 if d2 == nil { 291 // This is the only dequeue. It's empty right 292 // now, but could be pushed to in the future. 293 return nil, false 294 } 295 296 // The tail of the chain has been drained, so move on 297 // to the next dequeue. Try to drop it from the chain 298 // so the next pop doesn't have to look at the empty 299 // dequeue again. 300 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { 301 // We won the race. Clear the prev pointer so 302 // the garbage collector can collect the empty 303 // dequeue and so popHead doesn't back up 304 // further than necessary. 305 storePoolChainElt(&d2.prev, nil) 306 } 307 d = d2 308 } 309 } 310