cancelreader_select.go

  1//go:build solaris || darwin || freebsd || netbsd || openbsd || dragonfly
  2// +build solaris darwin freebsd netbsd openbsd dragonfly
  3
  4package cancelreader
  5
  6import (
  7	"errors"
  8	"fmt"
  9	"io"
 10	"os"
 11	"strings"
 12
 13	"golang.org/x/sys/unix"
 14)
 15
 16// newSelectCancelReader returns a reader and a cancel function. If the input
 17// reader is a File, the cancel function can be used to interrupt a
 18// blocking call read call. In this case, the cancel function returns true if
 19// the call was canceled successfully. If the input reader is not a File or
 20// the file descriptor is 1024 or larger, the cancel function does nothing and
 21// always returns false. The generic unix implementation is based on the posix
 22// select syscall.
 23func newSelectCancelReader(reader io.Reader) (CancelReader, error) {
 24	file, ok := reader.(File)
 25	if !ok || file.Fd() >= unix.FD_SETSIZE {
 26		return newFallbackCancelReader(reader)
 27	}
 28	r := &selectCancelReader{file: file}
 29
 30	var err error
 31
 32	r.cancelSignalReader, r.cancelSignalWriter, err = os.Pipe()
 33	if err != nil {
 34		return nil, err
 35	}
 36
 37	return r, nil
 38}
 39
// selectCancelReader is a reader whose Read waits, via the select syscall,
// on two descriptors at once: the wrapped file and the read end of an
// internal pipe.
//
// file is the source that Read ultimately reads from. cancelSignalReader and
// cancelSignalWriter are the two ends of the pipe created in
// newSelectCancelReader; Cancel writes a byte to the write end, and Read
// treats the read end becoming readable as the cancel signal. Close closes
// both pipe ends but not file.
//
// The embedded cancelMixin presumably supplies the isCanceled/setCanceled
// helpers that Read and Cancel call (its definition is outside this file).
type selectCancelReader struct {
	file               File
	cancelSignalReader File
	cancelSignalWriter File
	cancelMixin
}
 46
 47func (r *selectCancelReader) Read(data []byte) (int, error) {
 48	if r.isCanceled() {
 49		return 0, ErrCanceled
 50	}
 51
 52	for {
 53		err := waitForRead(r.file, r.cancelSignalReader)
 54		if err != nil {
 55			if errors.Is(err, unix.EINTR) {
 56				continue // try again if the syscall was interrupted
 57			}
 58
 59			if errors.Is(err, ErrCanceled) {
 60				// remove signal from pipe
 61				var b [1]byte
 62				_, readErr := r.cancelSignalReader.Read(b[:])
 63				if readErr != nil {
 64					return 0, fmt.Errorf("reading cancel signal: %w", readErr)
 65				}
 66			}
 67
 68			return 0, err
 69		}
 70
 71		return r.file.Read(data)
 72	}
 73}
 74
 75func (r *selectCancelReader) Cancel() bool {
 76	r.setCanceled()
 77
 78	// send cancel signal
 79	_, err := r.cancelSignalWriter.Write([]byte{'c'})
 80	return err == nil
 81}
 82
 83func (r *selectCancelReader) Close() error {
 84	var errMsgs []string
 85
 86	// close pipe
 87	err := r.cancelSignalWriter.Close()
 88	if err != nil {
 89		errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal writer: %v", err))
 90	}
 91
 92	err = r.cancelSignalReader.Close()
 93	if err != nil {
 94		errMsgs = append(errMsgs, fmt.Sprintf("closing cancel signal reader: %v", err))
 95	}
 96
 97	if len(errMsgs) > 0 {
 98		return fmt.Errorf(strings.Join(errMsgs, ", "))
 99	}
100
101	return nil
102}
103
104func waitForRead(reader, abort File) error {
105	readerFd := int(reader.Fd())
106	abortFd := int(abort.Fd())
107
108	maxFd := readerFd
109	if abortFd > maxFd {
110		maxFd = abortFd
111	}
112
113	// this is a limitation of the select syscall
114	if maxFd >= unix.FD_SETSIZE {
115		return fmt.Errorf("cannot select on file descriptor %d which is larger than 1024", maxFd)
116	}
117
118	fdSet := &unix.FdSet{}
119	fdSet.Set(int(reader.Fd()))
120	fdSet.Set(int(abort.Fd()))
121
122	_, err := unix.Select(maxFd+1, fdSet, nil, nil, nil)
123	if err != nil {
124		return fmt.Errorf("select: %w", err)
125	}
126
127	if fdSet.IsSet(abortFd) {
128		return ErrCanceled
129	}
130
131	if fdSet.IsSet(readerFd) {
132		return nil
133	}
134
135	return fmt.Errorf("select returned without setting a file descriptor")
136}