1
2
3
4
5 package fuzz
6
7 import (
8 "bytes"
9 "context"
10 "crypto/sha256"
11 "encoding/json"
12 "errors"
13 "fmt"
14 "io"
15 "io/ioutil"
16 "os"
17 "os/exec"
18 "reflect"
19 "runtime"
20 "sync"
21 "time"
22 )
23
const (
	// workerFuzzDuration is a 100ms duration. It is not referenced in the
	// visible part of this file; presumably it bounds how long a worker
	// fuzzes a single input handed out by the coordinator — TODO confirm
	// against the coordinator.
	workerFuzzDuration = 100 * time.Millisecond

	// workerTimeoutDuration is how long (*worker).stop waits for the worker
	// process to terminate before escalating to the next signal.
	workerTimeoutDuration = 1 * time.Second

	// workerExitCode is the process exit code that (*worker).coordinate
	// treats as "the fuzzing process exited due to an internal failure", as
	// opposed to any other unexpected termination.
	workerExitCode = 70

	// workerSharedMemSize is the size passed to sharedMemTempFile when a
	// worker's shared memory is created (100 << 20; presumably bytes, i.e.
	// 100 MiB — confirm against sharedMemTempFile).
	workerSharedMemSize = 100 << 20
)
42
43
44
45
46
// worker manages a worker process running a test binary. It holds the
// coordinator-side state: how to launch the process, the shared memory used
// to exchange values with it, and the handle to the currently running
// process, if any.
type worker struct {
	// dir is used as the working directory (cmd.Dir) of the worker process.
	dir string
	// binPath is the executable that start launches.
	binPath string
	// args are the command-line arguments passed to binPath.
	args []string
	// env is the environment (cmd.Env) of the worker process.
	env []string

	// coordinator supplies inputs (inputC, minimizeC), receives results
	// (resultC) and provides options (opts).
	coordinator *coordinator

	// memMu is a 1-buffered channel holding the shared memory. Receiving
	// from it acquires the memory; sending returns it. The same channel is
	// handed to every process started by this worker.
	memMu chan *sharedMem

	// cmd is the current worker process; nil when no process is running.
	cmd *exec.Cmd
	// client is used to communicate with the current worker process.
	client *workerClient
	// waitErr is the result of cmd.Wait; it is set before termC is closed.
	waitErr error
	// interrupted is set to true by stop once its timer fires and it starts
	// signalling the process.
	interrupted bool
	// termC is closed when cmd.Wait returns.
	termC chan struct{}
}
63
64 func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
65 mem, err := sharedMemTempFile(workerSharedMemSize)
66 if err != nil {
67 return nil, err
68 }
69 memMu := make(chan *sharedMem, 1)
70 memMu <- mem
71 return &worker{
72 dir: dir,
73 binPath: binPath,
74 args: args,
75 env: env[:len(env):len(env)],
76 coordinator: c,
77 memMu: memMu,
78 }, nil
79 }
80
81
82 func (w *worker) cleanup() error {
83 mem := <-w.memMu
84 if mem == nil {
85 return nil
86 }
87 close(w.memMu)
88 return mem.Close()
89 }
90
91
92
93
94
95
96
97
98
99
100
// coordinate runs the worker's main event loop. On every iteration it makes
// sure a worker process is running, then waits for one of: cancellation of
// ctx, unexpected termination of the process, an input to fuzz, or an input
// to minimize. Results for fuzz and minimize requests are sent on the
// coordinator's resultC.
//
// coordinate returns nil if the process exited cleanly or with an
// interrupt-style error while idle, ctx.Err() if ctx was cancelled, and a
// descriptive error for start-up failures, internal failures and
// unexpected terminations that should not be recorded as crashes.
func (w *worker) coordinate(ctx context.Context) error {
	// Main event loop.
	for {
		// Start (or restart) the worker process if it is not running.
		if !w.isRunning() {
			if err := w.startAndPing(ctx); err != nil {
				return err
			}
		}

		select {
		case <-ctx.Done():
			// Told to stop. An error from stop is only reported when stop did
			// not have to signal the process and the error is not an
			// interrupt error.
			err := w.stop()
			if err != nil && !w.interrupted && !isInterruptError(err) {
				return err
			}
			return ctx.Err()

		case <-w.termC:
			// The process terminated while we were waiting for input.
			err := w.stop()
			if w.interrupted {
				// stop returns immediately when termC is already closed, so it
				// cannot have signalled the process here.
				panic("worker interrupted after unexpected termination")
			}
			if err == nil || isInterruptError(err) {
				// The process exited with status 0 or because of an interrupt
				// that this worker did not send. This is treated as a normal
				// shutdown rather than an error.
				return nil
			}
			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
				// workerExitCode marks an internal failure in the worker
				// process.
				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
			}
			// Any other non-zero exit or termination.
			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)

		case input := <-w.coordinator.inputC:
			// Received an input to fuzz from the coordinator.
			args := fuzzArgs{
				Limit:        input.limit,
				Timeout:      input.timeout,
				Warmup:       input.warmup,
				CoverageData: input.coverageData,
			}
			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
			canMinimize := true
			if err != nil {
				// Error communicating with the worker process; stop it before
				// deciding how to report the failure.
				w.stop()
				if ctx.Err() != nil {
					// Timeout or cancellation.
					return ctx.Err()
				}
				if w.interrupted {
					// stop had to signal the process. Report the communication
					// error, but do not record a crasher.
					return fmt.Errorf("communicating with fuzzing process: %v", err)
				}
				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
					// The process was terminated by a signal that isCrashSignal
					// does not classify as a crash. Report an error, but do not
					// record a crasher.
					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
				}
				if isInternalError {
					// Internal errors are reported as-is and are not treated as
					// crashes.
					return err
				}
				// Unexpected termination: record it as a crasher and fall
				// through. The process is restarted on the next iteration.
				// The input is not minimized because it took the process down.
				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
				canMinimize = false
			}
			result := fuzzResult{
				limit:         input.limit,
				count:         resp.Count,
				totalDuration: resp.TotalDuration,
				entryDuration: resp.InterestingDuration,
				entry:         entry,
				crasherMsg:    resp.Err,
				coverageData:  resp.CoverageData,
				canMinimize:   canMinimize,
			}
			w.coordinator.resultC <- result

		case input := <-w.coordinator.minimizeC:
			// Received an input to minimize from the coordinator.
			result, err := w.minimize(ctx, input)
			if err != nil {
				// Minimization failed. Send back the original input; if it had
				// no crasher message before, use the minimization error as one.
				result = fuzzResult{
					entry:       input.entry,
					crasherMsg:  input.crasherMsg,
					canMinimize: false,
					limit:       input.limit,
				}
				if result.crasherMsg == "" {
					result.crasherMsg = err.Error()
				}
			}
			w.coordinator.resultC <- result
		}
	}
}
224
225
226
227
228
229 func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
230 if w.coordinator.opts.MinimizeTimeout != 0 {
231 var cancel func()
232 ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
233 defer cancel()
234 }
235
236 args := minimizeArgs{
237 Limit: input.limit,
238 Timeout: input.timeout,
239 KeepCoverage: input.keepCoverage,
240 }
241 entry, resp, err := w.client.minimize(ctx, input.entry, args)
242 if err != nil {
243
244 w.stop()
245 if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
246
247
248
249
250
251 return fuzzResult{
252 entry: input.entry,
253 crasherMsg: input.crasherMsg,
254 coverageData: input.keepCoverage,
255 canMinimize: false,
256 limit: input.limit,
257 }, nil
258 }
259 return fuzzResult{
260 entry: entry,
261 crasherMsg: fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
262 canMinimize: false,
263 limit: input.limit,
264 count: resp.Count,
265 totalDuration: resp.Duration,
266 }, nil
267 }
268
269 if input.crasherMsg != "" && resp.Err == "" {
270 return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
271 }
272
273 return fuzzResult{
274 entry: entry,
275 crasherMsg: resp.Err,
276 coverageData: resp.CoverageData,
277 canMinimize: false,
278 limit: input.limit,
279 count: resp.Count,
280 totalDuration: resp.Duration,
281 }, nil
282 }
283
// isRunning reports whether a worker process is currently tracked, i.e.
// whether w.cmd is set. start sets w.cmd and stop clears it.
func (w *worker) isRunning() bool {
	return w.cmd != nil
}
287
288
289
290
291
292
293
294
295
296 func (w *worker) startAndPing(ctx context.Context) error {
297 if ctx.Err() != nil {
298 return ctx.Err()
299 }
300 if err := w.start(); err != nil {
301 return err
302 }
303 if err := w.client.ping(ctx); err != nil {
304 w.stop()
305 if ctx.Err() != nil {
306 return ctx.Err()
307 }
308 if isInterruptError(err) {
309
310 return err
311 }
312
313 return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
314 }
315 return nil
316 }
317
318
319
320
321
322
323
324
325
326
327
328 func (w *worker) start() (err error) {
329 if w.isRunning() {
330 panic("worker already started")
331 }
332 w.waitErr = nil
333 w.interrupted = false
334 w.termC = nil
335
336 cmd := exec.Command(w.binPath, w.args...)
337 cmd.Dir = w.dir
338 cmd.Env = w.env[:len(w.env):len(w.env)]
339
340
341
342
343
344
345
346
347
348 fuzzInR, fuzzInW, err := os.Pipe()
349 if err != nil {
350 return err
351 }
352 defer fuzzInR.Close()
353 fuzzOutR, fuzzOutW, err := os.Pipe()
354 if err != nil {
355 fuzzInW.Close()
356 return err
357 }
358 defer fuzzOutW.Close()
359 setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
360
361
362 if err := cmd.Start(); err != nil {
363 fuzzInW.Close()
364 fuzzOutR.Close()
365 return err
366 }
367
368
369
370
371 w.cmd = cmd
372 w.termC = make(chan struct{})
373 comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
374 m := newMutator()
375 w.client = newWorkerClient(comm, m)
376
377 go func() {
378 w.waitErr = w.cmd.Wait()
379 close(w.termC)
380 }()
381
382 return nil
383 }
384
385
386
387
388
389
390
391
392
393
// stop tells the worker process to exit by closing the client's pipes, then
// waits for it to terminate. If the process has not terminated after
// workerTimeoutDuration, stop sets w.interrupted and escalates: first
// os.Interrupt (os.Kill on Windows), then os.Kill, then it keeps waiting and
// logs a message.
//
// stop returns the error from cmd.Wait (w.waitErr) and clears w.cmd and
// w.client. It panics if the worker was never started successfully. Calling
// stop again after the process has terminated returns w.waitErr immediately.
func (w *worker) stop() error {
	if w.termC == nil {
		panic("worker was not started successfully")
	}
	select {
	case <-w.termC:
		// The process has already terminated.
		if w.client == nil {
			// stop was already called; nothing left to release.
			return w.waitErr
		}
		// Terminated on its own; release the client and forget the process.
		w.client.Close()
		w.cmd = nil
		w.client = nil
		return w.waitErr
	default:
		// The process is still running.
	}

	// Close the client in the background. Close drains the response pipe
	// (see (*workerClient).Close), so it may not return until the process
	// closes its end.
	closeC := make(chan struct{})
	go func() {
		w.client.Close()
		close(closeC)
	}()

	sig := os.Interrupt
	if runtime.GOOS == "windows" {
		// Per the package os documentation, sending os.Interrupt with
		// Process.Signal is not implemented on Windows, so go straight to
		// os.Kill there.
		sig = os.Kill
	}

	t := time.NewTimer(workerTimeoutDuration)
	for {
		select {
		case <-w.termC:
			// The process terminated. Wait for the background Close to finish
			// before dropping the client.
			t.Stop()
			<-closeC
			w.cmd = nil
			w.client = nil
			return w.waitErr

		case <-t.C:
			// The timer fired before the process terminated.
			w.interrupted = true
			switch sig {
			case os.Interrupt:
				// Send the interrupt and wait another timeout period; the next
				// step is os.Kill.
				w.cmd.Process.Signal(sig)
				sig = os.Kill
				t.Reset(workerTimeoutDuration)

			case os.Kill:
				// Send the kill and wait another timeout period.
				w.cmd.Process.Signal(sig)
				sig = nil
				t.Reset(workerTimeoutDuration)

			case nil:
				// Nothing left to send. Log once (the timer is not reset) and
				// keep waiting on termC.
				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
			}
		}
	}
}
464
465
466
467
468
469
470
471
472
473
474
475 func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
476 comm, err := getWorkerComm()
477 if err != nil {
478 return err
479 }
480 srv := &workerServer{
481 workerComm: comm,
482 fuzzFn: func(e CorpusEntry) (time.Duration, error) {
483 timer := time.AfterFunc(10*time.Second, func() {
484 panic("deadlocked!")
485 })
486 defer timer.Stop()
487 start := time.Now()
488 err := fn(e)
489 return time.Since(start), err
490 },
491 m: newMutator(),
492 }
493 return srv.serve(ctx)
494 }
495
496
497
498
// call is the JSON request envelope sent from workerClient to workerServer.
// Exactly one field is expected to be non-nil; serve dispatches on the first
// non-nil field in the order Fuzz, Minimize, Ping and fails if all are nil.
type call struct {
	Ping     *pingArgs
	Fuzz     *fuzzArgs
	Minimize *minimizeArgs
}
504
505
506
// minimizeArgs contains the arguments of a Minimize call. The value to
// minimize is passed through shared memory, not in this struct.
type minimizeArgs struct {
	// Timeout bounds a single minimize call on the server when non-zero.
	// The client subtracts the reported duration after each call.
	Timeout time.Duration

	// Limit is the maximum number of fuzz-function calls when positive; it
	// is compared against the count in the shared memory header.
	Limit int64

	// KeepCoverage, when non-nil, is the coverage that a minimized value
	// must preserve. When nil, the value is minimized as a crasher and must
	// keep producing an error.
	KeepCoverage []byte

	// Index is the position within the entry's values of the value to
	// minimize. It is set by the client for each minimizable value.
	Index int
}
525
526
// minimizeResponse contains the results of a Minimize call.
type minimizeResponse struct {
	// WroteToMem is true when the server wrote the (possibly minimized)
	// values back to shared memory; the client then re-reads the entry from
	// there.
	WroteToMem bool

	// Err is the error message produced by the fuzz function for the
	// returned value, or empty if there was none.
	Err string

	// CoverageData is the coverage associated with the returned value. It
	// is only set when Err is empty.
	CoverageData []byte

	// Duration is the time the server spent in the minimize call.
	Duration time.Duration

	// Count is the number of fuzz-function calls, taken from the shared
	// memory header by the client.
	Count int64
}
548
549
550
// fuzzArgs contains the arguments of a Fuzz call. The input value itself is
// passed through shared memory, not in this struct.
type fuzzArgs struct {
	// Timeout bounds the fuzzing loop on the server when non-zero.
	Timeout time.Duration

	// Limit is the maximum number of fuzz-function calls when positive; it
	// is compared against the count in the shared memory header.
	Limit int64

	// Warmup makes the server run the input exactly once, without mutating
	// it, and report the coverage snapshot if coverage is enabled.
	Warmup bool

	// CoverageData, when non-nil, replaces the server's coverage mask. Its
	// length must match the existing mask if one is already set.
	CoverageData []byte
}
569
570
// fuzzResponse contains the results of a Fuzz call.
type fuzzResponse struct {
	// TotalDuration is the time the server spent in the whole call.
	TotalDuration time.Duration
	// InterestingDuration is the time the fuzz function took for the warmup
	// run or for the run that produced new coverage.
	InterestingDuration time.Duration

	// Count is the number of fuzz-function calls, taken from the shared
	// memory header.
	Count int64

	// CoverageData is the coverage snapshot: set when a run found new
	// coverage bits relative to the mask, or after a warmup run when
	// coverage is enabled.
	CoverageData []byte

	// Err is the error message from the fuzz function, i.e. the input that
	// was being run is a crasher. Empty if no run failed.
	Err string

	// InternalErr describes a failure of the fuzzing machinery itself
	// (bad arguments, undecodable input). The client reports it as an
	// internal error rather than a crash.
	InternalErr string
}
591
592
// pingArgs contains the arguments of a Ping call; there are none.
type pingArgs struct{}
594
595
// pingResponse contains the results of a Ping call; there are none.
type pingResponse struct{}
597
598
599
600
601
602
603
604
605
606
// workerComm holds what both sides need to talk to each other: a pair of
// pipe ends and the channel guarding the shared memory.
//
// The client encodes requests to fuzzIn and decodes responses from fuzzOut;
// the server decodes requests from fuzzIn and encodes responses to fuzzOut.
// memMu is used as a mutex: receiving the *sharedMem acquires it, sending it
// back releases it.
type workerComm struct {
	fuzzIn, fuzzOut *os.File
	memMu           chan *sharedMem
}
611
612
613
614
615
616
// workerServer runs inside the worker process. It reads calls from fuzzIn,
// executes them, and writes the responses to fuzzOut (see serve).
type workerServer struct {
	workerComm
	// m mutates input values between runs of the fuzz function.
	m *mutator

	// coverageMask is the most recent non-nil CoverageData received in a
	// Fuzz call. A run is reported as interesting when the coverage snapshot
	// has bits that countNewCoverageBits considers new relative to this mask.
	coverageMask []byte

	// fuzzFn runs the fuzz function on one entry and returns how long it
	// took together with the function's error, if any.
	fuzzFn func(CorpusEntry) (time.Duration, error)
}
633
634
635
636
637
638
639
640
641
642
643
644 func (ws *workerServer) serve(ctx context.Context) error {
645 enc := json.NewEncoder(ws.fuzzOut)
646 dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
647 for {
648 var c call
649 if err := dec.Decode(&c); err != nil {
650 if err == io.EOF || err == ctx.Err() {
651 return nil
652 } else {
653 return err
654 }
655 }
656
657 var resp any
658 switch {
659 case c.Fuzz != nil:
660 resp = ws.fuzz(ctx, *c.Fuzz)
661 case c.Minimize != nil:
662 resp = ws.minimize(ctx, *c.Minimize)
663 case c.Ping != nil:
664 resp = ws.ping(ctx, *c.Ping)
665 default:
666 return errors.New("no arguments provided for any call")
667 }
668
669 if err := enc.Encode(resp); err != nil {
670 return err
671 }
672 }
673 }
674
675
676
677
678
679
680
681
682
683
// chainedMutations is the number of mutations applied on top of each other
// before the server resets the values to the original input and saves the
// random state again. The client uses the same constant to work out how many
// mutations to replay when reconstructing the value that was being run.
const chainedMutations = 5
685
686
687
688
689
690
691
692
693
694
695
696
697 func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
698 if args.CoverageData != nil {
699 if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
700 resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
701 return resp
702 }
703 ws.coverageMask = args.CoverageData
704 }
705 start := time.Now()
706 defer func() { resp.TotalDuration = time.Since(start) }()
707
708 if args.Timeout != 0 {
709 var cancel func()
710 ctx, cancel = context.WithTimeout(ctx, args.Timeout)
711 defer cancel()
712 }
713 mem := <-ws.memMu
714 ws.m.r.save(&mem.header().randState, &mem.header().randInc)
715 defer func() {
716 resp.Count = mem.header().count
717 ws.memMu <- mem
718 }()
719 if args.Limit > 0 && mem.header().count >= args.Limit {
720 resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
721 return resp
722 }
723
724 originalVals, err := unmarshalCorpusFile(mem.valueCopy())
725 if err != nil {
726 resp.InternalErr = err.Error()
727 return resp
728 }
729 vals := make([]any, len(originalVals))
730 copy(vals, originalVals)
731
732 shouldStop := func() bool {
733 return args.Limit > 0 && mem.header().count >= args.Limit
734 }
735 fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
736 mem.header().count++
737 var err error
738 dur, err = ws.fuzzFn(entry)
739 if err != nil {
740 errMsg = err.Error()
741 if errMsg == "" {
742 errMsg = "fuzz function failed with no input"
743 }
744 return dur, nil, errMsg
745 }
746 if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
747 return dur, coverageSnapshot, ""
748 }
749 return dur, nil, ""
750 }
751
752 if args.Warmup {
753 dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
754 if errMsg != "" {
755 resp.Err = errMsg
756 return resp
757 }
758 resp.InterestingDuration = dur
759 if coverageEnabled {
760 resp.CoverageData = coverageSnapshot
761 }
762 return resp
763 }
764
765 for {
766 select {
767 case <-ctx.Done():
768 return resp
769 default:
770 if mem.header().count%chainedMutations == 0 {
771 copy(vals, originalVals)
772 ws.m.r.save(&mem.header().randState, &mem.header().randInc)
773 }
774 ws.m.mutate(vals, cap(mem.valueRef()))
775
776 entry := CorpusEntry{Values: vals}
777 dur, cov, errMsg := fuzzOnce(entry)
778 if errMsg != "" {
779 resp.Err = errMsg
780 return resp
781 }
782 if cov != nil {
783 resp.CoverageData = cov
784 resp.InterestingDuration = dur
785 return resp
786 }
787 if shouldStop() {
788 return resp
789 }
790 }
791 }
792 }
793
794 func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
795 start := time.Now()
796 defer func() { resp.Duration = time.Now().Sub(start) }()
797 mem := <-ws.memMu
798 defer func() { ws.memMu <- mem }()
799 vals, err := unmarshalCorpusFile(mem.valueCopy())
800 if err != nil {
801 panic(err)
802 }
803 inpHash := sha256.Sum256(mem.valueCopy())
804 if args.Timeout != 0 {
805 var cancel func()
806 ctx, cancel = context.WithTimeout(ctx, args.Timeout)
807 defer cancel()
808 }
809
810
811
812 success, err := ws.minimizeInput(ctx, vals, mem, args)
813 if success {
814 writeToMem(vals, mem)
815 outHash := sha256.Sum256(mem.valueCopy())
816 mem.header().rawInMem = false
817 resp.WroteToMem = true
818 if err != nil {
819 resp.Err = err.Error()
820 } else {
821
822
823
824
825
826 if outHash != inpHash {
827 resp.CoverageData = coverageSnapshot
828 } else {
829 resp.CoverageData = args.KeepCoverage
830 }
831 }
832 }
833 return resp
834 }
835
836
837
838
839
840
841 func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
842 keepCoverage := args.KeepCoverage
843 memBytes := mem.valueRef()
844 bPtr := &memBytes
845 count := &mem.header().count
846 shouldStop := func() bool {
847 return ctx.Err() != nil ||
848 (args.Limit > 0 && *count >= args.Limit)
849 }
850 if shouldStop() {
851 return false, nil
852 }
853
854
855
856
857 *count++
858 _, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
859 if keepCoverage != nil {
860 if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
861 return false, nil
862 }
863 } else if retErr == nil {
864 return false, nil
865 }
866 mem.header().rawInMem = true
867
868
869
870
871
872 tryMinimized := func(candidate []byte) bool {
873 prev := vals[args.Index]
874 switch prev.(type) {
875 case []byte:
876 vals[args.Index] = candidate
877 case string:
878 vals[args.Index] = string(candidate)
879 default:
880 panic("impossible")
881 }
882 copy(*bPtr, candidate)
883 *bPtr = (*bPtr)[:len(candidate)]
884 mem.setValueLen(len(candidate))
885 *count++
886 _, err := ws.fuzzFn(CorpusEntry{Values: vals})
887 if err != nil {
888 retErr = err
889 if keepCoverage != nil {
890
891
892
893 keepCoverage = nil
894 }
895 return true
896 }
897
898 if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
899 return true
900 }
901 vals[args.Index] = prev
902 return false
903 }
904 switch v := vals[args.Index].(type) {
905 case string:
906 minimizeBytes([]byte(v), tryMinimized, shouldStop)
907 case []byte:
908 minimizeBytes(v, tryMinimized, shouldStop)
909 default:
910 panic("impossible")
911 }
912 return true, retErr
913 }
914
915 func writeToMem(vals []any, mem *sharedMem) {
916 b := marshalCorpusFile(vals...)
917 mem.setValue(b)
918 }
919
920
921
// ping does nothing and returns an empty response. The coordinator uses a
// ping right after starting the process (see startAndPing) to check that the
// worker responds.
func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
	return pingResponse{}
}
925
926
927
928
// workerClient runs in the coordinator process and sends calls to the
// workerServer in the worker process over the pipes in workerComm.
type workerClient struct {
	workerComm
	// m replays the server's mutations to reconstruct the value that was
	// being run (see fuzz).
	m *mutator

	// mu serializes use of the pipes: it is held for the full duration of
	// every call (fuzz, minimize, ping) and by Close, so Close waits for a
	// call in progress.
	mu sync.Mutex
}
940
941 func newWorkerClient(comm workerComm, m *mutator) *workerClient {
942 return &workerClient{workerComm: comm, m: m}
943 }
944
945
946
947
948 func (wc *workerClient) Close() error {
949 wc.mu.Lock()
950 defer wc.mu.Unlock()
951
952
953
954 if err := wc.fuzzIn.Close(); err != nil {
955 wc.fuzzOut.Close()
956 return err
957 }
958
959
960
961 if _, err := io.Copy(ioutil.Discard, wc.fuzzOut); err != nil {
962 wc.fuzzOut.Close()
963 return err
964 }
965 return wc.fuzzOut.Close()
966 }
967
968
969
970
971
972
973
974
// errSharedMemClosed is returned by workerClient methods that find memMu
// closed when they try to acquire the shared memory. memMu is closed by
// (*worker).cleanup.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
976
977
978
979 func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
980 wc.mu.Lock()
981 defer wc.mu.Unlock()
982
983 mem, ok := <-wc.memMu
984 if !ok {
985 return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
986 }
987 mem.header().count = 0
988 inp, err := corpusEntryData(entryIn)
989 if err != nil {
990 return CorpusEntry{}, minimizeResponse{}, err
991 }
992 mem.setValue(inp)
993 defer func() { wc.memMu <- mem }()
994 entryOut = entryIn
995 entryOut.Values, err = unmarshalCorpusFile(inp)
996 if err != nil {
997 return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
998 }
999 for i, v := range entryOut.Values {
1000 if !isMinimizable(reflect.TypeOf(v)) {
1001 continue
1002 }
1003
1004 wc.memMu <- mem
1005 args.Index = i
1006 c := call{Minimize: &args}
1007 callErr := wc.callLocked(ctx, c, &resp)
1008 mem, ok = <-wc.memMu
1009 if !ok {
1010 return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
1011 }
1012
1013 if callErr != nil {
1014 retErr = callErr
1015 if !mem.header().rawInMem {
1016
1017 return entryIn, minimizeResponse{}, retErr
1018 }
1019
1020
1021
1022 switch entryOut.Values[i].(type) {
1023 case string:
1024 entryOut.Values[i] = string(mem.valueCopy())
1025 case []byte:
1026 entryOut.Values[i] = mem.valueCopy()
1027 default:
1028 panic("impossible")
1029 }
1030 entryOut.Data = marshalCorpusFile(entryOut.Values...)
1031
1032 break
1033 }
1034
1035 if resp.WroteToMem {
1036
1037 entryOut.Data = mem.valueCopy()
1038 entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
1039 if err != nil {
1040 return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
1041 }
1042 }
1043
1044
1045 if args.Timeout != 0 {
1046 args.Timeout -= resp.Duration
1047 if args.Timeout <= 0 {
1048 break
1049 }
1050 }
1051 if args.Limit != 0 {
1052 args.Limit -= mem.header().count
1053 if args.Limit <= 0 {
1054 break
1055 }
1056 }
1057 }
1058 resp.Count = mem.header().count
1059 h := sha256.Sum256(entryOut.Data)
1060 entryOut.Path = fmt.Sprintf("%x", h[:4])
1061 return entryOut, resp, retErr
1062 }
1063
1064
1065 func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
1066 wc.mu.Lock()
1067 defer wc.mu.Unlock()
1068
1069 mem, ok := <-wc.memMu
1070 if !ok {
1071 return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1072 }
1073 mem.header().count = 0
1074 inp, err := corpusEntryData(entryIn)
1075 if err != nil {
1076 return CorpusEntry{}, fuzzResponse{}, true, err
1077 }
1078 mem.setValue(inp)
1079 wc.memMu <- mem
1080
1081 c := call{Fuzz: &args}
1082 callErr := wc.callLocked(ctx, c, &resp)
1083 if resp.InternalErr != "" {
1084 return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
1085 }
1086 mem, ok = <-wc.memMu
1087 if !ok {
1088 return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1089 }
1090 defer func() { wc.memMu <- mem }()
1091 resp.Count = mem.header().count
1092
1093 if !bytes.Equal(inp, mem.valueRef()) {
1094 return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
1095 }
1096 needEntryOut := callErr != nil || resp.Err != "" ||
1097 (!args.Warmup && resp.CoverageData != nil)
1098 if needEntryOut {
1099 valuesOut, err := unmarshalCorpusFile(inp)
1100 if err != nil {
1101 return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
1102 }
1103 wc.m.r.restore(mem.header().randState, mem.header().randInc)
1104 if !args.Warmup {
1105
1106 numMutations := ((resp.Count - 1) % chainedMutations) + 1
1107 for i := int64(0); i < numMutations; i++ {
1108 wc.m.mutate(valuesOut, cap(mem.valueRef()))
1109 }
1110 }
1111 dataOut := marshalCorpusFile(valuesOut...)
1112
1113 h := sha256.Sum256(dataOut)
1114 name := fmt.Sprintf("%x", h[:4])
1115 entryOut = CorpusEntry{
1116 Parent: entryIn.Path,
1117 Path: name,
1118 Data: dataOut,
1119 Generation: entryIn.Generation + 1,
1120 }
1121 if args.Warmup {
1122
1123
1124 entryOut.IsSeed = entryIn.IsSeed
1125 }
1126 }
1127
1128 return entryOut, resp, false, callErr
1129 }
1130
1131
1132 func (wc *workerClient) ping(ctx context.Context) error {
1133 wc.mu.Lock()
1134 defer wc.mu.Unlock()
1135 c := call{Ping: &pingArgs{}}
1136 var resp pingResponse
1137 return wc.callLocked(ctx, c, &resp)
1138 }
1139
1140
1141
1142 func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
1143 enc := json.NewEncoder(wc.fuzzIn)
1144 dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
1145 if err := enc.Encode(c); err != nil {
1146 return err
1147 }
1148 return dec.Decode(resp)
1149 }
1150
1151
1152
1153
1154
1155
1156
1157
1158
// contextReader wraps an io.Reader so that a Read gives up when ctx ends.
type contextReader struct {
	ctx context.Context
	r   io.Reader
}

// Read reads from the underlying reader into b. If ctx is already done, it
// returns ctx's error without reading. If ctx ends while the read is in
// progress, it returns (0, ctx.Err()) immediately.
//
// The underlying read runs in its own goroutine and cannot be interrupted:
// when Read returns early because of ctx, that goroutine stays blocked until
// the underlying Read returns, and may still write to b afterwards.
func (cr *contextReader) Read(b []byte) (int, error) {
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}

	type readResult struct {
		n   int
		err error
	}
	// Buffered so the reading goroutine can always deliver its result and
	// exit, even if nobody is waiting for it any more.
	resultC := make(chan readResult, 1)
	go func() {
		n, err := cr.r.Read(b)
		resultC <- readResult{n: n, err: err}
	}()

	select {
	case <-cr.ctx.Done():
		return 0, cr.ctx.Err()
	case res := <-resultC:
		return res.n, res.err
	}
}
1186
View as plain text