1//go:build darwin || freebsd || netbsd || openbsd || dragonfly
2// +build darwin freebsd netbsd openbsd dragonfly
3
4package cancelreader
5
6import (
7 "errors"
8 "fmt"
9 "io"
10 "os"
11 "strings"
12
13 "golang.org/x/sys/unix"
14)
15
16// NewReader returns a reader and a cancel function. If the input reader is a
17// File, the cancel function can be used to interrupt a blocking read call.
18// In this case, the cancel function returns true if the call was canceled
19// successfully. If the input reader is not a File, the cancel function
20// does nothing and always returns false. The BSD and macOS implementation is
21// based on the kqueue mechanism.
22func NewReader(reader io.Reader) (CancelReader, error) {
23 file, ok := reader.(File)
24 if !ok {
25 return newFallbackCancelReader(reader)
26 }
27
28 // kqueue returns instantly when polling /dev/tty so fallback to select
29 if file.Name() == "/dev/tty" {
30 return newSelectCancelReader(reader)
31 }
32
33 kQueue, err := unix.Kqueue()
34 if err != nil {
35 return nil, fmt.Errorf("create kqueue: %w", err)
36 }
37
38 r := &kqueueCancelReader{
39 file: file,
40 kQueue: kQueue,
41 }
42
43 r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
44 if err != nil {
45 _ = unix.Close(kQueue)
46 return nil, err
47 }
48
49 unix.SetKevent(&r.kQueueEvents[0], int(file.Fd()), unix.EVFILT_READ, unix.EV_ADD)
50 unix.SetKevent(&r.kQueueEvents[1], int(r.cancelSignalReader.Fd()), unix.EVFILT_READ, unix.EV_ADD)
51
52 return r, nil
53}
54
55type kqueueCancelReader struct {
56 file File
57 cancelSignalReader File
58 cancelSignalWriter File
59 cancelMixin
60 kQueue int
61 kQueueEvents [2]unix.Kevent_t
62}
63
64func (r *kqueueCancelReader) Read(data []byte) (int, error) {
65 if r.isCanceled() {
66 return 0, ErrCanceled
67 }
68
69 err := r.wait()
70 if err != nil {
71 if errors.Is(err, ErrCanceled) {
72 // remove signal from pipe
73 var b [1]byte
74 _, errRead := r.cancelSignalReader.Read(b[:])
75 if errRead != nil {
76 return 0, fmt.Errorf("reading cancel signal: %w", errRead)
77 }
78 }
79
80 return 0, err
81 }
82
83 return r.file.Read(data)
84}
85
86func (r *kqueueCancelReader) Cancel() bool {
87 r.setCanceled()
88
89 // send cancel signal
90 _, err := r.cancelSignalWriter.Write([]byte{'c'})
91 return err == nil
92}
93
94func (r *kqueueCancelReader) Close() error {
95 var errMsgs []string
96
97 // close kqueue
98 err := unix.Close(r.kQueue)
99 if err != nil {
100 errMsgs = append(errMsgs, fmt.Sprintf("closing kqueue: %v", err))
101 }
102
103 // close pipe
104 err = r.cancelSignalWriter.Close()
105 if err != nil {
106 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
107 }
108
109 err = r.cancelSignalReader.Close()
110 if err != nil {
111 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
112 }
113
114 if len(errMsgs) > 0 {
115 return fmt.Errorf(strings.Join(errMsgs, ", "))
116 }
117
118 return nil
119}
120
121func (r *kqueueCancelReader) wait() error {
122 events := make([]unix.Kevent_t, 1)
123
124 for {
125 _, err := unix.Kevent(r.kQueue, r.kQueueEvents[:], events, nil)
126 if errors.Is(err, unix.EINTR) {
127 continue // try again if the syscall was interrupted
128 }
129
130 if err != nil {
131 return fmt.Errorf("kevent: %w", err)
132 }
133
134 break
135 }
136
137 ident := uint64(events[0].Ident)
138 switch ident {
139 case uint64(r.file.Fd()):
140 return nil
141 case uint64(r.cancelSignalReader.Fd()):
142 return ErrCanceled
143 }
144
145 return fmt.Errorf("unknown error")
146}