1//go:build linux
2// +build linux
3
4package cancelreader
5
6import (
7 "errors"
8 "fmt"
9 "io"
10 "os"
11 "strings"
12
13 "golang.org/x/sys/unix"
14)
15
16// NewReader returns a reader and a cancel function. If the input reader is a
17// File, the cancel function can be used to interrupt a blocking read call.
18// In this case, the cancel function returns true if the call was canceled
19// successfully. If the input reader is not a File, the cancel function
20// does nothing and always returns false. The Linux implementation is based on
21// the epoll mechanism.
22func NewReader(reader io.Reader) (CancelReader, error) {
23 file, ok := reader.(File)
24 if !ok {
25 return newFallbackCancelReader(reader)
26 }
27
28 epoll, err := unix.EpollCreate1(0)
29 if err != nil {
30 return nil, fmt.Errorf("create epoll: %w", err)
31 }
32
33 r := &epollCancelReader{
34 file: file,
35 epoll: epoll,
36 }
37
38 r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
39 if err != nil {
40 _ = unix.Close(epoll)
41 return nil, err
42 }
43
44 err = unix.EpollCtl(epoll, unix.EPOLL_CTL_ADD, int(file.Fd()), &unix.EpollEvent{
45 Events: unix.EPOLLIN,
46 Fd: int32(file.Fd()),
47 })
48 if err != nil {
49 _ = unix.Close(epoll)
50 return nil, fmt.Errorf("add reader to epoll interest list")
51 }
52
53 err = unix.EpollCtl(epoll, unix.EPOLL_CTL_ADD, int(r.cancelSignalReader.Fd()), &unix.EpollEvent{
54 Events: unix.EPOLLIN,
55 Fd: int32(r.cancelSignalReader.Fd()),
56 })
57 if err != nil {
58 _ = unix.Close(epoll)
59 return nil, fmt.Errorf("add reader to epoll interest list")
60 }
61
62 return r, nil
63}
64
65type epollCancelReader struct {
66 file File
67 cancelSignalReader File
68 cancelSignalWriter File
69 cancelMixin
70 epoll int
71}
72
73func (r *epollCancelReader) Read(data []byte) (int, error) {
74 if r.isCanceled() {
75 return 0, ErrCanceled
76 }
77
78 err := r.wait()
79 if err != nil {
80 if errors.Is(err, ErrCanceled) {
81 // remove signal from pipe
82 var b [1]byte
83 _, readErr := r.cancelSignalReader.Read(b[:])
84 if readErr != nil {
85 return 0, fmt.Errorf("reading cancel signal: %w", readErr)
86 }
87 }
88
89 return 0, err
90 }
91
92 return r.file.Read(data)
93}
94
95func (r *epollCancelReader) Cancel() bool {
96 r.setCanceled()
97
98 // send cancel signal
99 _, err := r.cancelSignalWriter.Write([]byte{'c'})
100 return err == nil
101}
102
103func (r *epollCancelReader) Close() error {
104 var errMsgs []string
105
106 // close kqueue
107 err := unix.Close(r.epoll)
108 if err != nil {
109 errMsgs = append(errMsgs, fmt.Sprintf("closing epoll: %v", err))
110 }
111
112 // close pipe
113 err = r.cancelSignalWriter.Close()
114 if err != nil {
115 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
116 }
117
118 err = r.cancelSignalReader.Close()
119 if err != nil {
120 errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
121 }
122
123 if len(errMsgs) > 0 {
124 return fmt.Errorf(strings.Join(errMsgs, ", "))
125 }
126
127 return nil
128}
129
130func (r *epollCancelReader) wait() error {
131 events := make([]unix.EpollEvent, 1)
132
133 for {
134 _, err := unix.EpollWait(r.epoll, events, -1)
135 if errors.Is(err, unix.EINTR) {
136 continue // try again if the syscall was interrupted
137 }
138
139 if err != nil {
140 return fmt.Errorf("kevent: %w", err)
141 }
142
143 break
144 }
145
146 switch events[0].Fd {
147 case int32(r.file.Fd()):
148 return nil
149 case int32(r.cancelSignalReader.Fd()):
150 return ErrCanceled
151 }
152
153 return fmt.Errorf("unknown error")
154}