resolver_wrapper.go

  1/*
  2 *
  3 * Copyright 2017 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package grpc
 20
 21import (
 22	"context"
 23	"strings"
 24	"sync"
 25
 26	"google.golang.org/grpc/internal/channelz"
 27	"google.golang.org/grpc/internal/grpcsync"
 28	"google.golang.org/grpc/internal/pretty"
 29	"google.golang.org/grpc/internal/resolver/delegatingresolver"
 30	"google.golang.org/grpc/resolver"
 31	"google.golang.org/grpc/serviceconfig"
 32)
 33
 34// ccResolverWrapper is a wrapper on top of cc for resolvers.
 35// It implements resolver.ClientConn interface.
 36type ccResolverWrapper struct {
 37	// The following fields are initialized when the wrapper is created and are
 38	// read-only afterwards, and therefore can be accessed without a mutex.
 39	cc                  *ClientConn
 40	ignoreServiceConfig bool
 41	serializer          *grpcsync.CallbackSerializer
 42	serializerCancel    context.CancelFunc
 43
 44	resolver resolver.Resolver // only accessed within the serializer
 45
 46	// The following fields are protected by mu.  Caller must take cc.mu before
 47	// taking mu.
 48	mu       sync.Mutex
 49	curState resolver.State
 50	closed   bool
 51}
 52
 53// newCCResolverWrapper initializes the ccResolverWrapper.  It can only be used
 54// after calling start, which builds the resolver.
 55func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
 56	ctx, cancel := context.WithCancel(cc.ctx)
 57	return &ccResolverWrapper{
 58		cc:                  cc,
 59		ignoreServiceConfig: cc.dopts.disableServiceConfig,
 60		serializer:          grpcsync.NewCallbackSerializer(ctx),
 61		serializerCancel:    cancel,
 62	}
 63}
 64
 65// start builds the name resolver using the resolver.Builder in cc and returns
 66// any error encountered.  It must always be the first operation performed on
 67// any newly created ccResolverWrapper, except that close may be called instead.
 68func (ccr *ccResolverWrapper) start() error {
 69	errCh := make(chan error)
 70	ccr.serializer.TrySchedule(func(ctx context.Context) {
 71		if ctx.Err() != nil {
 72			return
 73		}
 74		opts := resolver.BuildOptions{
 75			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
 76			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,
 77			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,
 78			Dialer:               ccr.cc.dopts.copts.Dialer,
 79			Authority:            ccr.cc.authority,
 80			MetricsRecorder:      ccr.cc.metricsRecorderList,
 81		}
 82		var err error
 83		// The delegating resolver is used unless:
 84		//   - A custom dialer is provided via WithContextDialer dialoption or
 85		//   - Proxy usage is disabled through WithNoProxy dialoption.
 86		// In these cases, the resolver is built based on the scheme of target,
 87		// using the appropriate resolver builder.
 88		if ccr.cc.dopts.copts.Dialer != nil || !ccr.cc.dopts.useProxy {
 89			ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
 90		} else {
 91			ccr.resolver, err = delegatingresolver.New(ccr.cc.parsedTarget, ccr, opts, ccr.cc.resolverBuilder, ccr.cc.dopts.enableLocalDNSResolution)
 92		}
 93		errCh <- err
 94	})
 95	return <-errCh
 96}
 97
 98func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
 99	ccr.serializer.TrySchedule(func(ctx context.Context) {
100		if ctx.Err() != nil || ccr.resolver == nil {
101			return
102		}
103		ccr.resolver.ResolveNow(o)
104	})
105}
106
107// close initiates async shutdown of the wrapper.  To determine the wrapper has
108// finished shutting down, the channel should block on ccr.serializer.Done()
109// without cc.mu held.
110func (ccr *ccResolverWrapper) close() {
111	channelz.Info(logger, ccr.cc.channelz, "Closing the name resolver")
112	ccr.mu.Lock()
113	ccr.closed = true
114	ccr.mu.Unlock()
115
116	ccr.serializer.TrySchedule(func(context.Context) {
117		if ccr.resolver == nil {
118			return
119		}
120		ccr.resolver.Close()
121		ccr.resolver = nil
122	})
123	ccr.serializerCancel()
124}
125
126// UpdateState is called by resolver implementations to report new state to gRPC
127// which includes addresses and service config.
128func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
129	ccr.cc.mu.Lock()
130	ccr.mu.Lock()
131	if ccr.closed {
132		ccr.mu.Unlock()
133		ccr.cc.mu.Unlock()
134		return nil
135	}
136	if s.Endpoints == nil {
137		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
138		for _, a := range s.Addresses {
139			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
140			ep.Addresses[0].BalancerAttributes = nil
141			s.Endpoints = append(s.Endpoints, ep)
142		}
143	}
144	ccr.addChannelzTraceEvent(s)
145	ccr.curState = s
146	ccr.mu.Unlock()
147	return ccr.cc.updateResolverStateAndUnlock(s, nil)
148}
149
150// ReportError is called by resolver implementations to report errors
151// encountered during name resolution to gRPC.
152func (ccr *ccResolverWrapper) ReportError(err error) {
153	ccr.cc.mu.Lock()
154	ccr.mu.Lock()
155	if ccr.closed {
156		ccr.mu.Unlock()
157		ccr.cc.mu.Unlock()
158		return
159	}
160	ccr.mu.Unlock()
161	channelz.Warningf(logger, ccr.cc.channelz, "ccResolverWrapper: reporting error to cc: %v", err)
162	ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
163}
164
165// NewAddress is called by the resolver implementation to send addresses to
166// gRPC.
167func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
168	ccr.cc.mu.Lock()
169	ccr.mu.Lock()
170	if ccr.closed {
171		ccr.mu.Unlock()
172		ccr.cc.mu.Unlock()
173		return
174	}
175	s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
176	ccr.addChannelzTraceEvent(s)
177	ccr.curState = s
178	ccr.mu.Unlock()
179	ccr.cc.updateResolverStateAndUnlock(s, nil)
180}
181
182// ParseServiceConfig is called by resolver implementations to parse a JSON
183// representation of the service config.
184func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
185	return parseServiceConfig(scJSON, ccr.cc.dopts.maxCallAttempts)
186}
187
188// addChannelzTraceEvent adds a channelz trace event containing the new
189// state received from resolver implementations.
190func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
191	if !logger.V(0) && !channelz.IsOn() {
192		return
193	}
194	var updates []string
195	var oldSC, newSC *ServiceConfig
196	var oldOK, newOK bool
197	if ccr.curState.ServiceConfig != nil {
198		oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
199	}
200	if s.ServiceConfig != nil {
201		newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
202	}
203	if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
204		updates = append(updates, "service config updated")
205	}
206	if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
207		updates = append(updates, "resolver returned an empty address list")
208	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
209		updates = append(updates, "resolver returned new addresses")
210	}
211	channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
212}