Source file src/internal/fuzz/worker.go

     1  // Copyright 2020 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package fuzz
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"crypto/sha256"
    11  	"encoding/json"
    12  	"errors"
    13  	"fmt"
    14  	"io"
    15  	"io/ioutil"
    16  	"os"
    17  	"os/exec"
    18  	"reflect"
    19  	"runtime"
    20  	"sync"
    21  	"time"
    22  )
    23  
    24  const (
    25  	// workerFuzzDuration is the amount of time a worker can spend testing random
    26  	// variations of an input given by the coordinator.
    27  	workerFuzzDuration = 100 * time.Millisecond
    28  
    29  	// workerTimeoutDuration is the amount of time a worker can go without
    30  	// responding to the coordinator before being stopped.
        	// worker.stop also uses it as the wait between each escalation step
        	// (closing the pipe, sending an interrupt, sending a kill).
    31  	workerTimeoutDuration = 1 * time.Second
    32  
    33  	// workerExitCode is used as an exit code by fuzz worker processes after an internal error.
    34  	// This distinguishes internal errors from uncontrolled panics and other crashes.
    35  	// Keep in sync with the matching exit code used by the worker side.
        	// NOTE(review): the original note pointed at internal/fuzz.workerExitCode,
        	// which is this very constant; the counterpart presumably lives in
        	// another package (likely testing) -- TODO confirm.
    36  	workerExitCode = 70
    37  
    38  	// workerSharedMemSize is the maximum size of the shared memory file used to
    39  	// communicate with workers. This limits the size of fuzz inputs.
    40  	workerSharedMemSize = 100 << 20 // 100 MiB (104,857,600 bytes)
    41  )
    42  
    43  // worker manages a worker process running a test binary. The worker object
    44  // exists only in the coordinator (the process started by 'go test -fuzz').
    45  // workerClient is used by the coordinator to send RPCs to the worker process,
    46  // which handles them with workerServer.
        //
        // The cmd, client, waitErr, interrupted and termC fields describe the
        // current process only; start resets them and stop clears cmd and client.
    47  type worker struct {
    48  	dir     string   // working directory, same as package directory
    49  	binPath string   // path to test executable
    50  	args    []string // arguments for test executable
    51  	env     []string // environment for test executable
    52  
        	// coordinator supplies work on inputC and minimizeC and receives
        	// results on resultC; its opts are consulted for timeouts and logging.
    53  	coordinator *coordinator
    54  
        	// memMu is created with capacity 1 and holds the single *sharedMem:
        	// receiving from it acquires the memory, sending returns it.
    55  	memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
    56  
    57  	cmd         *exec.Cmd     // current worker process; nil when none is running (see isRunning)
    58  	client      *workerClient // used to communicate with worker process; nil after stop
    59  	waitErr     error         // last error returned by wait, set before termC is closed.
    60  	interrupted bool          // true after stop interrupts a running worker.
    61  	termC       chan struct{} // closed by wait when worker process terminates
    62  }
    63  
    64  func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
    65  	mem, err := sharedMemTempFile(workerSharedMemSize)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  	memMu := make(chan *sharedMem, 1)
    70  	memMu <- mem
    71  	return &worker{
    72  		dir:         dir,
    73  		binPath:     binPath,
    74  		args:        args,
    75  		env:         env[:len(env):len(env)], // copy on append to ensure workers don't overwrite each other.
    76  		coordinator: c,
    77  		memMu:       memMu,
    78  	}, nil
    79  }
    80  
    81  // cleanup releases persistent resources associated with the worker.
    82  func (w *worker) cleanup() error {
    83  	mem := <-w.memMu
    84  	if mem == nil {
    85  		return nil
    86  	}
    87  	close(w.memMu)
    88  	return mem.Close()
    89  }
    90  
    91  // coordinate runs the test binary to perform fuzzing.
    92  //
    93  // coordinate loops until ctx is cancelled or a fatal error is encountered.
    94  // If a test process terminates unexpectedly while fuzzing, coordinate will
    95  // attempt to restart and continue unless the termination can be attributed
    96  // to an interruption (from a timer or the user).
    97  //
    98  // While looping, coordinate receives inputs from the coordinator, passes
    99  // those inputs to the worker process, then passes the results back to
   100  // the coordinator.
        //
        // coordinate returns ctx.Err() when it stops because of the context, nil
        // when an idle worker exits cleanly or is interrupted by a signal, and a
        // descriptive error in every other case.
   101  func (w *worker) coordinate(ctx context.Context) error {
   102  	// Main event loop.
   103  	for {
   104  		// Start or restart the worker if it's not running.
   105  		if !w.isRunning() {
   106  			if err := w.startAndPing(ctx); err != nil {
   107  				return err
   108  			}
   109  		}
   110  
   111  		select {
   112  		case <-ctx.Done():
   113  			// Worker was told to stop.
   114  			err := w.stop()
        			// An error caused by our own interrupt, or by an interrupt
        			// signal in general, is expected here and is not reported.
   115  			if err != nil && !w.interrupted && !isInterruptError(err) {
   116  				return err
   117  			}
   118  			return ctx.Err()
   119  
   120  		case <-w.termC:
   121  			// Worker process terminated unexpectedly while waiting for input.
   122  			err := w.stop()
        			// The process had already exited, so stop cannot have had to
        			// interrupt it; anything else is a programming error.
   123  			if w.interrupted {
   124  				panic("worker interrupted after unexpected termination")
   125  			}
   126  			if err == nil || isInterruptError(err) {
   127  				// Worker stopped, either by exiting with status 0 or after being
   128  				// interrupted with a signal that was not sent by the coordinator.
   129  				//
   130  				// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
   131  				// all processes in the group concurrently, and the worker may see it
   132  				// before the coordinator. The worker should exit 0 gracefully (in
   133  				// theory).
   134  				//
   135  				// This condition is probably intended by the user, so suppress
   136  				// the error.
   137  				return nil
   138  			}
   139  			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
   140  				// Worker exited with a code indicating F.Fuzz was not called correctly,
   141  				// for example, F.Fail was called first.
   142  				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
   143  			}
   144  			// Worker exited non-zero or was terminated by a non-interrupt
   145  			// signal (for example, SIGSEGV) while fuzzing.
   146  			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)
   147  			// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
   148  
   149  		case input := <-w.coordinator.inputC:
   150  			// Received input from coordinator.
   151  			args := fuzzArgs{
   152  				Limit:        input.limit,
   153  				Timeout:      input.timeout,
   154  				Warmup:       input.warmup,
   155  				CoverageData: input.coverageData,
   156  			}
   157  			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
   158  			canMinimize := true
   159  			if err != nil {
   160  				// Error communicating with worker.
        				// stop also records the exit error in w.waitErr, which the
        				// checks below inspect.
   161  				w.stop()
   162  				if ctx.Err() != nil {
   163  					// Timeout or interruption.
   164  					return ctx.Err()
   165  				}
   166  				if w.interrupted {
   167  					// Communication error before we stopped the worker.
   168  					// Report an error, but don't record a crasher.
   169  					return fmt.Errorf("communicating with fuzzing process: %v", err)
   170  				}
   171  				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
   172  					// Worker terminated by a signal that probably wasn't caused by a
   173  					// specific input to the fuzz function. For example, on Linux,
   174  					// the kernel (OOM killer) may send SIGKILL to a process using a lot
   175  					// of memory. Or the shell might send SIGHUP when the terminal
   176  					// is closed. Don't record a crasher.
   177  					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
   178  				}
   179  				if isInternalError {
   180  					// An internal error occurred which shouldn't be considered
   181  					// a crash.
   182  					return err
   183  				}
   184  				// Unexpected termination. Set error message and fall through.
   185  				// We'll restart the worker on the next iteration.
   186  				// Don't attempt to minimize this since it crashed the worker.
   187  				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
   188  				canMinimize = false
   189  			}
        			// Report the outcome, crash or not, back to the coordinator.
   190  			result := fuzzResult{
   191  				limit:         input.limit,
   192  				count:         resp.Count,
   193  				totalDuration: resp.TotalDuration,
   194  				entryDuration: resp.InterestingDuration,
   195  				entry:         entry,
   196  				crasherMsg:    resp.Err,
   197  				coverageData:  resp.CoverageData,
   198  				canMinimize:   canMinimize,
   199  			}
   200  			w.coordinator.resultC <- result
   201  
   202  		case input := <-w.coordinator.minimizeC:
   203  			// Received input to minimize from coordinator.
   204  			result, err := w.minimize(ctx, input)
   205  			if err != nil {
   206  				// Error minimizing. Send back the original input. If it didn't cause
   207  				// an error before, report it as causing an error now.
   208  				// TODO: double-check this is handled correctly when
   209  				// implementing -keepfuzzing.
   210  				result = fuzzResult{
   211  					entry:       input.entry,
   212  					crasherMsg:  input.crasherMsg,
   213  					canMinimize: false,
   214  					limit:       input.limit,
   215  				}
   216  				if result.crasherMsg == "" {
   217  					result.crasherMsg = err.Error()
   218  				}
   219  			}
   220  			w.coordinator.resultC <- result
   221  		}
   222  	}
   223  }
   224  
   225  // minimize tells a worker process to attempt to find a smaller value that
   226  // either causes an error (if we started minimizing because we found an input
   227  // that causes an error) or preserves new coverage (if we started minimizing
   228  // because we found an input that expands coverage).
   229  func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
   230  	if w.coordinator.opts.MinimizeTimeout != 0 {
   231  		var cancel func()
   232  		ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
   233  		defer cancel()
   234  	}
   235  
   236  	args := minimizeArgs{
   237  		Limit:        input.limit,
   238  		Timeout:      input.timeout,
   239  		KeepCoverage: input.keepCoverage,
   240  	}
   241  	entry, resp, err := w.client.minimize(ctx, input.entry, args)
   242  	if err != nil {
   243  		// Error communicating with worker.
   244  		w.stop()
   245  		if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
   246  			// Worker was interrupted, possibly by the user pressing ^C.
   247  			// Normally, workers can handle interrupts and timeouts gracefully and
   248  			// will return without error. An error here indicates the worker
   249  			// may not have been in a good state, but the error won't be meaningful
   250  			// to the user. Just return the original crasher without logging anything.
   251  			return fuzzResult{
   252  				entry:        input.entry,
   253  				crasherMsg:   input.crasherMsg,
   254  				coverageData: input.keepCoverage,
   255  				canMinimize:  false,
   256  				limit:        input.limit,
   257  			}, nil
   258  		}
   259  		return fuzzResult{
   260  			entry:         entry,
   261  			crasherMsg:    fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
   262  			canMinimize:   false,
   263  			limit:         input.limit,
   264  			count:         resp.Count,
   265  			totalDuration: resp.Duration,
   266  		}, nil
   267  	}
   268  
   269  	if input.crasherMsg != "" && resp.Err == "" {
   270  		return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
   271  	}
   272  
   273  	return fuzzResult{
   274  		entry:         entry,
   275  		crasherMsg:    resp.Err,
   276  		coverageData:  resp.CoverageData,
   277  		canMinimize:   false,
   278  		limit:         input.limit,
   279  		count:         resp.Count,
   280  		totalDuration: resp.Duration,
   281  	}, nil
   282  }
   283  
        // isRunning reports whether the worker currently has a process attached.
        // w.cmd is set by start once the process has been launched and is cleared
        // by stop, so this stays true for a process that has already exited until
        // stop is called.
   284  func (w *worker) isRunning() bool {
   285  	return w.cmd != nil
   286  }
   287  
   288  // startAndPing starts the worker process and sends it a message to make sure it
   289  // can communicate.
   290  //
   291  // startAndPing returns an error if any part of this didn't work, including if
   292  // the context is expired or the worker process was interrupted before it
   293  // responded. Errors that happen after start but before the ping response
   294  // likely indicate that the worker did not call F.Fuzz or called F.Fail first.
   295  // We don't record crashers for these errors.
   296  func (w *worker) startAndPing(ctx context.Context) error {
   297  	if ctx.Err() != nil {
   298  		return ctx.Err()
   299  	}
   300  	if err := w.start(); err != nil {
   301  		return err
   302  	}
   303  	if err := w.client.ping(ctx); err != nil {
   304  		w.stop()
   305  		if ctx.Err() != nil {
   306  			return ctx.Err()
   307  		}
   308  		if isInterruptError(err) {
   309  			// User may have pressed ^C before worker responded.
   310  			return err
   311  		}
   312  		// TODO: record and return stderr.
   313  		return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
   314  	}
   315  	return nil
   316  }
   317  
   318  // start runs a new worker process.
   319  //
   320  // If the process couldn't be started, start returns an error. Start won't
   321  // return later termination errors from the process if they occur.
   322  //
   323  // If the process starts successfully, start returns nil. stop must be called
   324  // once later to clean up, even if the process terminates on its own.
   325  //
   326  // When the process terminates, w.waitErr is set to the error (if any), and
   327  // w.termC is closed.
        //
        // start panics if the worker is already running.
   328  func (w *worker) start() (err error) {
   329  	if w.isRunning() {
   330  		panic("worker already started")
   331  	}
        	// Clear state left over from any previous process.
   332  	w.waitErr = nil
   333  	w.interrupted = false
   334  	w.termC = nil
   335  
   336  	cmd := exec.Command(w.binPath, w.args...)
   337  	cmd.Dir = w.dir
   338  	cmd.Env = w.env[:len(w.env):len(w.env)] // copy on append to ensure workers don't overwrite each other.
   339  
   340  	// Create the "fuzz_in" and "fuzz_out" pipes so we can communicate with
   341  	// the worker. We don't use stdin and stdout, since the test binary may
   342  	// do something else with those.
   343  	//
   344  	// Each pipe has a reader and a writer. The coordinator writes to fuzzInW
   345  	// and reads from fuzzOutR. The worker inherits fuzzInR and fuzzOutW.
   346  	// The coordinator closes fuzzInR and fuzzOutW after starting the worker,
   347  	// since we have no further need of them.
   348  	fuzzInR, fuzzInW, err := os.Pipe()
   349  	if err != nil {
   350  		return err
   351  	}
        	// The worker-side ends are closed on every return path, success or
        	// failure; the coordinator-side ends are closed explicitly on failure.
   352  	defer fuzzInR.Close()
   353  	fuzzOutR, fuzzOutW, err := os.Pipe()
   354  	if err != nil {
   355  		fuzzInW.Close()
   356  		return err
   357  	}
   358  	defer fuzzOutW.Close()
   359  	setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
   360  
   361  	// Start the worker process.
   362  	if err := cmd.Start(); err != nil {
   363  		fuzzInW.Close()
   364  		fuzzOutR.Close()
   365  		return err
   366  	}
   367  
   368  	// Worker started successfully.
   369  	// After this, w.client owns fuzzInW and fuzzOutR, so w.client.Close must be
   370  	// called later by stop.
   371  	w.cmd = cmd
   372  	w.termC = make(chan struct{})
   373  	comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
   374  	m := newMutator()
   375  	w.client = newWorkerClient(comm, m)
   376  
        	// Wait for the process in the background. waitErr is written before
        	// termC is closed, so code that has seen termC closed may read waitErr.
   377  	go func() {
   378  		w.waitErr = w.cmd.Wait()
   379  		close(w.termC)
   380  	}()
   381  
   382  	return nil
   383  }
   384  
   385  // stop tells the worker process to exit by closing w.client, then blocks until
   386  // it terminates. If the worker doesn't terminate after a short time, stop
   387  // signals it with os.Interrupt (where supported), then os.Kill.
   388  //
   389  // stop returns the error the process terminated with, if any (same as
   390  // w.waitErr).
   391  //
   392  // stop must be called at least once after start returns successfully, even if
   393  // the worker process terminates unexpectedly.
        //
        // stop panics if start never succeeded (w.termC is nil). It sets
        // w.interrupted once it has had to wait longer than workerTimeoutDuration.
   394  func (w *worker) stop() error {
   395  	if w.termC == nil {
   396  		panic("worker was not started successfully")
   397  	}
   398  	select {
   399  	case <-w.termC:
   400  		// Worker already terminated.
   401  		if w.client == nil {
   402  			// stop already called.
   403  			return w.waitErr
   404  		}
   405  		// Possible unexpected termination.
   406  		w.client.Close()
   407  		w.cmd = nil
   408  		w.client = nil
   409  		return w.waitErr
   410  	default:
   411  		// Worker still running.
   412  	}
   413  
   414  	// Tell the worker to stop by closing fuzz_in. It won't actually stop until it
   415  	// finishes with earlier calls.
        	// Close runs in its own goroutine so the timer loop below can start
        	// right away; closeC signals when Close has returned.
   416  	closeC := make(chan struct{})
   417  	go func() {
   418  		w.client.Close()
   419  		close(closeC)
   420  	}()
   421  
   422  	sig := os.Interrupt
   423  	if runtime.GOOS == "windows" {
   424  		// Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on
   425  		// Windows; using it with os.Process.Signal will return an error.”
   426  		// Fall back to Kill instead.
   427  		sig = os.Kill
   428  	}
   429  
        	// Escalate each time the timer fires: interrupt, then kill, then just
        	// keep waiting.
   430  	t := time.NewTimer(workerTimeoutDuration)
   431  	for {
   432  		select {
   433  		case <-w.termC:
   434  			// Worker terminated.
   435  			t.Stop()
        			// Wait for Close to finish before dropping the client.
   436  			<-closeC
   437  			w.cmd = nil
   438  			w.client = nil
   439  			return w.waitErr
   440  
   441  		case <-t.C:
   442  			// Timer fired before worker terminated.
   443  			w.interrupted = true
   444  			switch sig {
   445  			case os.Interrupt:
   446  				// Try to stop the worker with SIGINT and wait a little longer.
   447  				w.cmd.Process.Signal(sig)
   448  				sig = os.Kill
   449  				t.Reset(workerTimeoutDuration)
   450  
   451  			case os.Kill:
   452  				// Try to stop the worker with SIGKILL and keep waiting.
   453  				w.cmd.Process.Signal(sig)
   454  				sig = nil
   455  				t.Reset(workerTimeoutDuration)
   456  
   457  			case nil:
   458  				// Still waiting. Print a message to let the user know why.
        				// The timer is not reset here, so the message is printed
        				// once and the loop then waits on termC alone.
   459  				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
   460  			}
   461  		}
   462  	}
   463  }
   464  
   465  // RunFuzzWorker is called in a worker process to communicate with the
   466  // coordinator process in order to fuzz random inputs. RunFuzzWorker loops
   467  // until the coordinator tells it to stop.
   468  //
   469  // fn is a wrapper on the fuzz function. It may return an error to indicate
   470  // a given input "crashed". The coordinator will also record a crasher if
   471  // the function times out or terminates the process.
   472  //
   473  // RunFuzzWorker returns an error if it could not communicate with the
   474  // coordinator process.
   475  func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
   476  	comm, err := getWorkerComm()
   477  	if err != nil {
   478  		return err
   479  	}
   480  	srv := &workerServer{
   481  		workerComm: comm,
   482  		fuzzFn: func(e CorpusEntry) (time.Duration, error) {
   483  			timer := time.AfterFunc(10*time.Second, func() {
   484  				panic("deadlocked!") // this error message won't be printed
   485  			})
   486  			defer timer.Stop()
   487  			start := time.Now()
   488  			err := fn(e)
   489  			return time.Since(start), err
   490  		},
   491  		m: newMutator(),
   492  	}
   493  	return srv.serve(ctx)
   494  }
   495  
   496  // call is serialized and sent from the coordinator on fuzz_in. It acts as
   497  // a minimalist RPC mechanism. Exactly one of its fields must be set to indicate
   498  // which method to call.
        //
        // workerServer.serve checks the fields in the order Fuzz, Minimize, Ping and
        // dispatches on the first non-nil one; it returns an error if none is set.
   499  type call struct {
   500  	Ping     *pingArgs
   501  	Fuzz     *fuzzArgs
   502  	Minimize *minimizeArgs
   503  }
   504  
   505  // minimizeArgs contains arguments to workerServer.minimize. The value to
   506  // minimize is already in shared memory.
   507  type minimizeArgs struct {
   508  	// Timeout is the time to spend minimizing. This may include time to start up,
   509  	// especially if the input causes the worker process to terminate, requiring
   510  	// repeated restarts. 0 means the worker applies no timeout of its own.
   511  	Timeout time.Duration
   512  
   513  	// Limit is the maximum number of values to test, without spending more time
   514  	// than Timeout. 0 indicates no limit.
   515  	Limit int64
   516  
   517  	// KeepCoverage is a set of coverage counters the worker should attempt to
   518  	// keep in minimized values. When provided, the worker will reject inputs that
   519  	// don't cause at least one of these bits to be set.
   520  	KeepCoverage []byte
   521  
   522  	// Index is the index of the fuzz target parameter to be minimized.
   523  	Index int
   524  }
   525  
   526  // minimizeResponse contains results from workerServer.minimize.
   527  type minimizeResponse struct {
   528  	// WroteToMem is true if the worker found a smaller input and wrote it to
   529  	// shared memory. If minimizeArgs.KeepCoverage was set, the minimized input
   530  	// preserved at least one coverage bit and did not cause an error.
   531  	// Otherwise, the minimized input caused some error, recorded in Err.
   532  	WroteToMem bool
   533  
   534  	// Err is the error string caused by the value in shared memory, if any.
   535  	Err string
   536  
   537  	// CoverageData is the set of coverage bits activated by the minimized value
   538  	// in shared memory. When set, it contains at least one bit from KeepCoverage.
   539  	// CoverageData will be nil if Err is set or if minimization failed.
        	// If minimization left the value unchanged, this is the KeepCoverage
        	// that was passed in rather than a fresh snapshot.
   540  	CoverageData []byte
   541  
   542  	// Duration is the time spent minimizing, not including starting or cleaning up.
   543  	Duration time.Duration
   544  
   545  	// Count is the number of values tested.
   546  	Count int64
   547  }
   548  
   549  // fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is
   550  // passed in shared memory.
   551  type fuzzArgs struct {
   552  	// Timeout is the time to spend fuzzing, not including starting or
   553  	// cleaning up. 0 means the worker applies no timeout of its own.
   554  	Timeout time.Duration
   555  
   556  	// Limit is the maximum number of values to test, without spending more time
   557  	// than Timeout. 0 indicates no limit.
   558  	Limit int64
   559  
   560  	// Warmup indicates whether this is part of a warmup run, meaning that
   561  	// fuzzing should not occur. If coverageEnabled is true, then coverage data
   562  	// should be reported.
   563  	Warmup bool
   564  
   565  	// CoverageData is the coverage data. If set, the worker should update its
   566  	// local coverage data prior to fuzzing.
        	// Its length must match the worker's existing coverage mask, if any;
        	// otherwise fuzz reports an internal error.
   567  	CoverageData []byte
   568  }
   569  
   570  // fuzzResponse contains results from workerServer.fuzz.
   571  type fuzzResponse struct {
   572  	// TotalDuration is the time spent fuzzing, not including starting or cleaning up.
        	// InterestingDuration is the time the fuzz function took on the value
        	// being reported; it is set for warmup runs and for inputs that expand
        	// coverage.
   573  	TotalDuration       time.Duration
   574  	InterestingDuration time.Duration
   575  
   576  	// Count is the number of values tested.
   577  	Count int64
   578  
   579  	// CoverageData is set if the value in shared memory expands coverage
   580  	// and therefore may be interesting to the coordinator.
   581  	CoverageData []byte
   582  
   583  	// Err is the error string caused by the value in shared memory, which is
   584  	// non-empty if the value in shared memory caused a crash.
   585  	Err string
   586  
   587  	// InternalErr is the error string caused by an internal error in the
   588  	// worker. This shouldn't be considered a crasher.
   589  	InternalErr string
   590  }
   591  
   592  // pingArgs contains arguments to workerServer.ping. It currently has no
        // fields; a non-nil *pingArgs in a call is what selects the ping method.
   593  type pingArgs struct{}
   594  
   595  // pingResponse contains results from workerServer.ping. It currently has no
        // fields.
   596  type pingResponse struct{}
   597  
   598  // workerComm holds pipes and shared memory used for communication
   599  // between the coordinator process (client) and a worker process (server).
   600  // These values are unique to each worker; they are shared only with the
   601  // coordinator, not with other workers.
   602  //
   603  // Access to shared memory is synchronized implicitly over the RPC protocol
   604  // implemented in workerServer and workerClient. During a call, the server
   605  // (worker) has exclusive access to shared memory; at other times, the client
   606  // (coordinator) has exclusive access.
   607  type workerComm struct {
        	// fuzzIn carries calls from the coordinator to the worker and fuzzOut
        	// carries the responses back. Each process holds its own end of each
        	// pipe.
   608  	fuzzIn, fuzzOut *os.File
   609  	memMu           chan *sharedMem // mutex guarding shared memory
   610  }
   611  
   612  // workerServer is a minimalist RPC server, run by fuzz worker processes.
   613  // It allows the coordinator process (using workerClient) to call methods in a
   614  // worker process. This system allows the coordinator to run multiple worker
   615  // processes in parallel and to collect inputs that caused crashes from shared
   616  // memory after a worker process terminates unexpectedly.
   617  type workerServer struct {
        	// workerComm provides the pipes and shared memory used to talk to the
        	// coordinator.
   618  	workerComm
        	// m is the mutator used by fuzz to produce variations of the input.
   619  	m *mutator
   620  
   621  	// coverageMask is the local coverage data for the worker. It is
   622  	// periodically updated to reflect the data in the coordinator when new
   623  	// coverage is found.
   624  	coverageMask []byte
   625  
   626  	// fuzzFn runs the worker's fuzz target on the given input and returns an
   627  	// error if it finds a crasher (the process may also exit or crash), and the
   628  	// time it took to run the input. It sets a deadline of 10 seconds, at which
   629  	// point it will panic with the assumption that the process is hanging or
   630  	// deadlocked.
   631  	fuzzFn func(CorpusEntry) (time.Duration, error)
   632  }
   633  
   634  // serve reads serialized RPC messages on fuzzIn. When serve receives a message,
   635  // it calls the corresponding method, then sends the serialized result back
   636  // on fuzzOut.
   637  //
   638  // serve handles RPC calls synchronously; it will not attempt to read a message
   639  // until the previous call has finished.
   640  //
   641  // serve returns errors that occurred when communicating over pipes. serve
   642  // does not return errors from method calls; those are passed through serialized
   643  // responses.
   644  func (ws *workerServer) serve(ctx context.Context) error {
   645  	enc := json.NewEncoder(ws.fuzzOut)
   646  	dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
   647  	for {
   648  		var c call
   649  		if err := dec.Decode(&c); err != nil {
   650  			if err == io.EOF || err == ctx.Err() {
   651  				return nil
   652  			} else {
   653  				return err
   654  			}
   655  		}
   656  
   657  		var resp any
   658  		switch {
   659  		case c.Fuzz != nil:
   660  			resp = ws.fuzz(ctx, *c.Fuzz)
   661  		case c.Minimize != nil:
   662  			resp = ws.minimize(ctx, *c.Minimize)
   663  		case c.Ping != nil:
   664  			resp = ws.ping(ctx, *c.Ping)
   665  		default:
   666  			return errors.New("no arguments provided for any call")
   667  		}
   668  
   669  		if err := enc.Encode(resp); err != nil {
   670  			return err
   671  		}
   672  	}
   673  }
   674  
   675  // chainedMutations is how many mutations are applied before the worker
   676  // resets the input to its original state.
   677  // NOTE: this number was picked without much thought. It is low enough that
   678  // it seems to create a significant diversity in mutated inputs. We may want
   679  // to consider looking into this more closely once we have a proper performance
   680  // testing framework. Another option is to randomly pick the number of chained
   681  // mutations on each invocation of the workerServer.fuzz method (this appears to
   682  // be what libFuzzer does, although there seems to be no documentation which
   683  // explains why this choice was made.)
   684  const chainedMutations = 5
   685  
   686  // fuzz runs the test function on random variations of the input value in shared
   687  // memory for a limited duration or number of iterations.
   688  //
   689  // fuzz returns early if it finds an input that crashes the fuzz function (with
   690  // fuzzResponse.Err set) or an input that expands coverage (with
   691  // fuzzResponse.InterestingDuration set).
   692  //
   693  // fuzz does not modify the input in shared memory. Instead, it saves the
   694  // initial PRNG state in shared memory and increments a counter in shared
   695  // memory before each call to the test function. The caller may reconstruct
   696  // the crashing input with this information, since the PRNG is deterministic.
        //
        // Problems that are not attributable to the input (a CoverageData size
        // mismatch, a count already at the limit, an undecodable value in shared
        // memory) are reported in fuzzResponse.InternalErr.
   697  func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
   698  	if args.CoverageData != nil {
   699  		if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
   700  			resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
   701  			return resp
   702  		}
   703  		ws.coverageMask = args.CoverageData
   704  	}
   705  	start := time.Now()
   706  	defer func() { resp.TotalDuration = time.Since(start) }()
   707  
   708  	if args.Timeout != 0 {
   709  		var cancel func()
   710  		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
   711  		defer cancel()
   712  	}
        	// Take the shared memory; the deferred function below gives it back
        	// after copying the final count into the response.
   713  	mem := <-ws.memMu
   714  	ws.m.r.save(&mem.header().randState, &mem.header().randInc)
   715  	defer func() {
   716  		resp.Count = mem.header().count
   717  		ws.memMu <- mem
   718  	}()
   719  	if args.Limit > 0 && mem.header().count >= args.Limit {
   720  		resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
   721  		return resp
   722  	}
   723  
   724  	originalVals, err := unmarshalCorpusFile(mem.valueCopy())
   725  	if err != nil {
   726  		resp.InternalErr = err.Error()
   727  		return resp
   728  	}
        	// vals is the working copy handed to the mutator; originalVals is kept
        	// so that vals can be reset every chainedMutations iterations.
   729  	vals := make([]any, len(originalVals))
   730  	copy(vals, originalVals)
   731  
        	// shouldStop reports whether the iteration limit, if any, was reached.
   732  	shouldStop := func() bool {
   733  		return args.Limit > 0 && mem.header().count >= args.Limit
   734  	}
        	// fuzzOnce bumps the shared counter, runs the fuzz function on entry and
        	// returns its duration plus either an error message or, when the run
        	// added coverage bits not in coverageMask, the coverage snapshot.
   735  	fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
   736  		mem.header().count++
   737  		var err error
   738  		dur, err = ws.fuzzFn(entry)
   739  		if err != nil {
   740  			errMsg = err.Error()
   741  			if errMsg == "" {
        				// Callers treat an empty errMsg as success, so substitute
        				// a placeholder.
   742  				errMsg = "fuzz function failed with no input"
   743  			}
   744  			return dur, nil, errMsg
   745  		}
   746  		if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
   747  			return dur, coverageSnapshot, ""
   748  		}
   749  		return dur, nil, ""
   750  	}
   751  
   752  	if args.Warmup {
        		// Warmup: run the unmodified value exactly once, no mutation.
   753  		dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
   754  		if errMsg != "" {
   755  			resp.Err = errMsg
   756  			return resp
   757  		}
   758  		resp.InterestingDuration = dur
   759  		if coverageEnabled {
   760  			resp.CoverageData = coverageSnapshot
   761  		}
   762  		return resp
   763  	}
   764  
   765  	for {
   766  		select {
   767  		case <-ctx.Done():
   768  			return resp
   769  		default:
        			// Every chainedMutations iterations, go back to the original
        			// values and record the PRNG state for that fresh chain.
   770  			if mem.header().count%chainedMutations == 0 {
   771  				copy(vals, originalVals)
   772  				ws.m.r.save(&mem.header().randState, &mem.header().randInc)
   773  			}
   774  			ws.m.mutate(vals, cap(mem.valueRef()))
   775  
   776  			entry := CorpusEntry{Values: vals}
   777  			dur, cov, errMsg := fuzzOnce(entry)
   778  			if errMsg != "" {
   779  				resp.Err = errMsg
   780  				return resp
   781  			}
   782  			if cov != nil {
   783  				resp.CoverageData = cov
   784  				resp.InterestingDuration = dur
   785  				return resp
   786  			}
   787  			if shouldStop() {
   788  				return resp
   789  			}
   790  		}
   791  	}
   792  }
   793  
   794  func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
   795  	start := time.Now()
   796  	defer func() { resp.Duration = time.Now().Sub(start) }()
   797  	mem := <-ws.memMu
   798  	defer func() { ws.memMu <- mem }()
   799  	vals, err := unmarshalCorpusFile(mem.valueCopy())
   800  	if err != nil {
   801  		panic(err)
   802  	}
   803  	inpHash := sha256.Sum256(mem.valueCopy())
   804  	if args.Timeout != 0 {
   805  		var cancel func()
   806  		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
   807  		defer cancel()
   808  	}
   809  
   810  	// Minimize the values in vals, then write to shared memory. We only write
   811  	// to shared memory after completing minimization.
   812  	success, err := ws.minimizeInput(ctx, vals, mem, args)
   813  	if success {
   814  		writeToMem(vals, mem)
   815  		outHash := sha256.Sum256(mem.valueCopy())
   816  		mem.header().rawInMem = false
   817  		resp.WroteToMem = true
   818  		if err != nil {
   819  			resp.Err = err.Error()
   820  		} else {
   821  			// If the values didn't change during minimization then coverageSnapshot is likely
   822  			// a dirty snapshot which represents the very last step of minimization, not the
   823  			// coverage for the initial input. In that case just return the coverage we were
   824  			// given initially, since it more accurately represents the coverage map for the
   825  			// input we are returning.
   826  			if outHash != inpHash {
   827  				resp.CoverageData = coverageSnapshot
   828  			} else {
   829  				resp.CoverageData = args.KeepCoverage
   830  			}
   831  		}
   832  	}
   833  	return resp
   834  }
   835  
   836  // minimizeInput applies a series of minimizing transformations on the provided
   837  // vals, ensuring that each minimization still causes an error, or keeps
   838  // coverage, in fuzzFn. It uses the context to determine how long to run,
   839  // stopping once closed. It returns a bool indicating whether minimization was
   840  // successful and an error if one was found.
   841  func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
   842  	keepCoverage := args.KeepCoverage
   843  	memBytes := mem.valueRef()
   844  	bPtr := &memBytes
   845  	count := &mem.header().count
   846  	shouldStop := func() bool {
   847  		return ctx.Err() != nil ||
   848  			(args.Limit > 0 && *count >= args.Limit)
   849  	}
   850  	if shouldStop() {
   851  		return false, nil
   852  	}
   853  
   854  	// Check that the original value preserves coverage or causes an error.
   855  	// If not, then whatever caused us to think the value was interesting may
   856  	// have been a flake, and we can't minimize it.
   857  	*count++
   858  	_, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
   859  	if keepCoverage != nil {
   860  		if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
   861  			return false, nil
   862  		}
   863  	} else if retErr == nil {
   864  		return false, nil
   865  	}
   866  	mem.header().rawInMem = true
   867  
   868  	// tryMinimized runs the fuzz function with candidate replacing the value
   869  	// at index valI. tryMinimized returns whether the input with candidate is
   870  	// interesting for the same reason as the original input: it returns
   871  	// an error if one was expected, or it preserves coverage.
   872  	tryMinimized := func(candidate []byte) bool {
   873  		prev := vals[args.Index]
   874  		switch prev.(type) {
   875  		case []byte:
   876  			vals[args.Index] = candidate
   877  		case string:
   878  			vals[args.Index] = string(candidate)
   879  		default:
   880  			panic("impossible")
   881  		}
   882  		copy(*bPtr, candidate)
   883  		*bPtr = (*bPtr)[:len(candidate)]
   884  		mem.setValueLen(len(candidate))
   885  		*count++
   886  		_, err := ws.fuzzFn(CorpusEntry{Values: vals})
   887  		if err != nil {
   888  			retErr = err
   889  			if keepCoverage != nil {
   890  				// Now that we've found a crash, that's more important than any
   891  				// minimization of interesting inputs that was being done. Clear out
   892  				// keepCoverage to only minimize the crash going forward.
   893  				keepCoverage = nil
   894  			}
   895  			return true
   896  		}
   897  		// Minimization should preserve coverage bits.
   898  		if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
   899  			return true
   900  		}
   901  		vals[args.Index] = prev
   902  		return false
   903  	}
   904  	switch v := vals[args.Index].(type) {
   905  	case string:
   906  		minimizeBytes([]byte(v), tryMinimized, shouldStop)
   907  	case []byte:
   908  		minimizeBytes(v, tryMinimized, shouldStop)
   909  	default:
   910  		panic("impossible")
   911  	}
   912  	return true, retErr
   913  }
   914  
   915  func writeToMem(vals []any, mem *sharedMem) {
   916  	b := marshalCorpusFile(vals...)
   917  	mem.setValue(b)
   918  }
   919  
   920  // ping does nothing. The coordinator calls this method to ensure the worker
   921  // has called F.Fuzz and can communicate.
   922  func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
   923  	return pingResponse{}
   924  }
   925  
   926  // workerClient is a minimalist RPC client. The coordinator process uses a
   927  // workerClient to call methods in each worker process (handled by
   928  // workerServer).
   929  type workerClient struct {
   930  	workerComm
   931  	m *mutator
   932  
   933  	// mu is the mutex protecting the workerComm.fuzzIn pipe. This must be
   934  	// locked before making calls to the workerServer. It prevents
   935  	// workerClient.Close from closing fuzzIn while workerClient methods are
   936  	// writing to it concurrently, and prevents multiple callers from writing to
   937  	// fuzzIn concurrently.
   938  	mu sync.Mutex
   939  }
   940  
   941  func newWorkerClient(comm workerComm, m *mutator) *workerClient {
   942  	return &workerClient{workerComm: comm, m: m}
   943  }
   944  
   945  // Close shuts down the connection to the RPC server (the worker process) by
   946  // closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker),
   947  // and closes it after the worker process closes the other end.
   948  func (wc *workerClient) Close() error {
   949  	wc.mu.Lock()
   950  	defer wc.mu.Unlock()
   951  
   952  	// Close fuzzIn. This signals to the server that there are no more calls,
   953  	// and it should exit.
   954  	if err := wc.fuzzIn.Close(); err != nil {
   955  		wc.fuzzOut.Close()
   956  		return err
   957  	}
   958  
   959  	// Drain fuzzOut and close it. When the server exits, the kernel will close
   960  	// its end of fuzzOut, and we'll get EOF.
   961  	if _, err := io.Copy(ioutil.Discard, wc.fuzzOut); err != nil {
   962  		wc.fuzzOut.Close()
   963  		return err
   964  	}
   965  	return wc.fuzzOut.Close()
   966  }
   967  
   968  // errSharedMemClosed is returned by workerClient methods that cannot access
   969  // shared memory because it was closed and unmapped by another goroutine. That
   970  // can happen when worker.cleanup is called in the worker goroutine while a
   971  // workerClient.fuzz call runs concurrently.
   972  //
   973  // This error should not be reported. It indicates the operation was
   974  // interrupted.
   975  var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
   976  
   977  // minimize tells the worker to call the minimize method. See
   978  // workerServer.minimize.
   979  func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
   980  	wc.mu.Lock()
   981  	defer wc.mu.Unlock()
   982  
   983  	mem, ok := <-wc.memMu
   984  	if !ok {
   985  		return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
   986  	}
   987  	mem.header().count = 0
   988  	inp, err := corpusEntryData(entryIn)
   989  	if err != nil {
   990  		return CorpusEntry{}, minimizeResponse{}, err
   991  	}
   992  	mem.setValue(inp)
   993  	defer func() { wc.memMu <- mem }()
   994  	entryOut = entryIn
   995  	entryOut.Values, err = unmarshalCorpusFile(inp)
   996  	if err != nil {
   997  		return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
   998  	}
   999  	for i, v := range entryOut.Values {
  1000  		if !isMinimizable(reflect.TypeOf(v)) {
  1001  			continue
  1002  		}
  1003  
  1004  		wc.memMu <- mem
  1005  		args.Index = i
  1006  		c := call{Minimize: &args}
  1007  		callErr := wc.callLocked(ctx, c, &resp)
  1008  		mem, ok = <-wc.memMu
  1009  		if !ok {
  1010  			return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
  1011  		}
  1012  
  1013  		if callErr != nil {
  1014  			retErr = callErr
  1015  			if !mem.header().rawInMem {
  1016  				// An unrecoverable error occurred before minimization began.
  1017  				return entryIn, minimizeResponse{}, retErr
  1018  			}
  1019  			// An unrecoverable error occurred during minimization. mem now
  1020  			// holds the raw, unmarshalled bytes of entryIn.Values[i] that
  1021  			// caused the error.
  1022  			switch entryOut.Values[i].(type) {
  1023  			case string:
  1024  				entryOut.Values[i] = string(mem.valueCopy())
  1025  			case []byte:
  1026  				entryOut.Values[i] = mem.valueCopy()
  1027  			default:
  1028  				panic("impossible")
  1029  			}
  1030  			entryOut.Data = marshalCorpusFile(entryOut.Values...)
  1031  			// Stop minimizing; another unrecoverable error is likely to occur.
  1032  			break
  1033  		}
  1034  
  1035  		if resp.WroteToMem {
  1036  			// Minimization succeeded, and mem holds the marshaled data.
  1037  			entryOut.Data = mem.valueCopy()
  1038  			entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
  1039  			if err != nil {
  1040  				return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
  1041  			}
  1042  		}
  1043  
  1044  		// Prepare for next iteration of the loop.
  1045  		if args.Timeout != 0 {
  1046  			args.Timeout -= resp.Duration
  1047  			if args.Timeout <= 0 {
  1048  				break
  1049  			}
  1050  		}
  1051  		if args.Limit != 0 {
  1052  			args.Limit -= mem.header().count
  1053  			if args.Limit <= 0 {
  1054  				break
  1055  			}
  1056  		}
  1057  	}
  1058  	resp.Count = mem.header().count
  1059  	h := sha256.Sum256(entryOut.Data)
  1060  	entryOut.Path = fmt.Sprintf("%x", h[:4])
  1061  	return entryOut, resp, retErr
  1062  }
  1063  
  1064  // fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
  1065  func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
  1066  	wc.mu.Lock()
  1067  	defer wc.mu.Unlock()
  1068  
  1069  	mem, ok := <-wc.memMu
  1070  	if !ok {
  1071  		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
  1072  	}
  1073  	mem.header().count = 0
  1074  	inp, err := corpusEntryData(entryIn)
  1075  	if err != nil {
  1076  		return CorpusEntry{}, fuzzResponse{}, true, err
  1077  	}
  1078  	mem.setValue(inp)
  1079  	wc.memMu <- mem
  1080  
  1081  	c := call{Fuzz: &args}
  1082  	callErr := wc.callLocked(ctx, c, &resp)
  1083  	if resp.InternalErr != "" {
  1084  		return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
  1085  	}
  1086  	mem, ok = <-wc.memMu
  1087  	if !ok {
  1088  		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
  1089  	}
  1090  	defer func() { wc.memMu <- mem }()
  1091  	resp.Count = mem.header().count
  1092  
  1093  	if !bytes.Equal(inp, mem.valueRef()) {
  1094  		return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
  1095  	}
  1096  	needEntryOut := callErr != nil || resp.Err != "" ||
  1097  		(!args.Warmup && resp.CoverageData != nil)
  1098  	if needEntryOut {
  1099  		valuesOut, err := unmarshalCorpusFile(inp)
  1100  		if err != nil {
  1101  			return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
  1102  		}
  1103  		wc.m.r.restore(mem.header().randState, mem.header().randInc)
  1104  		if !args.Warmup {
  1105  			// Only mutate the valuesOut if fuzzing actually occurred.
  1106  			numMutations := ((resp.Count - 1) % chainedMutations) + 1
  1107  			for i := int64(0); i < numMutations; i++ {
  1108  				wc.m.mutate(valuesOut, cap(mem.valueRef()))
  1109  			}
  1110  		}
  1111  		dataOut := marshalCorpusFile(valuesOut...)
  1112  
  1113  		h := sha256.Sum256(dataOut)
  1114  		name := fmt.Sprintf("%x", h[:4])
  1115  		entryOut = CorpusEntry{
  1116  			Parent:     entryIn.Path,
  1117  			Path:       name,
  1118  			Data:       dataOut,
  1119  			Generation: entryIn.Generation + 1,
  1120  		}
  1121  		if args.Warmup {
  1122  			// The bytes weren't mutated, so if entryIn was a seed corpus value,
  1123  			// then entryOut is too.
  1124  			entryOut.IsSeed = entryIn.IsSeed
  1125  		}
  1126  	}
  1127  
  1128  	return entryOut, resp, false, callErr
  1129  }
  1130  
  1131  // ping tells the worker to call the ping method. See workerServer.ping.
  1132  func (wc *workerClient) ping(ctx context.Context) error {
  1133  	wc.mu.Lock()
  1134  	defer wc.mu.Unlock()
  1135  	c := call{Ping: &pingArgs{}}
  1136  	var resp pingResponse
  1137  	return wc.callLocked(ctx, c, &resp)
  1138  }
  1139  
  1140  // callLocked sends an RPC from the coordinator to the worker process and waits
  1141  // for the response. The callLocked may be cancelled with ctx.
  1142  func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
  1143  	enc := json.NewEncoder(wc.fuzzIn)
  1144  	dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
  1145  	if err := enc.Encode(c); err != nil {
  1146  		return err
  1147  	}
  1148  	return dec.Decode(resp)
  1149  }
  1150  
  1151  // contextReader wraps a Reader with a Context. If the context is cancelled
  1152  // while the underlying reader is blocked, Read returns immediately.
  1153  //
  1154  // This is useful for reading from a pipe. Closing a pipe file descriptor does
  1155  // not unblock pending Reads on that file descriptor. All copies of the pipe's
  1156  // other file descriptor (the write end) must be closed in all processes that
  1157  // inherit it. This is difficult to do correctly in the situation we care about
  1158  // (process group termination).
  1159  type contextReader struct {
  1160  	ctx context.Context
  1161  	r   io.Reader
  1162  }
  1163  
  1164  func (cr *contextReader) Read(b []byte) (int, error) {
  1165  	if ctxErr := cr.ctx.Err(); ctxErr != nil {
  1166  		return 0, ctxErr
  1167  	}
  1168  	done := make(chan struct{})
  1169  
  1170  	// This goroutine may stay blocked after Read returns because the underlying
  1171  	// read is blocked.
  1172  	var n int
  1173  	var err error
  1174  	go func() {
  1175  		n, err = cr.r.Read(b)
  1176  		close(done)
  1177  	}()
  1178  
  1179  	select {
  1180  	case <-cr.ctx.Done():
  1181  		return 0, cr.ctx.Err()
  1182  	case <-done:
  1183  		return n, err
  1184  	}
  1185  }
  1186  

View as plain text