1//go:build solaris || darwin || freebsd || netbsd || openbsd || dragonfly
2// +build solaris darwin freebsd netbsd openbsd dragonfly
3
4package cancelreader
5
6import (
7 "errors"
8 "fmt"
9 "io"
10 "os"
11 "strings"
12
13 "golang.org/x/sys/unix"
14)
15
16// newSelectCancelReader returns a reader and a cancel function. If the input
17// reader is a File, the cancel function can be used to interrupt a
18// blocking call read call. In this case, the cancel function returns true if
19// the call was canceled successfully. If the input reader is not a File or
20// the file descriptor is 1024 or larger, the cancel function does nothing and
21// always returns false. The generic unix implementation is based on the posix
22// select syscall.
23func newSelectCancelReader(reader io.Reader) (CancelReader, error) {
24 file, ok := reader.(File)
25 if !ok || file.Fd() >= unix.FD_SETSIZE {
26 return newFallbackCancelReader(reader)
27 }
28 r := &selectCancelReader{file: file}
29
30 var err error
31
32 r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
33 if err != nil {
34 return nil, err
35 }
36
37 return r, nil
38}
39
40type selectCancelReader struct {
41 file File
42 cancelSignalReader File
43 cancelSignalWriter File
44 cancelMixin
45}
46
47func (r *selectCancelReader) Read(data []byte) (int, error) {
48 if r.isCanceled() {
49 return 0, ErrCanceled
50 }
51
52 for {
53 err := waitForRead(r.file, r.cancelSignalReader)
54 if err != nil {
55 if errors.Is(err, unix.EINTR) {
56 continue // try again if the syscall was interrupted
57 }
58
59 if errors.Is(err, ErrCanceled) {
60 // remove signal from pipe
61 var b [1]byte
62 _, readErr := r.cancelSignalReader.Read(b[:])
63 if readErr != nil {
64 return 0, fmt.Errorf("reading cancel signal: %w", readErr)
65 }
66 }
67
68 return 0, err
69 }
70
71 return r.file.Read(data)
72 }
73}
74
75func (r *selectCancelReader) Cancel() bool {
76 r.setCanceled()
77
78 // send cancel signal
79 _, err := r.cancelSignalWriter.Write([]byte{'c'})
80 return err == nil
81}
82
83func (r *selectCancelReader) Close() error {
84 var errMsgs []string
85
86 // close pipe
87 err := r.cancelSignalWriter.Close()
88 if err != nil {
89 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
90 }
91
92 err = r.cancelSignalReader.Close()
93 if err != nil {
94 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
95 }
96
97 if len(errMsgs) > 0 {
98 return fmt.Errorf(strings.Join(errMsgs, ", "))
99 }
100
101 return nil
102}
103
104func waitForRead(reader, abort File) error {
105 readerFd := int(reader.Fd())
106 abortFd := int(abort.Fd())
107
108 maxFd := readerFd
109 if abortFd > maxFd {
110 maxFd = abortFd
111 }
112
113 // this is a limitation of the select syscall
114 if maxFd >= unix.FD_SETSIZE {
115 return fmt.Errorf("cannot select on file descriptor %d which is larger than 1024", maxFd)
116 }
117
118 fdSet := &unix.FdSet{}
119 fdSet.Set(int(reader.Fd()))
120 fdSet.Set(int(abort.Fd()))
121
122 _, err := unix.Select(maxFd+1, fdSet, nil, nil, nil)
123 if err != nil {
124 return fmt.Errorf("select: %w", err)
125 }
126
127 if fdSet.IsSet(abortFd) {
128 return ErrCanceled
129 }
130
131 if fdSet.IsSet(readerFd) {
132 return nil
133 }
134
135 return fmt.Errorf("select returned without setting a file descriptor")
136}