Source file src/runtime/profbuf.go
1 // Copyright 2017 The Go Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style 3 // license that can be found in the LICENSE file. 4 5 package runtime 6 7 import ( 8 "runtime/internal/atomic" 9 "unsafe" 10 ) 11 12 // A profBuf is a lock-free buffer for profiling events, 13 // safe for concurrent use by one reader and one writer. 14 // The writer may be a signal handler running without a user g. 15 // The reader is assumed to be a user g. 16 // 17 // Each logged event corresponds to a fixed size header, a list of 18 // uintptrs (typically a stack), and exactly one unsafe.Pointer tag. 19 // The header and uintptrs are stored in the circular buffer data and the 20 // tag is stored in a circular buffer tags, running in parallel. 21 // In the circular buffer data, each event takes 2+hdrsize+len(stk) 22 // words: the value 2+hdrsize+len(stk), then the time of the event, then 23 // hdrsize words giving the fixed-size header, and then len(stk) words 24 // for the stack. 25 // 26 // The current effective offsets into the tags and data circular buffers 27 // for reading and writing are stored in the high 30 and low 32 bits of r and w. 28 // The bottom bits of the high 32 are additional flag bits in w, unused in r. 29 // "Effective" offsets means the total number of reads or writes, mod 2^length. 30 // The offset in the buffer is the effective offset mod the length of the buffer. 31 // To make wraparound mod 2^length match wraparound mod length of the buffer, 32 // the length of the buffer must be a power of two. 33 // 34 // If the reader catches up to the writer, a flag passed to read controls 35 // whether the read blocks until more data is available. A read returns a 36 // pointer to the buffer data itself; the caller is assumed to be done with 37 // that data at the next read. The read offset rNext tracks the next offset to 38 // be returned by read. By definition, r ≤ rNext ≤ w (before wraparound), 39 // and rNext is only used by the reader, so it can be accessed without atomics. 40 // 41 // If the writer gets ahead of the reader, so that the buffer fills, 42 // future writes are discarded and replaced in the output stream by an 43 // overflow entry, which has size 2+hdrsize+1, time set to the time of 44 // the first discarded write, a header of all zeroed words, and a "stack" 45 // containing one word, the number of discarded writes. 46 // 47 // Between the time the buffer fills and the buffer becomes empty enough 48 // to hold more data, the overflow entry is stored as a pending overflow 49 // entry in the fields overflow and overflowTime. The pending overflow 50 // entry can be turned into a real record by either the writer or the 51 // reader. If the writer is called to write a new record and finds that 52 // the output buffer has room for both the pending overflow entry and the 53 // new record, the writer emits the pending overflow entry and the new 54 // record into the buffer. If the reader is called to read data and finds 55 // that the output buffer is empty but that there is a pending overflow 56 // entry, the reader will return a synthesized record for the pending 57 // overflow entry. 58 // 59 // Only the writer can create or add to a pending overflow entry, but 60 // either the reader or the writer can clear the pending overflow entry. 61 // A pending overflow entry is indicated by the low 32 bits of 'overflow' 62 // holding the number of discarded writes, and overflowTime holding the 63 // time of the first discarded write. The high 32 bits of 'overflow' 64 // increment each time the low 32 bits transition from zero to non-zero 65 // or vice versa. This sequence number avoids ABA problems in the use of 66 // compare-and-swap to coordinate between reader and writer. 67 // The overflowTime is only written when the low 32 bits of overflow are 68 // zero, that is, only when there is no pending overflow entry, in 69 // preparation for creating a new one. The reader can therefore fetch and 70 // clear the entry atomically using 71 // 72 // for { 73 // overflow = load(&b.overflow) 74 // if uint32(overflow) == 0 { 75 // // no pending entry 76 // break 77 // } 78 // time = load(&b.overflowTime) 79 // if cas(&b.overflow, overflow, ((overflow>>32)+1)<<32) { 80 // // pending entry cleared 81 // break 82 // } 83 // } 84 // if uint32(overflow) > 0 { 85 // emit entry for uint32(overflow), time 86 // } 87 // 88 type profBuf struct { 89 // accessed atomically 90 r, w profAtomic 91 overflow uint64 92 overflowTime uint64 93 eof uint32 94 95 // immutable (excluding slice content) 96 hdrsize uintptr 97 data []uint64 98 tags []unsafe.Pointer 99 100 // owned by reader 101 rNext profIndex 102 overflowBuf []uint64 // for use by reader to return overflow record 103 wait note 104 } 105 106 // A profAtomic is the atomically-accessed word holding a profIndex. 107 type profAtomic uint64 108 109 // A profIndex is the packet tag and data counts and flags bits, described above. 110 type profIndex uint64 111 112 const ( 113 profReaderSleeping profIndex = 1 << 32 // reader is sleeping and must be woken up 114 profWriteExtra profIndex = 1 << 33 // overflow or eof waiting 115 ) 116 117 func (x *profAtomic) load() profIndex { 118 return profIndex(atomic.Load64((*uint64)(x))) 119 } 120 121 func (x *profAtomic) store(new profIndex) { 122 atomic.Store64((*uint64)(x), uint64(new)) 123 } 124 125 func (x *profAtomic) cas(old, new profIndex) bool { 126 return atomic.Cas64((*uint64)(x), uint64(old), uint64(new)) 127 } 128 129 func (x profIndex) dataCount() uint32 { 130 return uint32(x) 131 } 132 133 func (x profIndex) tagCount() uint32 { 134 return uint32(x >> 34) 135 } 136 137 // countSub subtracts two counts obtained from profIndex.dataCount or profIndex.tagCount, 138 // assuming that they are no more than 2^29 apart (guaranteed since they are never more than 139 // len(data) or len(tags) apart, respectively). 140 // tagCount wraps at 2^30, while dataCount wraps at 2^32. 141 // This function works for both. 142 func countSub(x, y uint32) int { 143 // x-y is 32-bit signed or 30-bit signed; sign-extend to 32 bits and convert to int. 144 return int(int32(x-y) << 2 >> 2) 145 } 146 147 // addCountsAndClearFlags returns the packed form of "x + (data, tag) - all flags". 148 func (x profIndex) addCountsAndClearFlags(data, tag int) profIndex { 149 return profIndex((uint64(x)>>34+uint64(uint32(tag)<<2>>2))<<34 | uint64(uint32(x)+uint32(data))) 150 } 151 152 // hasOverflow reports whether b has any overflow records pending. 153 func (b *profBuf) hasOverflow() bool { 154 return uint32(atomic.Load64(&b.overflow)) > 0 155 } 156 157 // takeOverflow consumes the pending overflow records, returning the overflow count 158 // and the time of the first overflow. 159 // When called by the reader, it is racing against incrementOverflow. 160 func (b *profBuf) takeOverflow() (count uint32, time uint64) { 161 overflow := atomic.Load64(&b.overflow) 162 time = atomic.Load64(&b.overflowTime) 163 for { 164 count = uint32(overflow) 165 if count == 0 { 166 time = 0 167 break 168 } 169 // Increment generation, clear overflow count in low bits. 170 if atomic.Cas64(&b.overflow, overflow, ((overflow>>32)+1)<<32) { 171 break 172 } 173 overflow = atomic.Load64(&b.overflow) 174 time = atomic.Load64(&b.overflowTime) 175 } 176 return uint32(overflow), time 177 } 178 179 // incrementOverflow records a single overflow at time now. 180 // It is racing against a possible takeOverflow in the reader. 181 func (b *profBuf) incrementOverflow(now int64) { 182 for { 183 overflow := atomic.Load64(&b.overflow) 184 185 // Once we see b.overflow reach 0, it's stable: no one else is changing it underfoot. 186 // We need to set overflowTime if we're incrementing b.overflow from 0. 187 if uint32(overflow) == 0 { 188 // Store overflowTime first so it's always available when overflow != 0. 189 atomic.Store64(&b.overflowTime, uint64(now)) 190 atomic.Store64(&b.overflow, (((overflow>>32)+1)<<32)+1) 191 break 192 } 193 // Otherwise we're racing to increment against reader 194 // who wants to set b.overflow to 0. 195 // Out of paranoia, leave 2³²-1 a sticky overflow value, 196 // to avoid wrapping around. Extremely unlikely. 197 if int32(overflow) == -1 { 198 break 199 } 200 if atomic.Cas64(&b.overflow, overflow, overflow+1) { 201 break 202 } 203 } 204 } 205 206 // newProfBuf returns a new profiling buffer with room for 207 // a header of hdrsize words and a buffer of at least bufwords words. 208 func newProfBuf(hdrsize, bufwords, tags int) *profBuf { 209 if min := 2 + hdrsize + 1; bufwords < min { 210 bufwords = min 211 } 212 213 // Buffer sizes must be power of two, so that we don't have to 214 // worry about uint32 wraparound changing the effective position 215 // within the buffers. We store 30 bits of count; limiting to 28 216 // gives us some room for intermediate calculations. 217 if bufwords >= 1<<28 || tags >= 1<<28 { 218 throw("newProfBuf: buffer too large") 219 } 220 var i int 221 for i = 1; i < bufwords; i <<= 1 { 222 } 223 bufwords = i 224 for i = 1; i < tags; i <<= 1 { 225 } 226 tags = i 227 228 b := new(profBuf) 229 b.hdrsize = uintptr(hdrsize) 230 b.data = make([]uint64, bufwords) 231 b.tags = make([]unsafe.Pointer, tags) 232 b.overflowBuf = make([]uint64, 2+b.hdrsize+1) 233 return b 234 } 235 236 // canWriteRecord reports whether the buffer has room 237 // for a single contiguous record with a stack of length nstk. 238 func (b *profBuf) canWriteRecord(nstk int) bool { 239 br := b.r.load() 240 bw := b.w.load() 241 242 // room for tag? 243 if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 1 { 244 return false 245 } 246 247 // room for data? 248 nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data) 249 want := 2 + int(b.hdrsize) + nstk 250 i := int(bw.dataCount() % uint32(len(b.data))) 251 if i+want > len(b.data) { 252 // Can't fit in trailing fragment of slice. 253 // Skip over that and start over at beginning of slice. 254 nd -= len(b.data) - i 255 } 256 return nd >= want 257 } 258 259 // canWriteTwoRecords reports whether the buffer has room 260 // for two records with stack lengths nstk1, nstk2, in that order. 261 // Each record must be contiguous on its own, but the two 262 // records need not be contiguous (one can be at the end of the buffer 263 // and the other can wrap around and start at the beginning of the buffer). 264 func (b *profBuf) canWriteTwoRecords(nstk1, nstk2 int) bool { 265 br := b.r.load() 266 bw := b.w.load() 267 268 // room for tag? 269 if countSub(br.tagCount(), bw.tagCount())+len(b.tags) < 2 { 270 return false 271 } 272 273 // room for data? 274 nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data) 275 276 // first record 277 want := 2 + int(b.hdrsize) + nstk1 278 i := int(bw.dataCount() % uint32(len(b.data))) 279 if i+want > len(b.data) { 280 // Can't fit in trailing fragment of slice. 281 // Skip over that and start over at beginning of slice. 282 nd -= len(b.data) - i 283 i = 0 284 } 285 i += want 286 nd -= want 287 288 // second record 289 want = 2 + int(b.hdrsize) + nstk2 290 if i+want > len(b.data) { 291 // Can't fit in trailing fragment of slice. 292 // Skip over that and start over at beginning of slice. 293 nd -= len(b.data) - i 294 i = 0 295 } 296 return nd >= want 297 } 298 299 // write writes an entry to the profiling buffer b. 300 // The entry begins with a fixed hdr, which must have 301 // length b.hdrsize, followed by a variable-sized stack 302 // and a single tag pointer *tagPtr (or nil if tagPtr is nil). 303 // No write barriers allowed because this might be called from a signal handler. 304 func (b *profBuf) write(tagPtr *unsafe.Pointer, now int64, hdr []uint64, stk []uintptr) { 305 if b == nil { 306 return 307 } 308 if len(hdr) > int(b.hdrsize) { 309 throw("misuse of profBuf.write") 310 } 311 312 if hasOverflow := b.hasOverflow(); hasOverflow && b.canWriteTwoRecords(1, len(stk)) { 313 // Room for both an overflow record and the one being written. 314 // Write the overflow record if the reader hasn't gotten to it yet. 315 // Only racing against reader, not other writers. 316 count, time := b.takeOverflow() 317 if count > 0 { 318 var stk [1]uintptr 319 stk[0] = uintptr(count) 320 b.write(nil, int64(time), nil, stk[:]) 321 } 322 } else if hasOverflow || !b.canWriteRecord(len(stk)) { 323 // Pending overflow without room to write overflow and new records 324 // or no overflow but also no room for new record. 325 b.incrementOverflow(now) 326 b.wakeupExtra() 327 return 328 } 329 330 // There's room: write the record. 331 br := b.r.load() 332 bw := b.w.load() 333 334 // Profiling tag 335 // 336 // The tag is a pointer, but we can't run a write barrier here. 337 // We have interrupted the OS-level execution of gp, but the 338 // runtime still sees gp as executing. In effect, we are running 339 // in place of the real gp. Since gp is the only goroutine that 340 // can overwrite gp.labels, the value of gp.labels is stable during 341 // this signal handler: it will still be reachable from gp when 342 // we finish executing. If a GC is in progress right now, it must 343 // keep gp.labels alive, because gp.labels is reachable from gp. 344 // If gp were to overwrite gp.labels, the deletion barrier would 345 // still shade that pointer, which would preserve it for the 346 // in-progress GC, so all is well. Any future GC will see the 347 // value we copied when scanning b.tags (heap-allocated). 348 // We arrange that the store here is always overwriting a nil, 349 // so there is no need for a deletion barrier on b.tags[wt]. 350 wt := int(bw.tagCount() % uint32(len(b.tags))) 351 if tagPtr != nil { 352 *(*uintptr)(unsafe.Pointer(&b.tags[wt])) = uintptr(unsafe.Pointer(*tagPtr)) 353 } 354 355 // Main record. 356 // It has to fit in a contiguous section of the slice, so if it doesn't fit at the end, 357 // leave a rewind marker (0) and start over at the beginning of the slice. 358 wd := int(bw.dataCount() % uint32(len(b.data))) 359 nd := countSub(br.dataCount(), bw.dataCount()) + len(b.data) 360 skip := 0 361 if wd+2+int(b.hdrsize)+len(stk) > len(b.data) { 362 b.data[wd] = 0 363 skip = len(b.data) - wd 364 nd -= skip 365 wd = 0 366 } 367 data := b.data[wd:] 368 data[0] = uint64(2 + b.hdrsize + uintptr(len(stk))) // length 369 data[1] = uint64(now) // time stamp 370 // header, zero-padded 371 i := uintptr(copy(data[2:2+b.hdrsize], hdr)) 372 for ; i < b.hdrsize; i++ { 373 data[2+i] = 0 374 } 375 for i, pc := range stk { 376 data[2+b.hdrsize+uintptr(i)] = uint64(pc) 377 } 378 379 for { 380 // Commit write. 381 // Racing with reader setting flag bits in b.w, to avoid lost wakeups. 382 old := b.w.load() 383 new := old.addCountsAndClearFlags(skip+2+len(stk)+int(b.hdrsize), 1) 384 if !b.w.cas(old, new) { 385 continue 386 } 387 // If there was a reader, wake it up. 388 if old&profReaderSleeping != 0 { 389 notewakeup(&b.wait) 390 } 391 break 392 } 393 } 394 395 // close signals that there will be no more writes on the buffer. 396 // Once all the data has been read from the buffer, reads will return eof=true. 397 func (b *profBuf) close() { 398 if atomic.Load(&b.eof) > 0 { 399 throw("runtime: profBuf already closed") 400 } 401 atomic.Store(&b.eof, 1) 402 b.wakeupExtra() 403 } 404 405 // wakeupExtra must be called after setting one of the "extra" 406 // atomic fields b.overflow or b.eof. 407 // It records the change in b.w and wakes up the reader if needed. 408 func (b *profBuf) wakeupExtra() { 409 for { 410 old := b.w.load() 411 new := old | profWriteExtra 412 if !b.w.cas(old, new) { 413 continue 414 } 415 if old&profReaderSleeping != 0 { 416 notewakeup(&b.wait) 417 } 418 break 419 } 420 } 421 422 // profBufReadMode specifies whether to block when no data is available to read. 423 type profBufReadMode int 424 425 const ( 426 profBufBlocking profBufReadMode = iota 427 profBufNonBlocking 428 ) 429 430 var overflowTag [1]unsafe.Pointer // always nil 431 432 func (b *profBuf) read(mode profBufReadMode) (data []uint64, tags []unsafe.Pointer, eof bool) { 433 if b == nil { 434 return nil, nil, true 435 } 436 437 br := b.rNext 438 439 // Commit previous read, returning that part of the ring to the writer. 440 // First clear tags that have now been read, both to avoid holding 441 // up the memory they point at for longer than necessary 442 // and so that b.write can assume it is always overwriting 443 // nil tag entries (see comment in b.write). 444 rPrev := b.r.load() 445 if rPrev != br { 446 ntag := countSub(br.tagCount(), rPrev.tagCount()) 447 ti := int(rPrev.tagCount() % uint32(len(b.tags))) 448 for i := 0; i < ntag; i++ { 449 b.tags[ti] = nil 450 if ti++; ti == len(b.tags) { 451 ti = 0 452 } 453 } 454 b.r.store(br) 455 } 456 457 Read: 458 bw := b.w.load() 459 numData := countSub(bw.dataCount(), br.dataCount()) 460 if numData == 0 { 461 if b.hasOverflow() { 462 // No data to read, but there is overflow to report. 463 // Racing with writer flushing b.overflow into a real record. 464 count, time := b.takeOverflow() 465 if count == 0 { 466 // Lost the race, go around again. 467 goto Read 468 } 469 // Won the race, report overflow. 470 dst := b.overflowBuf 471 dst[0] = uint64(2 + b.hdrsize + 1) 472 dst[1] = uint64(time) 473 for i := uintptr(0); i < b.hdrsize; i++ { 474 dst[2+i] = 0 475 } 476 dst[2+b.hdrsize] = uint64(count) 477 return dst[:2+b.hdrsize+1], overflowTag[:1], false 478 } 479 if atomic.Load(&b.eof) > 0 { 480 // No data, no overflow, EOF set: done. 481 return nil, nil, true 482 } 483 if bw&profWriteExtra != 0 { 484 // Writer claims to have published extra information (overflow or eof). 485 // Attempt to clear notification and then check again. 486 // If we fail to clear the notification it means b.w changed, 487 // so we still need to check again. 488 b.w.cas(bw, bw&^profWriteExtra) 489 goto Read 490 } 491 492 // Nothing to read right now. 493 // Return or sleep according to mode. 494 if mode == profBufNonBlocking { 495 return nil, nil, false 496 } 497 if !b.w.cas(bw, bw|profReaderSleeping) { 498 goto Read 499 } 500 // Committed to sleeping. 501 notetsleepg(&b.wait, -1) 502 noteclear(&b.wait) 503 goto Read 504 } 505 data = b.data[br.dataCount()%uint32(len(b.data)):] 506 if len(data) > numData { 507 data = data[:numData] 508 } else { 509 numData -= len(data) // available in case of wraparound 510 } 511 skip := 0 512 if data[0] == 0 { 513 // Wraparound record. Go back to the beginning of the ring. 514 skip = len(data) 515 data = b.data 516 if len(data) > numData { 517 data = data[:numData] 518 } 519 } 520 521 ntag := countSub(bw.tagCount(), br.tagCount()) 522 if ntag == 0 { 523 throw("runtime: malformed profBuf buffer - tag and data out of sync") 524 } 525 tags = b.tags[br.tagCount()%uint32(len(b.tags)):] 526 if len(tags) > ntag { 527 tags = tags[:ntag] 528 } 529 530 // Count out whole data records until either data or tags is done. 531 // They are always in sync in the buffer, but due to an end-of-slice 532 // wraparound we might need to stop early and return the rest 533 // in the next call. 534 di := 0 535 ti := 0 536 for di < len(data) && data[di] != 0 && ti < len(tags) { 537 if uintptr(di)+uintptr(data[di]) > uintptr(len(data)) { 538 throw("runtime: malformed profBuf buffer - invalid size") 539 } 540 di += int(data[di]) 541 ti++ 542 } 543 544 // Remember how much we returned, to commit read on next call. 545 b.rNext = br.addCountsAndClearFlags(skip+di, ti) 546 547 if raceenabled { 548 // Match racereleasemerge in runtime_setProfLabel, 549 // so that the setting of the labels in runtime_setProfLabel 550 // is treated as happening before any use of the labels 551 // by our caller. The synchronization on labelSync itself is a fiction 552 // for the race detector. The actual synchronization is handled 553 // by the fact that the signal handler only reads from the current 554 // goroutine and uses atomics to write the updated queue indices, 555 // and then the read-out from the signal handler buffer uses 556 // atomics to read those queue indices. 557 raceacquire(unsafe.Pointer(&labelSync)) 558 } 559 560 return data[:di], tags[:ti], false 561 } 562