Source file src/cmd/go/internal/par/work.go

     1  // Copyright 2018 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package par implements parallel execution helpers.
     6  package par
     7  
     8  import (
     9  	"math/rand"
    10  	"sync"
    11  	"sync/atomic"
    12  )
    13  
    14  // Work manages a set of work items to be executed in parallel, at most once each.
    15  // The items in the set must all be valid map keys.
    16  type Work struct {
    17  	f       func(any) // function to run for each item
    18  	running int       // total number of runners
    19  
    20  	mu      sync.Mutex
    21  	added   map[any]bool // items added to set
    22  	todo    []any        // items yet to be run
    23  	wait    sync.Cond    // wait when todo is empty
    24  	waiting int          // number of runners waiting for todo
    25  }
    26  
    27  func (w *Work) init() {
    28  	if w.added == nil {
    29  		w.added = make(map[any]bool)
    30  	}
    31  }
    32  
    33  // Add adds item to the work set, if it hasn't already been added.
    34  func (w *Work) Add(item any) {
    35  	w.mu.Lock()
    36  	w.init()
    37  	if !w.added[item] {
    38  		w.added[item] = true
    39  		w.todo = append(w.todo, item)
    40  		if w.waiting > 0 {
    41  			w.wait.Signal()
    42  		}
    43  	}
    44  	w.mu.Unlock()
    45  }
    46  
    47  // Do runs f in parallel on items from the work set,
    48  // with at most n invocations of f running at a time.
    49  // It returns when everything added to the work set has been processed.
    50  // At least one item should have been added to the work set
    51  // before calling Do (or else Do returns immediately),
    52  // but it is allowed for f(item) to add new items to the set.
    53  // Do should only be used once on a given Work.
    54  func (w *Work) Do(n int, f func(item any)) {
    55  	if n < 1 {
    56  		panic("par.Work.Do: n < 1")
    57  	}
    58  	if w.running >= 1 {
    59  		panic("par.Work.Do: already called Do")
    60  	}
    61  
    62  	w.running = n
    63  	w.f = f
    64  	w.wait.L = &w.mu
    65  
    66  	for i := 0; i < n-1; i++ {
    67  		go w.runner()
    68  	}
    69  	w.runner()
    70  }
    71  
    72  // runner executes work in w until both nothing is left to do
    73  // and all the runners are waiting for work.
    74  // (Then all the runners return.)
    75  func (w *Work) runner() {
    76  	for {
    77  		// Wait for something to do.
    78  		w.mu.Lock()
    79  		for len(w.todo) == 0 {
    80  			w.waiting++
    81  			if w.waiting == w.running {
    82  				// All done.
    83  				w.wait.Broadcast()
    84  				w.mu.Unlock()
    85  				return
    86  			}
    87  			w.wait.Wait()
    88  			w.waiting--
    89  		}
    90  
    91  		// Pick something to do at random,
    92  		// to eliminate pathological contention
    93  		// in case items added at about the same time
    94  		// are most likely to contend.
    95  		i := rand.Intn(len(w.todo))
    96  		item := w.todo[i]
    97  		w.todo[i] = w.todo[len(w.todo)-1]
    98  		w.todo = w.todo[:len(w.todo)-1]
    99  		w.mu.Unlock()
   100  
   101  		w.f(item)
   102  	}
   103  }
   104  
   105  // Cache runs an action once per key and caches the result.
   106  type Cache struct {
   107  	m sync.Map
   108  }
   109  
   110  type cacheEntry struct {
   111  	done   uint32
   112  	mu     sync.Mutex
   113  	result any
   114  }
   115  
   116  // Do calls the function f if and only if Do is being called for the first time with this key.
   117  // No call to Do with a given key returns until the one call to f returns.
   118  // Do returns the value returned by the one call to f.
   119  func (c *Cache) Do(key any, f func() any) any {
   120  	entryIface, ok := c.m.Load(key)
   121  	if !ok {
   122  		entryIface, _ = c.m.LoadOrStore(key, new(cacheEntry))
   123  	}
   124  	e := entryIface.(*cacheEntry)
   125  	if atomic.LoadUint32(&e.done) == 0 {
   126  		e.mu.Lock()
   127  		if atomic.LoadUint32(&e.done) == 0 {
   128  			e.result = f()
   129  			atomic.StoreUint32(&e.done, 1)
   130  		}
   131  		e.mu.Unlock()
   132  	}
   133  	return e.result
   134  }
   135  
   136  // Get returns the cached result associated with key.
   137  // It returns nil if there is no such result.
   138  // If the result for key is being computed, Get does not wait for the computation to finish.
   139  func (c *Cache) Get(key any) any {
   140  	entryIface, ok := c.m.Load(key)
   141  	if !ok {
   142  		return nil
   143  	}
   144  	e := entryIface.(*cacheEntry)
   145  	if atomic.LoadUint32(&e.done) == 0 {
   146  		return nil
   147  	}
   148  	return e.result
   149  }
   150  
   151  // Clear removes all entries in the cache.
   152  //
   153  // Concurrent calls to Get may return old values. Concurrent calls to Do
   154  // may return old values or store results in entries that have been deleted.
   155  //
   156  // TODO(jayconrod): Delete this after the package cache clearing functions
   157  // in internal/load have been removed.
   158  func (c *Cache) Clear() {
   159  	c.m.Range(func(key, value any) bool {
   160  		c.m.Delete(key)
   161  		return true
   162  	})
   163  }
   164  
   165  // Delete removes an entry from the map. It is safe to call Delete for an
   166  // entry that does not exist. Delete will return quickly, even if the result
   167  // for a key is still being computed; the computation will finish, but the
   168  // result won't be accessible through the cache.
   169  //
   170  // TODO(jayconrod): Delete this after the package cache clearing functions
   171  // in internal/load have been removed.
   172  func (c *Cache) Delete(key any) {
   173  	c.m.Delete(key)
   174  }
   175  
   176  // DeleteIf calls pred for each key in the map. If pred returns true for a key,
   177  // DeleteIf removes the corresponding entry. If the result for a key is
   178  // still being computed, DeleteIf will remove the entry without waiting for
   179  // the computation to finish. The result won't be accessible through the cache.
   180  //
   181  // TODO(jayconrod): Delete this after the package cache clearing functions
   182  // in internal/load have been removed.
   183  func (c *Cache) DeleteIf(pred func(key any) bool) {
   184  	c.m.Range(func(key, _ any) bool {
   185  		if pred(key) {
   186  			c.Delete(key)
   187  		}
   188  		return true
   189  	})
   190  }
   191  

View as plain text