transport.go

  1// Copyright The OpenTelemetry Authors
  2// SPDX-License-Identifier: Apache-2.0
  3
  4package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  5
  6import (
  7	"context"
  8	"io"
  9	"net/http"
 10	"net/http/httptrace"
 11	"sync/atomic"
 12	"time"
 13
 14	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"
 15	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv"
 16	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
 17	"go.opentelemetry.io/otel"
 18	"go.opentelemetry.io/otel/attribute"
 19	"go.opentelemetry.io/otel/codes"
 20	"go.opentelemetry.io/otel/metric"
 21	"go.opentelemetry.io/otel/propagation"
 22
 23	"go.opentelemetry.io/otel/trace"
 24)
 25
 26// Transport implements the http.RoundTripper interface and wraps
 27// outbound HTTP(S) requests with a span and enriches it with metrics.
 28type Transport struct {
 29	rt http.RoundTripper
 30
 31	tracer             trace.Tracer
 32	meter              metric.Meter
 33	propagators        propagation.TextMapPropagator
 34	spanStartOptions   []trace.SpanStartOption
 35	filters            []Filter
 36	spanNameFormatter  func(string, *http.Request) string
 37	clientTrace        func(context.Context) *httptrace.ClientTrace
 38	metricAttributesFn func(*http.Request) []attribute.KeyValue
 39
 40	semconv              semconv.HTTPClient
 41	requestBytesCounter  metric.Int64Counter
 42	responseBytesCounter metric.Int64Counter
 43	latencyMeasure       metric.Float64Histogram
 44}
 45
 46var _ http.RoundTripper = &Transport{}
 47
 48// NewTransport wraps the provided http.RoundTripper with one that
 49// starts a span, injects the span context into the outbound request headers,
 50// and enriches it with metrics.
 51//
 52// If the provided http.RoundTripper is nil, http.DefaultTransport will be used
 53// as the base http.RoundTripper.
 54func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
 55	if base == nil {
 56		base = http.DefaultTransport
 57	}
 58
 59	t := Transport{
 60		rt:      base,
 61		semconv: semconv.NewHTTPClient(),
 62	}
 63
 64	defaultOpts := []Option{
 65		WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
 66		WithSpanNameFormatter(defaultTransportFormatter),
 67	}
 68
 69	c := newConfig(append(defaultOpts, opts...)...)
 70	t.applyConfig(c)
 71	t.createMeasures()
 72
 73	return &t
 74}
 75
 76func (t *Transport) applyConfig(c *config) {
 77	t.tracer = c.Tracer
 78	t.meter = c.Meter
 79	t.propagators = c.Propagators
 80	t.spanStartOptions = c.SpanStartOptions
 81	t.filters = c.Filters
 82	t.spanNameFormatter = c.SpanNameFormatter
 83	t.clientTrace = c.ClientTrace
 84	t.metricAttributesFn = c.MetricAttributesFn
 85}
 86
 87func (t *Transport) createMeasures() {
 88	var err error
 89	t.requestBytesCounter, err = t.meter.Int64Counter(
 90		clientRequestSize,
 91		metric.WithUnit("By"),
 92		metric.WithDescription("Measures the size of HTTP request messages."),
 93	)
 94	handleErr(err)
 95
 96	t.responseBytesCounter, err = t.meter.Int64Counter(
 97		clientResponseSize,
 98		metric.WithUnit("By"),
 99		metric.WithDescription("Measures the size of HTTP response messages."),
100	)
101	handleErr(err)
102
103	t.latencyMeasure, err = t.meter.Float64Histogram(
104		clientDuration,
105		metric.WithUnit("ms"),
106		metric.WithDescription("Measures the duration of outbound HTTP requests."),
107	)
108	handleErr(err)
109}
110
// defaultTransportFormatter is the span name formatter that NewTransport
// installs by default. It ignores its first argument and returns "HTTP "
// followed by the request method.
func defaultTransportFormatter(_ string, r *http.Request) string {
	const prefix = "HTTP "
	return prefix + r.Method
}
114
115// RoundTrip creates a Span and propagates its context via the provided request's headers
116// before handing the request to the configured base RoundTripper. The created span will
117// end when the response body is closed or when a read from the body returns io.EOF.
118func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
119	requestStartTime := time.Now()
120	for _, f := range t.filters {
121		if !f(r) {
122			// Simply pass through to the base RoundTripper if a filter rejects the request
123			return t.rt.RoundTrip(r)
124		}
125	}
126
127	tracer := t.tracer
128
129	if tracer == nil {
130		if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
131			tracer = newTracer(span.TracerProvider())
132		} else {
133			tracer = newTracer(otel.GetTracerProvider())
134		}
135	}
136
137	opts := append([]trace.SpanStartOption{}, t.spanStartOptions...) // start with the configured options
138
139	ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
140
141	if t.clientTrace != nil {
142		ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
143	}
144
145	labeler, found := LabelerFromContext(ctx)
146	if !found {
147		ctx = ContextWithLabeler(ctx, labeler)
148	}
149
150	r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.
151
152	// if request body is nil or NoBody, we don't want to mutate the body as it
153	// will affect the identity of it in an unforeseeable way because we assert
154	// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
155	bw := request.NewBodyWrapper(r.Body, func(int64) {})
156	if r.Body != nil && r.Body != http.NoBody {
157		r.Body = bw
158	}
159
160	span.SetAttributes(t.semconv.RequestTraceAttrs(r)...)
161	t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
162
163	res, err := t.rt.RoundTrip(r)
164	if err != nil {
165		// set error type attribute if the error is part of the predefined
166		// error types.
167		// otherwise, record it as an exception
168		if errType := t.semconv.ErrorType(err); errType.Valid() {
169			span.SetAttributes(errType)
170		} else {
171			span.RecordError(err)
172		}
173
174		span.SetStatus(codes.Error, err.Error())
175		span.End()
176		return res, err
177	}
178
179	// metrics
180	metricAttrs := append(append(labeler.Get(), semconvutil.HTTPClientRequestMetrics(r)...), t.metricAttributesFromRequest(r)...)
181	if res.StatusCode > 0 {
182		metricAttrs = append(metricAttrs, semconv.HTTPStatusCode(res.StatusCode))
183	}
184	o := metric.WithAttributeSet(attribute.NewSet(metricAttrs...))
185
186	t.requestBytesCounter.Add(ctx, bw.BytesRead(), o)
187	// For handling response bytes we leverage a callback when the client reads the http response
188	readRecordFunc := func(n int64) {
189		t.responseBytesCounter.Add(ctx, n, o)
190	}
191
192	// traces
193	span.SetAttributes(t.semconv.ResponseTraceAttrs(res)...)
194	span.SetStatus(t.semconv.Status(res.StatusCode))
195
196	res.Body = newWrappedBody(span, readRecordFunc, res.Body)
197
198	// Use floating point division here for higher precision (instead of Millisecond method).
199	elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
200
201	t.latencyMeasure.Record(ctx, elapsedTime, o)
202
203	return res, err
204}
205
206func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.KeyValue {
207	var attributeForRequest []attribute.KeyValue
208	if t.metricAttributesFn != nil {
209		attributeForRequest = t.metricAttributesFn(r)
210	}
211	return attributeForRequest
212}
213
214// newWrappedBody returns a new and appropriately scoped *wrappedBody as an
215// io.ReadCloser. If the passed body implements io.Writer, the returned value
216// will implement io.ReadWriteCloser.
217func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
218	// The successful protocol switch responses will have a body that
219	// implement an io.ReadWriteCloser. Ensure this interface type continues
220	// to be satisfied if that is the case.
221	if _, ok := body.(io.ReadWriteCloser); ok {
222		return &wrappedBody{span: span, record: record, body: body}
223	}
224
225	// Remove the implementation of the io.ReadWriteCloser and only implement
226	// the io.ReadCloser.
227	return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
228}
229
230// wrappedBody is the response body type returned by the transport
231// instrumentation to complete a span. Errors encountered when using the
232// response body are recorded in span tracking the response.
233//
234// The span tracking the response is ended when this body is closed.
235//
236// If the response body implements the io.Writer interface (i.e. for
237// successful protocol switches), the wrapped body also will.
238type wrappedBody struct {
239	span     trace.Span
240	recorded atomic.Bool
241	record   func(n int64)
242	body     io.ReadCloser
243	read     atomic.Int64
244}
245
246var _ io.ReadWriteCloser = &wrappedBody{}
247
248func (wb *wrappedBody) Write(p []byte) (int, error) {
249	// This will not panic given the guard in newWrappedBody.
250	n, err := wb.body.(io.Writer).Write(p)
251	if err != nil {
252		wb.span.RecordError(err)
253		wb.span.SetStatus(codes.Error, err.Error())
254	}
255	return n, err
256}
257
258func (wb *wrappedBody) Read(b []byte) (int, error) {
259	n, err := wb.body.Read(b)
260	// Record the number of bytes read
261	wb.read.Add(int64(n))
262
263	switch err {
264	case nil:
265		// nothing to do here but fall through to the return
266	case io.EOF:
267		wb.recordBytesRead()
268		wb.span.End()
269	default:
270		wb.span.RecordError(err)
271		wb.span.SetStatus(codes.Error, err.Error())
272	}
273	return n, err
274}
275
276// recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
277func (wb *wrappedBody) recordBytesRead() {
278	// note: it is more performant (and equally correct) to use atomic.Bool over sync.Once here. In the event that
279	// two goroutines are racing to call this method, the number of bytes read will no longer increase. Using
280	// CompareAndSwap allows later goroutines to return quickly and not block waiting for the race winner to finish
281	// calling wb.record(wb.read.Load()).
282	if wb.recorded.CompareAndSwap(false, true) {
283		// Record the total number of bytes read
284		wb.record(wb.read.Load())
285	}
286}
287
288func (wb *wrappedBody) Close() error {
289	wb.recordBytesRead()
290	wb.span.End()
291	if wb.body != nil {
292		return wb.body.Close()
293	}
294	return nil
295}