cancelreader_linux.go

  1//go:build linux
  2// +build linux
  3
  4package cancelreader
  5
  6import (
  7	"errors"
  8	"fmt"
  9	"io"
 10	"os"
 11	"strings"
 12
 13	"golang.org/x/sys/unix"
 14)
 15
 16// NewReader returns a reader and a cancel function. If the input reader is a
 17// File, the cancel function can be used to interrupt a blocking read call.
 18// In this case, the cancel function returns true if the call was canceled
 19// successfully. If the input reader is not a File, the cancel function
 20// does nothing and always returns false. The Linux implementation is based on
 21// the epoll mechanism.
 22func NewReader(reader io.Reader) (CancelReader, error) {
 23	file, ok := reader.(File)
 24	if !ok {
 25		return newFallbackCancelReader(reader)
 26	}
 27
 28	epoll, err := unix.EpollCreate1(0)
 29	if err != nil {
 30		return nil, fmt.Errorf("create epoll: %w", err)
 31	}
 32
 33	r := &epollCancelReader{
 34		file:  file,
 35		epoll: epoll,
 36	}
 37
 38	r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
 39	if err != nil {
 40		_ = unix.Close(epoll)
 41		return nil, err
 42	}
 43
 44	err = unix.EpollCtl(epoll, unix.EPOLL_CTL_ADD, int(file.Fd()), &unix.EpollEvent{
 45		Events: unix.EPOLLIN,
 46		Fd:     int32(file.Fd()),
 47	})
 48	if err != nil {
 49		_ = unix.Close(epoll)
 50		return nil, fmt.Errorf("add reader to epoll interest list")
 51	}
 52
 53	err = unix.EpollCtl(epoll, unix.EPOLL_CTL_ADD, int(r.cancelSignalReader.Fd()), &unix.EpollEvent{
 54		Events: unix.EPOLLIN,
 55		Fd:     int32(r.cancelSignalReader.Fd()),
 56	})
 57	if err != nil {
 58		_ = unix.Close(epoll)
 59		return nil, fmt.Errorf("add reader to epoll interest list")
 60	}
 61
 62	return r, nil
 63}
 64
 65type epollCancelReader struct {
 66	file               File
 67	cancelSignalReader File
 68	cancelSignalWriter File
 69	cancelMixin
 70	epoll int
 71}
 72
 73func (r *epollCancelReader) Read(data []byte) (int, error) {
 74	if r.isCanceled() {
 75		return 0, ErrCanceled
 76	}
 77
 78	err := r.wait()
 79	if err != nil {
 80		if errors.Is(err, ErrCanceled) {
 81			// remove signal from pipe
 82			var b [1]byte
 83			_, readErr := r.cancelSignalReader.Read(b[:])
 84			if readErr != nil {
 85				return 0, fmt.Errorf("reading cancel signal: %w", readErr)
 86			}
 87		}
 88
 89		return 0, err
 90	}
 91
 92	return r.file.Read(data)
 93}
 94
 95func (r *epollCancelReader) Cancel() bool {
 96	r.setCanceled()
 97
 98	// send cancel signal
 99	_, err := r.cancelSignalWriter.Write([]byte{'c'})
100	return err == nil
101}
102
103func (r *epollCancelReader) Close() error {
104	var errMsgs []string
105
106	// close kqueue
107	err := unix.Close(r.epoll)
108	if err != nil {
109		errMsgs = append(errMsgs, fmt.Sprintf("closing epoll: %v", err))
110	}
111
112	// close pipe
113	err = r.cancelSignalWriter.Close()
114	if err != nil {
115		errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
116	}
117
118	err = r.cancelSignalReader.Close()
119	if err != nil {
120		errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
121	}
122
123	if len(errMsgs) > 0 {
124		return fmt.Errorf(strings.Join(errMsgs, ", "))
125	}
126
127	return nil
128}
129
130func (r *epollCancelReader) wait() error {
131	events := make([]unix.EpollEvent, 1)
132
133	for {
134		_, err := unix.EpollWait(r.epoll, events, -1)
135		if errors.Is(err, unix.EINTR) {
136			continue // try again if the syscall was interrupted
137		}
138
139		if err != nil {
140			return fmt.Errorf("kevent: %w", err)
141		}
142
143		break
144	}
145
146	switch events[0].Fd {
147	case int32(r.file.Fd()):
148		return nil
149	case int32(r.cancelSignalReader.Fd()):
150		return ErrCanceled
151	}
152
153	return fmt.Errorf("unknown error")
154}