meter.go

  1// Copyright The OpenTelemetry Authors
  2// SPDX-License-Identifier: Apache-2.0
  3
  4package global // import "go.opentelemetry.io/otel/internal/global"
  5
  6import (
  7	"container/list"
  8	"context"
  9	"reflect"
 10	"sync"
 11
 12	"go.opentelemetry.io/otel/metric"
 13	"go.opentelemetry.io/otel/metric/embedded"
 14)
 15
 16// meterProvider is a placeholder for a configured SDK MeterProvider.
 17//
 18// All MeterProvider functionality is forwarded to a delegate once
 19// configured.
 20type meterProvider struct {
 21	embedded.MeterProvider
 22
 23	mtx    sync.Mutex
 24	meters map[il]*meter
 25
 26	delegate metric.MeterProvider
 27}
 28
 29// setDelegate configures p to delegate all MeterProvider functionality to
 30// provider.
 31//
 32// All Meters provided prior to this function call are switched out to be
 33// Meters provided by provider. All instruments and callbacks are recreated and
 34// delegated.
 35//
 36// It is guaranteed by the caller that this happens only once.
 37func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
 38	p.mtx.Lock()
 39	defer p.mtx.Unlock()
 40
 41	p.delegate = provider
 42
 43	if len(p.meters) == 0 {
 44		return
 45	}
 46
 47	for _, meter := range p.meters {
 48		meter.setDelegate(provider)
 49	}
 50
 51	p.meters = nil
 52}
 53
 54// Meter implements MeterProvider.
 55func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
 56	p.mtx.Lock()
 57	defer p.mtx.Unlock()
 58
 59	if p.delegate != nil {
 60		return p.delegate.Meter(name, opts...)
 61	}
 62
 63	// At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
 64
 65	c := metric.NewMeterConfig(opts...)
 66	key := il{
 67		name:    name,
 68		version: c.InstrumentationVersion(),
 69		schema:  c.SchemaURL(),
 70		attrs:   c.InstrumentationAttributes(),
 71	}
 72
 73	if p.meters == nil {
 74		p.meters = make(map[il]*meter)
 75	}
 76
 77	if val, ok := p.meters[key]; ok {
 78		return val
 79	}
 80
 81	t := &meter{name: name, opts: opts, instruments: make(map[instID]delegatedInstrument)}
 82	p.meters[key] = t
 83	return t
 84}
 85
 86// meter is a placeholder for a metric.Meter.
 87//
 88// All Meter functionality is forwarded to a delegate once configured.
 89// Otherwise, all functionality is forwarded to a NoopMeter.
 90type meter struct {
 91	embedded.Meter
 92
 93	name string
 94	opts []metric.MeterOption
 95
 96	mtx         sync.Mutex
 97	instruments map[instID]delegatedInstrument
 98
 99	registry list.List
100
101	delegate metric.Meter
102}
103
104type delegatedInstrument interface {
105	setDelegate(metric.Meter)
106}
107
108// instID are the identifying properties of a instrument.
109type instID struct {
110	// name is the name of the stream.
111	name string
112	// description is the description of the stream.
113	description string
114	// kind defines the functional group of the instrument.
115	kind reflect.Type
116	// unit is the unit of the stream.
117	unit string
118}
119
120// setDelegate configures m to delegate all Meter functionality to Meters
121// created by provider.
122//
123// All subsequent calls to the Meter methods will be passed to the delegate.
124//
125// It is guaranteed by the caller that this happens only once.
126func (m *meter) setDelegate(provider metric.MeterProvider) {
127	m.mtx.Lock()
128	defer m.mtx.Unlock()
129
130	meter := provider.Meter(m.name, m.opts...)
131	m.delegate = meter
132
133	for _, inst := range m.instruments {
134		inst.setDelegate(meter)
135	}
136
137	var n *list.Element
138	for e := m.registry.Front(); e != nil; e = n {
139		r := e.Value.(*registration)
140		r.setDelegate(meter)
141		n = e.Next()
142		m.registry.Remove(e)
143	}
144
145	m.instruments = nil
146	m.registry.Init()
147}
148
149func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
150	m.mtx.Lock()
151	defer m.mtx.Unlock()
152
153	if m.delegate != nil {
154		return m.delegate.Int64Counter(name, options...)
155	}
156
157	cfg := metric.NewInt64CounterConfig(options...)
158	id := instID{
159		name:        name,
160		kind:        reflect.TypeOf((*siCounter)(nil)),
161		description: cfg.Description(),
162		unit:        cfg.Unit(),
163	}
164	if f, ok := m.instruments[id]; ok {
165		return f.(metric.Int64Counter), nil
166	}
167	i := &siCounter{name: name, opts: options}
168	m.instruments[id] = i
169	return i, nil
170}
171
172func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
173	m.mtx.Lock()
174	defer m.mtx.Unlock()
175
176	if m.delegate != nil {
177		return m.delegate.Int64UpDownCounter(name, options...)
178	}
179
180	cfg := metric.NewInt64UpDownCounterConfig(options...)
181	id := instID{
182		name:        name,
183		kind:        reflect.TypeOf((*siUpDownCounter)(nil)),
184		description: cfg.Description(),
185		unit:        cfg.Unit(),
186	}
187	if f, ok := m.instruments[id]; ok {
188		return f.(metric.Int64UpDownCounter), nil
189	}
190	i := &siUpDownCounter{name: name, opts: options}
191	m.instruments[id] = i
192	return i, nil
193}
194
195func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
196	m.mtx.Lock()
197	defer m.mtx.Unlock()
198
199	if m.delegate != nil {
200		return m.delegate.Int64Histogram(name, options...)
201	}
202
203	cfg := metric.NewInt64HistogramConfig(options...)
204	id := instID{
205		name:        name,
206		kind:        reflect.TypeOf((*siHistogram)(nil)),
207		description: cfg.Description(),
208		unit:        cfg.Unit(),
209	}
210	if f, ok := m.instruments[id]; ok {
211		return f.(metric.Int64Histogram), nil
212	}
213	i := &siHistogram{name: name, opts: options}
214	m.instruments[id] = i
215	return i, nil
216}
217
218func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
219	m.mtx.Lock()
220	defer m.mtx.Unlock()
221
222	if m.delegate != nil {
223		return m.delegate.Int64Gauge(name, options...)
224	}
225
226	cfg := metric.NewInt64GaugeConfig(options...)
227	id := instID{
228		name:        name,
229		kind:        reflect.TypeOf((*siGauge)(nil)),
230		description: cfg.Description(),
231		unit:        cfg.Unit(),
232	}
233	if f, ok := m.instruments[id]; ok {
234		return f.(metric.Int64Gauge), nil
235	}
236	i := &siGauge{name: name, opts: options}
237	m.instruments[id] = i
238	return i, nil
239}
240
241func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
242	m.mtx.Lock()
243	defer m.mtx.Unlock()
244
245	if m.delegate != nil {
246		return m.delegate.Int64ObservableCounter(name, options...)
247	}
248
249	cfg := metric.NewInt64ObservableCounterConfig(options...)
250	id := instID{
251		name:        name,
252		kind:        reflect.TypeOf((*aiCounter)(nil)),
253		description: cfg.Description(),
254		unit:        cfg.Unit(),
255	}
256	if f, ok := m.instruments[id]; ok {
257		return f.(metric.Int64ObservableCounter), nil
258	}
259	i := &aiCounter{name: name, opts: options}
260	m.instruments[id] = i
261	return i, nil
262}
263
264func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
265	m.mtx.Lock()
266	defer m.mtx.Unlock()
267
268	if m.delegate != nil {
269		return m.delegate.Int64ObservableUpDownCounter(name, options...)
270	}
271
272	cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
273	id := instID{
274		name:        name,
275		kind:        reflect.TypeOf((*aiUpDownCounter)(nil)),
276		description: cfg.Description(),
277		unit:        cfg.Unit(),
278	}
279	if f, ok := m.instruments[id]; ok {
280		return f.(metric.Int64ObservableUpDownCounter), nil
281	}
282	i := &aiUpDownCounter{name: name, opts: options}
283	m.instruments[id] = i
284	return i, nil
285}
286
287func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
288	m.mtx.Lock()
289	defer m.mtx.Unlock()
290
291	if m.delegate != nil {
292		return m.delegate.Int64ObservableGauge(name, options...)
293	}
294
295	cfg := metric.NewInt64ObservableGaugeConfig(options...)
296	id := instID{
297		name:        name,
298		kind:        reflect.TypeOf((*aiGauge)(nil)),
299		description: cfg.Description(),
300		unit:        cfg.Unit(),
301	}
302	if f, ok := m.instruments[id]; ok {
303		return f.(metric.Int64ObservableGauge), nil
304	}
305	i := &aiGauge{name: name, opts: options}
306	m.instruments[id] = i
307	return i, nil
308}
309
310func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
311	m.mtx.Lock()
312	defer m.mtx.Unlock()
313
314	if m.delegate != nil {
315		return m.delegate.Float64Counter(name, options...)
316	}
317
318	cfg := metric.NewFloat64CounterConfig(options...)
319	id := instID{
320		name:        name,
321		kind:        reflect.TypeOf((*sfCounter)(nil)),
322		description: cfg.Description(),
323		unit:        cfg.Unit(),
324	}
325	if f, ok := m.instruments[id]; ok {
326		return f.(metric.Float64Counter), nil
327	}
328	i := &sfCounter{name: name, opts: options}
329	m.instruments[id] = i
330	return i, nil
331}
332
333func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
334	m.mtx.Lock()
335	defer m.mtx.Unlock()
336
337	if m.delegate != nil {
338		return m.delegate.Float64UpDownCounter(name, options...)
339	}
340
341	cfg := metric.NewFloat64UpDownCounterConfig(options...)
342	id := instID{
343		name:        name,
344		kind:        reflect.TypeOf((*sfUpDownCounter)(nil)),
345		description: cfg.Description(),
346		unit:        cfg.Unit(),
347	}
348	if f, ok := m.instruments[id]; ok {
349		return f.(metric.Float64UpDownCounter), nil
350	}
351	i := &sfUpDownCounter{name: name, opts: options}
352	m.instruments[id] = i
353	return i, nil
354}
355
356func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
357	m.mtx.Lock()
358	defer m.mtx.Unlock()
359
360	if m.delegate != nil {
361		return m.delegate.Float64Histogram(name, options...)
362	}
363
364	cfg := metric.NewFloat64HistogramConfig(options...)
365	id := instID{
366		name:        name,
367		kind:        reflect.TypeOf((*sfHistogram)(nil)),
368		description: cfg.Description(),
369		unit:        cfg.Unit(),
370	}
371	if f, ok := m.instruments[id]; ok {
372		return f.(metric.Float64Histogram), nil
373	}
374	i := &sfHistogram{name: name, opts: options}
375	m.instruments[id] = i
376	return i, nil
377}
378
379func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
380	m.mtx.Lock()
381	defer m.mtx.Unlock()
382
383	if m.delegate != nil {
384		return m.delegate.Float64Gauge(name, options...)
385	}
386
387	cfg := metric.NewFloat64GaugeConfig(options...)
388	id := instID{
389		name:        name,
390		kind:        reflect.TypeOf((*sfGauge)(nil)),
391		description: cfg.Description(),
392		unit:        cfg.Unit(),
393	}
394	if f, ok := m.instruments[id]; ok {
395		return f.(metric.Float64Gauge), nil
396	}
397	i := &sfGauge{name: name, opts: options}
398	m.instruments[id] = i
399	return i, nil
400}
401
402func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
403	m.mtx.Lock()
404	defer m.mtx.Unlock()
405
406	if m.delegate != nil {
407		return m.delegate.Float64ObservableCounter(name, options...)
408	}
409
410	cfg := metric.NewFloat64ObservableCounterConfig(options...)
411	id := instID{
412		name:        name,
413		kind:        reflect.TypeOf((*afCounter)(nil)),
414		description: cfg.Description(),
415		unit:        cfg.Unit(),
416	}
417	if f, ok := m.instruments[id]; ok {
418		return f.(metric.Float64ObservableCounter), nil
419	}
420	i := &afCounter{name: name, opts: options}
421	m.instruments[id] = i
422	return i, nil
423}
424
425func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
426	m.mtx.Lock()
427	defer m.mtx.Unlock()
428
429	if m.delegate != nil {
430		return m.delegate.Float64ObservableUpDownCounter(name, options...)
431	}
432
433	cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
434	id := instID{
435		name:        name,
436		kind:        reflect.TypeOf((*afUpDownCounter)(nil)),
437		description: cfg.Description(),
438		unit:        cfg.Unit(),
439	}
440	if f, ok := m.instruments[id]; ok {
441		return f.(metric.Float64ObservableUpDownCounter), nil
442	}
443	i := &afUpDownCounter{name: name, opts: options}
444	m.instruments[id] = i
445	return i, nil
446}
447
448func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
449	m.mtx.Lock()
450	defer m.mtx.Unlock()
451
452	if m.delegate != nil {
453		return m.delegate.Float64ObservableGauge(name, options...)
454	}
455
456	cfg := metric.NewFloat64ObservableGaugeConfig(options...)
457	id := instID{
458		name:        name,
459		kind:        reflect.TypeOf((*afGauge)(nil)),
460		description: cfg.Description(),
461		unit:        cfg.Unit(),
462	}
463	if f, ok := m.instruments[id]; ok {
464		return f.(metric.Float64ObservableGauge), nil
465	}
466	i := &afGauge{name: name, opts: options}
467	m.instruments[id] = i
468	return i, nil
469}
470
471// RegisterCallback captures the function that will be called during Collect.
472func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
473	m.mtx.Lock()
474	defer m.mtx.Unlock()
475
476	if m.delegate != nil {
477		return m.delegate.RegisterCallback(unwrapCallback(f), unwrapInstruments(insts)...)
478	}
479
480	reg := &registration{instruments: insts, function: f}
481	e := m.registry.PushBack(reg)
482	reg.unreg = func() error {
483		m.mtx.Lock()
484		_ = m.registry.Remove(e)
485		m.mtx.Unlock()
486		return nil
487	}
488	return reg, nil
489}
490
491func unwrapInstruments(instruments []metric.Observable) []metric.Observable {
492	out := make([]metric.Observable, 0, len(instruments))
493
494	for _, inst := range instruments {
495		if in, ok := inst.(unwrapper); ok {
496			out = append(out, in.unwrap())
497		} else {
498			out = append(out, inst)
499		}
500	}
501
502	return out
503}
504
505type registration struct {
506	embedded.Registration
507
508	instruments []metric.Observable
509	function    metric.Callback
510
511	unreg   func() error
512	unregMu sync.Mutex
513}
514
515type unwrapObs struct {
516	embedded.Observer
517	obs metric.Observer
518}
519
520// unwrapFloat64Observable returns an expected metric.Float64Observable after
521// unwrapping the global object.
522func unwrapFloat64Observable(inst metric.Float64Observable) metric.Float64Observable {
523	if unwrapped, ok := inst.(unwrapper); ok {
524		if floatObs, ok := unwrapped.unwrap().(metric.Float64Observable); ok {
525			// Note: if the unwrapped object does not
526			// unwrap as an observable for either of the
527			// predicates here, it means an internal bug in
528			// this package.  We avoid logging an error in
529			// this case, because the SDK has to try its
530			// own type conversion on the object.  The SDK
531			// will see this and be forced to respond with
532			// its own error.
533			//
534			// This code uses a double-nested if statement
535			// to avoid creating a branch that is
536			// impossible to cover.
537			inst = floatObs
538		}
539	}
540	return inst
541}
542
543// unwrapInt64Observable returns an expected metric.Int64Observable after
544// unwrapping the global object.
545func unwrapInt64Observable(inst metric.Int64Observable) metric.Int64Observable {
546	if unwrapped, ok := inst.(unwrapper); ok {
547		if unint, ok := unwrapped.unwrap().(metric.Int64Observable); ok {
548			// See the comment in unwrapFloat64Observable().
549			inst = unint
550		}
551	}
552	return inst
553}
554
555func (uo *unwrapObs) ObserveFloat64(inst metric.Float64Observable, value float64, opts ...metric.ObserveOption) {
556	uo.obs.ObserveFloat64(unwrapFloat64Observable(inst), value, opts...)
557}
558
559func (uo *unwrapObs) ObserveInt64(inst metric.Int64Observable, value int64, opts ...metric.ObserveOption) {
560	uo.obs.ObserveInt64(unwrapInt64Observable(inst), value, opts...)
561}
562
563func unwrapCallback(f metric.Callback) metric.Callback {
564	return func(ctx context.Context, obs metric.Observer) error {
565		return f(ctx, &unwrapObs{obs: obs})
566	}
567}
568
569func (c *registration) setDelegate(m metric.Meter) {
570	c.unregMu.Lock()
571	defer c.unregMu.Unlock()
572
573	if c.unreg == nil {
574		// Unregister already called.
575		return
576	}
577
578	reg, err := m.RegisterCallback(unwrapCallback(c.function), unwrapInstruments(c.instruments)...)
579	if err != nil {
580		GetErrorHandler().Handle(err)
581		return
582	}
583
584	c.unreg = reg.Unregister
585}
586
587func (c *registration) Unregister() error {
588	c.unregMu.Lock()
589	defer c.unregMu.Unlock()
590	if c.unreg == nil {
591		// Unregister already called.
592		return nil
593	}
594
595	var err error
596	err, c.unreg = c.unreg(), nil
597	return err
598}