polling.go

  1package openai
  2
  3import (
  4	"context"
  5	"fmt"
  6	"net/http"
  7	"strconv"
  8	"time"
  9
 10	"github.com/openai/openai-go/option"
 11)
 12
 13func mkPollingOptions(pollIntervalMs int) []option.RequestOption {
 14	options := []option.RequestOption{option.WithHeader("X-Stainless-Poll-Helper", "true")}
 15	if pollIntervalMs > 0 {
 16		options = append(options, option.WithHeader("X-Stainless-Poll-Interval", fmt.Sprintf("%d", pollIntervalMs)))
 17	}
 18	return options
 19}
 20
 21func getPollInterval(raw *http.Response) (ms int) {
 22	if ms, err := strconv.Atoi(raw.Header.Get("openai-poll-after-ms")); err == nil {
 23		return ms
 24	}
 25	return 1000
 26}
 27
 28// PollStatus waits until a VectorStoreFile is no longer in an incomplete state and returns it.
 29// Pass 0 as pollIntervalMs to use the default polling interval of 1 second.
 30func (r *VectorStoreFileService) PollStatus(ctx context.Context, vectorStoreID string, fileID string, pollIntervalMs int, opts ...option.RequestOption) (*VectorStoreFile, error) {
 31	var raw *http.Response
 32	opts = append(opts, mkPollingOptions(pollIntervalMs)...)
 33	opts = append(opts, option.WithResponseInto(&raw))
 34	for {
 35		file, err := r.Get(ctx, vectorStoreID, fileID, opts...)
 36		if err != nil {
 37			return nil, fmt.Errorf("vector store file poll: received %w", err)
 38		}
 39
 40		switch file.Status {
 41		case VectorStoreFileStatusInProgress:
 42			if pollIntervalMs <= 0 {
 43				pollIntervalMs = getPollInterval(raw)
 44			}
 45			time.Sleep(time.Duration(pollIntervalMs) * time.Millisecond)
 46		case VectorStoreFileStatusCancelled,
 47			VectorStoreFileStatusCompleted,
 48			VectorStoreFileStatusFailed:
 49			return file, nil
 50		default:
 51			return nil, fmt.Errorf("invalid vector store file status during polling: received %s", file.Status)
 52		}
 53
 54		select {
 55		case <-ctx.Done():
 56			return nil, ctx.Err()
 57		default:
 58
 59		}
 60	}
 61}
 62
 63// PollStatus waits until a BetaVectorStoreFileBatch is no longer in an incomplete state and returns it.
 64// Pass 0 as pollIntervalMs to use the default polling interval of 1 second.
 65func (r *VectorStoreFileBatchService) PollStatus(ctx context.Context, vectorStoreID string, batchID string, pollIntervalMs int, opts ...option.RequestOption) (*VectorStoreFileBatch, error) {
 66	var raw *http.Response
 67	opts = append(opts, option.WithResponseInto(&raw))
 68	opts = append(opts, mkPollingOptions(pollIntervalMs)...)
 69	for {
 70		batch, err := r.Get(ctx, vectorStoreID, batchID, opts...)
 71		if err != nil {
 72			return nil, fmt.Errorf("vector store file batch poll: received %w", err)
 73		}
 74
 75		switch batch.Status {
 76		case VectorStoreFileBatchStatusInProgress:
 77			if pollIntervalMs <= 0 {
 78				pollIntervalMs = getPollInterval(raw)
 79			}
 80			time.Sleep(time.Duration(pollIntervalMs) * time.Millisecond)
 81		case VectorStoreFileBatchStatusCancelled,
 82			VectorStoreFileBatchStatusCompleted,
 83			VectorStoreFileBatchStatusFailed:
 84			return batch, nil
 85		default:
 86			return nil, fmt.Errorf("invalid vector store file batch status during polling: received %s", batch.Status)
 87		}
 88
 89		select {
 90		case <-ctx.Done():
 91			return nil, ctx.Err()
 92		default:
 93		}
 94	}
 95}
 96
 97// PollStatus waits until a Run is no longer in an incomplete state and returns it.
 98// Pass 0 as pollIntervalMs to use the default polling interval of 1 second.
 99func (r *BetaThreadRunService) PollStatus(ctx context.Context, threadID string, runID string, pollIntervalMs int, opts ...option.RequestOption) (res *Run, err error) {
100	var raw *http.Response
101	opts = append(opts, mkPollingOptions(pollIntervalMs)...)
102	opts = append(opts, option.WithResponseInto(&raw))
103	for {
104		run, err := r.Get(ctx, threadID, runID, opts...)
105		if err != nil {
106			return nil, fmt.Errorf("thread run poll: received %w", err)
107		}
108
109		switch run.Status {
110		case RunStatusInProgress,
111			RunStatusQueued:
112			if pollIntervalMs <= 0 {
113				pollIntervalMs = getPollInterval(raw)
114			}
115			time.Sleep(time.Duration(pollIntervalMs) * time.Millisecond)
116		case RunStatusRequiresAction,
117			RunStatusCancelled,
118			RunStatusCompleted,
119			RunStatusFailed,
120			RunStatusExpired,
121			RunStatusIncomplete:
122			return run, nil
123		default:
124			return nil, fmt.Errorf("invalid thread run status during polling: received %s", run.Status)
125		}
126
127		select {
128		case <-ctx.Done():
129			return nil, ctx.Err()
130		default:
131			break
132		}
133	}
134}