1// Copyright The OpenTelemetry Authors
2// SPDX-License-Identifier: Apache-2.0
3
4package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
5
6import (
7 "context"
8 "io"
9 "net/http"
10 "net/http/httptrace"
11 "sync/atomic"
12 "time"
13
14 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request"
15 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv"
16 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
17 "go.opentelemetry.io/otel"
18 "go.opentelemetry.io/otel/attribute"
19 "go.opentelemetry.io/otel/codes"
20 "go.opentelemetry.io/otel/metric"
21 "go.opentelemetry.io/otel/propagation"
22
23 "go.opentelemetry.io/otel/trace"
24)
25
26// Transport implements the http.RoundTripper interface and wraps
27// outbound HTTP(S) requests with a span and enriches it with metrics.
28type Transport struct {
29 rt http.RoundTripper
30
31 tracer trace.Tracer
32 meter metric.Meter
33 propagators propagation.TextMapPropagator
34 spanStartOptions []trace.SpanStartOption
35 filters []Filter
36 spanNameFormatter func(string, *http.Request) string
37 clientTrace func(context.Context) *httptrace.ClientTrace
38 metricAttributesFn func(*http.Request) []attribute.KeyValue
39
40 semconv semconv.HTTPClient
41 requestBytesCounter metric.Int64Counter
42 responseBytesCounter metric.Int64Counter
43 latencyMeasure metric.Float64Histogram
44}
45
46var _ http.RoundTripper = &Transport{}
47
48// NewTransport wraps the provided http.RoundTripper with one that
49// starts a span, injects the span context into the outbound request headers,
50// and enriches it with metrics.
51//
52// If the provided http.RoundTripper is nil, http.DefaultTransport will be used
53// as the base http.RoundTripper.
54func NewTransport(base http.RoundTripper, opts ...Option) *Transport {
55 if base == nil {
56 base = http.DefaultTransport
57 }
58
59 t := Transport{
60 rt: base,
61 semconv: semconv.NewHTTPClient(),
62 }
63
64 defaultOpts := []Option{
65 WithSpanOptions(trace.WithSpanKind(trace.SpanKindClient)),
66 WithSpanNameFormatter(defaultTransportFormatter),
67 }
68
69 c := newConfig(append(defaultOpts, opts...)...)
70 t.applyConfig(c)
71 t.createMeasures()
72
73 return &t
74}
75
76func (t *Transport) applyConfig(c *config) {
77 t.tracer = c.Tracer
78 t.meter = c.Meter
79 t.propagators = c.Propagators
80 t.spanStartOptions = c.SpanStartOptions
81 t.filters = c.Filters
82 t.spanNameFormatter = c.SpanNameFormatter
83 t.clientTrace = c.ClientTrace
84 t.metricAttributesFn = c.MetricAttributesFn
85}
86
87func (t *Transport) createMeasures() {
88 var err error
89 t.requestBytesCounter, err = t.meter.Int64Counter(
90 clientRequestSize,
91 metric.WithUnit("By"),
92 metric.WithDescription("Measures the size of HTTP request messages."),
93 )
94 handleErr(err)
95
96 t.responseBytesCounter, err = t.meter.Int64Counter(
97 clientResponseSize,
98 metric.WithUnit("By"),
99 metric.WithDescription("Measures the size of HTTP response messages."),
100 )
101 handleErr(err)
102
103 t.latencyMeasure, err = t.meter.Float64Histogram(
104 clientDuration,
105 metric.WithUnit("ms"),
106 metric.WithDescription("Measures the duration of outbound HTTP requests."),
107 )
108 handleErr(err)
109}
110
111func defaultTransportFormatter(_ string, r *http.Request) string {
112 return "HTTP " + r.Method
113}
114
115// RoundTrip creates a Span and propagates its context via the provided request's headers
116// before handing the request to the configured base RoundTripper. The created span will
117// end when the response body is closed or when a read from the body returns io.EOF.
118func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
119 requestStartTime := time.Now()
120 for _, f := range t.filters {
121 if !f(r) {
122 // Simply pass through to the base RoundTripper if a filter rejects the request
123 return t.rt.RoundTrip(r)
124 }
125 }
126
127 tracer := t.tracer
128
129 if tracer == nil {
130 if span := trace.SpanFromContext(r.Context()); span.SpanContext().IsValid() {
131 tracer = newTracer(span.TracerProvider())
132 } else {
133 tracer = newTracer(otel.GetTracerProvider())
134 }
135 }
136
137 opts := append([]trace.SpanStartOption{}, t.spanStartOptions...) // start with the configured options
138
139 ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
140
141 if t.clientTrace != nil {
142 ctx = httptrace.WithClientTrace(ctx, t.clientTrace(ctx))
143 }
144
145 labeler, found := LabelerFromContext(ctx)
146 if !found {
147 ctx = ContextWithLabeler(ctx, labeler)
148 }
149
150 r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.
151
152 // if request body is nil or NoBody, we don't want to mutate the body as it
153 // will affect the identity of it in an unforeseeable way because we assert
154 // ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
155 bw := request.NewBodyWrapper(r.Body, func(int64) {})
156 if r.Body != nil && r.Body != http.NoBody {
157 r.Body = bw
158 }
159
160 span.SetAttributes(t.semconv.RequestTraceAttrs(r)...)
161 t.propagators.Inject(ctx, propagation.HeaderCarrier(r.Header))
162
163 res, err := t.rt.RoundTrip(r)
164 if err != nil {
165 // set error type attribute if the error is part of the predefined
166 // error types.
167 // otherwise, record it as an exception
168 if errType := t.semconv.ErrorType(err); errType.Valid() {
169 span.SetAttributes(errType)
170 } else {
171 span.RecordError(err)
172 }
173
174 span.SetStatus(codes.Error, err.Error())
175 span.End()
176 return res, err
177 }
178
179 // metrics
180 metricAttrs := append(append(labeler.Get(), semconvutil.HTTPClientRequestMetrics(r)...), t.metricAttributesFromRequest(r)...)
181 if res.StatusCode > 0 {
182 metricAttrs = append(metricAttrs, semconv.HTTPStatusCode(res.StatusCode))
183 }
184 o := metric.WithAttributeSet(attribute.NewSet(metricAttrs...))
185
186 t.requestBytesCounter.Add(ctx, bw.BytesRead(), o)
187 // For handling response bytes we leverage a callback when the client reads the http response
188 readRecordFunc := func(n int64) {
189 t.responseBytesCounter.Add(ctx, n, o)
190 }
191
192 // traces
193 span.SetAttributes(t.semconv.ResponseTraceAttrs(res)...)
194 span.SetStatus(t.semconv.Status(res.StatusCode))
195
196 res.Body = newWrappedBody(span, readRecordFunc, res.Body)
197
198 // Use floating point division here for higher precision (instead of Millisecond method).
199 elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
200
201 t.latencyMeasure.Record(ctx, elapsedTime, o)
202
203 return res, err
204}
205
206func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.KeyValue {
207 var attributeForRequest []attribute.KeyValue
208 if t.metricAttributesFn != nil {
209 attributeForRequest = t.metricAttributesFn(r)
210 }
211 return attributeForRequest
212}
213
214// newWrappedBody returns a new and appropriately scoped *wrappedBody as an
215// io.ReadCloser. If the passed body implements io.Writer, the returned value
216// will implement io.ReadWriteCloser.
217func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
218 // The successful protocol switch responses will have a body that
219 // implement an io.ReadWriteCloser. Ensure this interface type continues
220 // to be satisfied if that is the case.
221 if _, ok := body.(io.ReadWriteCloser); ok {
222 return &wrappedBody{span: span, record: record, body: body}
223 }
224
225 // Remove the implementation of the io.ReadWriteCloser and only implement
226 // the io.ReadCloser.
227 return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
228}
229
230// wrappedBody is the response body type returned by the transport
231// instrumentation to complete a span. Errors encountered when using the
232// response body are recorded in span tracking the response.
233//
234// The span tracking the response is ended when this body is closed.
235//
236// If the response body implements the io.Writer interface (i.e. for
237// successful protocol switches), the wrapped body also will.
238type wrappedBody struct {
239 span trace.Span
240 recorded atomic.Bool
241 record func(n int64)
242 body io.ReadCloser
243 read atomic.Int64
244}
245
246var _ io.ReadWriteCloser = &wrappedBody{}
247
248func (wb *wrappedBody) Write(p []byte) (int, error) {
249 // This will not panic given the guard in newWrappedBody.
250 n, err := wb.body.(io.Writer).Write(p)
251 if err != nil {
252 wb.span.RecordError(err)
253 wb.span.SetStatus(codes.Error, err.Error())
254 }
255 return n, err
256}
257
258func (wb *wrappedBody) Read(b []byte) (int, error) {
259 n, err := wb.body.Read(b)
260 // Record the number of bytes read
261 wb.read.Add(int64(n))
262
263 switch err {
264 case nil:
265 // nothing to do here but fall through to the return
266 case io.EOF:
267 wb.recordBytesRead()
268 wb.span.End()
269 default:
270 wb.span.RecordError(err)
271 wb.span.SetStatus(codes.Error, err.Error())
272 }
273 return n, err
274}
275
276// recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
277func (wb *wrappedBody) recordBytesRead() {
278 // note: it is more performant (and equally correct) to use atomic.Bool over sync.Once here. In the event that
279 // two goroutines are racing to call this method, the number of bytes read will no longer increase. Using
280 // CompareAndSwap allows later goroutines to return quickly and not block waiting for the race winner to finish
281 // calling wb.record(wb.read.Load()).
282 if wb.recorded.CompareAndSwap(false, true) {
283 // Record the total number of bytes read
284 wb.record(wb.read.Load())
285 }
286}
287
288func (wb *wrappedBody) Close() error {
289 wb.recordBytesRead()
290 wb.span.End()
291 if wb.body != nil {
292 return wb.body.Close()
293 }
294 return nil
295}