service_config.go

  1/*
  2 *
  3 * Copyright 2017 gRPC authors.
  4 *
  5 * Licensed under the Apache License, Version 2.0 (the "License");
  6 * you may not use this file except in compliance with the License.
  7 * You may obtain a copy of the License at
  8 *
  9 *     http://www.apache.org/licenses/LICENSE-2.0
 10 *
 11 * Unless required by applicable law or agreed to in writing, software
 12 * distributed under the License is distributed on an "AS IS" BASIS,
 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14 * See the License for the specific language governing permissions and
 15 * limitations under the License.
 16 *
 17 */
 18
 19package grpc
 20
 21import (
 22	"encoding/json"
 23	"errors"
 24	"fmt"
 25	"reflect"
 26	"time"
 27
 28	"google.golang.org/grpc/balancer"
 29	"google.golang.org/grpc/balancer/pickfirst"
 30	"google.golang.org/grpc/codes"
 31	"google.golang.org/grpc/internal"
 32	"google.golang.org/grpc/internal/balancer/gracefulswitch"
 33	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
 34	"google.golang.org/grpc/serviceconfig"
 35)
 36
// maxInt is the largest value representable by the platform's int type
// (all bits of uint set, shifted right by one to clear the sign bit).
const maxInt = int(^uint(0) >> 1)
 38
// MethodConfig defines the configuration recommended by the service providers for a
// particular method. It is an alias for internalserviceconfig.MethodConfig.
//
// Deprecated: Users should not use this struct. Service config should be received
// through the name resolver, as specified here:
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type MethodConfig = internalserviceconfig.MethodConfig
 46
// ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave.
//
// Deprecated: Users should not use this struct. Service config should be received
// through the name resolver, as specified here:
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct {
	serviceconfig.Config

	// lbConfig is the service config's load balancing configuration. On a
	// successful parse, parseServiceConfig always sets it: either from the
	// JSON loadBalancingConfig, or from a single-policy config synthesized
	// from loadBalancingPolicy (falling back to pickfirst.Name).
	lbConfig serviceconfig.LoadBalancingConfig

	// Methods contains a map for the methods in this service.  If there is an
	// exact match for a method (i.e. /service/method) in the map, use the
	// corresponding MethodConfig.  If there's no exact match, look for the
	// default config for the service (/service/) and use the corresponding
	// MethodConfig if it exists.  Otherwise, the method has no MethodConfig to
	// use.
	Methods map[string]MethodConfig

	// If a retryThrottlingPolicy is provided, gRPC will automatically throttle
	// retry attempts and hedged RPCs when the client's ratio of failures to
	// successes exceeds a threshold.
	//
	// For each server name, the gRPC client will maintain a token_count which is
	// initially set to maxTokens, and can take values between 0 and maxTokens.
	//
	// Every outgoing RPC (regardless of service or method invoked) will change
	// token_count as follows:
	//
	//   - Every failed RPC will decrement the token_count by 1.
	//   - Every successful RPC will increment the token_count by tokenRatio.
	//
	// If token_count is less than or equal to maxTokens / 2, then RPCs will not
	// be retried and hedged RPCs will not be sent.
	retryThrottling *retryThrottlingPolicy
	// healthCheckConfig must be set as one of the requirements to enable LB
	// channel health checking.
	healthCheckConfig *healthCheckConfig
	// rawJSONString stores the service config JSON string that was parsed
	// into this service config struct.
	rawJSONString string
}
 91
// healthCheckConfig defines the go-native version of the LB channel health check config.
type healthCheckConfig struct {
	// ServiceName is the service name to use in the health-checking request.
	ServiceName string
}
 97
// jsonRetryPolicy is the JSON decoding target for a method config's retry
// policy. It is checked by isValidRetryPolicy and turned into an
// internalserviceconfig.RetryPolicy by convertRetryPolicy.
type jsonRetryPolicy struct {
	MaxAttempts          int
	InitialBackoff       internalserviceconfig.Duration
	MaxBackoff           internalserviceconfig.Duration
	BackoffMultiplier    float64
	RetryableStatusCodes []codes.Code
}
105
// retryThrottlingPolicy defines the go-native version of the retry throttling
// policy defined by the service config here:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#integration-with-service-config
type retryThrottlingPolicy struct {
	// The number of tokens starts at maxTokens. The token_count will always be
	// between 0 and maxTokens.
	//
	// This field is required and must be greater than zero. The retry
	// throttling validation in parseServiceConfig also requires it to be at
	// most 1000.
	MaxTokens float64
	// The amount of tokens to add on each successful RPC. Typically this will
	// be some number between 0 and 1, e.g., 0.1.
	//
	// This field is required and must be greater than zero. Up to 3 decimal
	// places are supported.
	TokenRatio float64
}
122
// jsonName is the JSON decoding target for one entry of a method config's
// name list. generatePath converts it into the key used in
// ServiceConfig.Methods.
type jsonName struct {
	Service string
	Method  string
}
127
// Errors reported while processing method config names: errDuplicatedName is
// returned by parseServiceConfig when the same path is produced more than
// once, and errEmptyServiceNonEmptyMethod is returned by
// jsonName.generatePath when a method is given without a service.
var (
	errDuplicatedName             = errors.New("duplicated name")
	errEmptyServiceNonEmptyMethod = errors.New("cannot combine empty 'service' and non-empty 'method'")
)
132
133func (j jsonName) generatePath() (string, error) {
134	if j.Service == "" {
135		if j.Method != "" {
136			return "", errEmptyServiceNonEmptyMethod
137		}
138		return "", nil
139	}
140	res := "/" + j.Service + "/"
141	if j.Method != "" {
142		res += j.Method
143	}
144	return res, nil
145}
146
// jsonMC is the JSON decoding target for one method config entry.
// parseServiceConfig treats pointer fields left nil by json.Unmarshal as
// unset; an entry whose Name is nil is skipped entirely.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonMC struct {
	Name                    *[]jsonName
	WaitForReady            *bool
	Timeout                 *internalserviceconfig.Duration
	MaxRequestMessageBytes  *int64
	MaxResponseMessageBytes *int64
	RetryPolicy             *jsonRetryPolicy
}
156
// jsonSC is the JSON decoding target for a whole service config; it is what
// parseServiceConfig unmarshals into before building a ServiceConfig.
// LoadBalancingConfig is kept as raw JSON and handed to
// gracefulswitch.ParseConfig.
//
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
	LoadBalancingPolicy *string
	LoadBalancingConfig *json.RawMessage
	MethodConfig        *[]jsonMC
	RetryThrottling     *retryThrottlingPolicy
	HealthCheckConfig   *healthCheckConfig
}
165
166func init() {
167	internal.ParseServiceConfig = func(js string) *serviceconfig.ParseResult {
168		return parseServiceConfig(js, defaultMaxCallAttempts)
169	}
170}
171
172func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
173	if len(js) == 0 {
174		return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")}
175	}
176	var rsc jsonSC
177	err := json.Unmarshal([]byte(js), &rsc)
178	if err != nil {
179		logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
180		return &serviceconfig.ParseResult{Err: err}
181	}
182	sc := ServiceConfig{
183		Methods:           make(map[string]MethodConfig),
184		retryThrottling:   rsc.RetryThrottling,
185		healthCheckConfig: rsc.HealthCheckConfig,
186		rawJSONString:     js,
187	}
188	c := rsc.LoadBalancingConfig
189	if c == nil {
190		name := pickfirst.Name
191		if rsc.LoadBalancingPolicy != nil {
192			name = *rsc.LoadBalancingPolicy
193		}
194		if balancer.Get(name) == nil {
195			name = pickfirst.Name
196		}
197		cfg := []map[string]any{{name: struct{}{}}}
198		strCfg, err := json.Marshal(cfg)
199		if err != nil {
200			return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)}
201		}
202		r := json.RawMessage(strCfg)
203		c = &r
204	}
205	cfg, err := gracefulswitch.ParseConfig(*c)
206	if err != nil {
207		return &serviceconfig.ParseResult{Err: err}
208	}
209	sc.lbConfig = cfg
210
211	if rsc.MethodConfig == nil {
212		return &serviceconfig.ParseResult{Config: &sc}
213	}
214
215	paths := map[string]struct{}{}
216	for _, m := range *rsc.MethodConfig {
217		if m.Name == nil {
218			continue
219		}
220
221		mc := MethodConfig{
222			WaitForReady: m.WaitForReady,
223			Timeout:      (*time.Duration)(m.Timeout),
224		}
225		if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy, maxAttempts); err != nil {
226			logger.Warningf("grpc: unmarshalling service config %s: %v", js, err)
227			return &serviceconfig.ParseResult{Err: err}
228		}
229		if m.MaxRequestMessageBytes != nil {
230			if *m.MaxRequestMessageBytes > int64(maxInt) {
231				mc.MaxReqSize = newInt(maxInt)
232			} else {
233				mc.MaxReqSize = newInt(int(*m.MaxRequestMessageBytes))
234			}
235		}
236		if m.MaxResponseMessageBytes != nil {
237			if *m.MaxResponseMessageBytes > int64(maxInt) {
238				mc.MaxRespSize = newInt(maxInt)
239			} else {
240				mc.MaxRespSize = newInt(int(*m.MaxResponseMessageBytes))
241			}
242		}
243		for i, n := range *m.Name {
244			path, err := n.generatePath()
245			if err != nil {
246				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
247				return &serviceconfig.ParseResult{Err: err}
248			}
249
250			if _, ok := paths[path]; ok {
251				err = errDuplicatedName
252				logger.Warningf("grpc: error unmarshalling service config %s due to methodConfig[%d]: %v", js, i, err)
253				return &serviceconfig.ParseResult{Err: err}
254			}
255			paths[path] = struct{}{}
256			sc.Methods[path] = mc
257		}
258	}
259
260	if sc.retryThrottling != nil {
261		if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
262			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)}
263		}
264		if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
265			return &serviceconfig.ParseResult{Err: fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)}
266		}
267	}
268	return &serviceconfig.ParseResult{Config: &sc}
269}
270
271func isValidRetryPolicy(jrp *jsonRetryPolicy) bool {
272	return jrp.MaxAttempts > 1 &&
273		jrp.InitialBackoff > 0 &&
274		jrp.MaxBackoff > 0 &&
275		jrp.BackoffMultiplier > 0 &&
276		len(jrp.RetryableStatusCodes) > 0
277}
278
279func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalserviceconfig.RetryPolicy, err error) {
280	if jrp == nil {
281		return nil, nil
282	}
283
284	if !isValidRetryPolicy(jrp) {
285		return nil, fmt.Errorf("invalid retry policy (%+v): ", jrp)
286	}
287
288	if jrp.MaxAttempts < maxAttempts {
289		maxAttempts = jrp.MaxAttempts
290	}
291	rp := &internalserviceconfig.RetryPolicy{
292		MaxAttempts:          maxAttempts,
293		InitialBackoff:       time.Duration(jrp.InitialBackoff),
294		MaxBackoff:           time.Duration(jrp.MaxBackoff),
295		BackoffMultiplier:    jrp.BackoffMultiplier,
296		RetryableStatusCodes: make(map[codes.Code]bool),
297	}
298	for _, code := range jrp.RetryableStatusCodes {
299		rp.RetryableStatusCodes[code] = true
300	}
301	return rp, nil
302}
303
// minPointers returns whichever of a and b points to the smaller int. When
// the two values are equal, b is returned. Both pointers must be non-nil.
func minPointers(a, b *int) *int {
	if *b <= *a {
		return b
	}
	return a
}
310
311func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
312	if mcMax == nil && doptMax == nil {
313		return &defaultVal
314	}
315	if mcMax != nil && doptMax != nil {
316		return minPointers(mcMax, doptMax)
317	}
318	if mcMax != nil {
319		return mcMax
320	}
321	return doptMax
322}
323
// newInt returns a pointer to a freshly allocated int holding b.
func newInt(b int) *int {
	p := new(int)
	*p = b
	return p
}
327
// init exposes equalServiceConfig through the internal package's
// EqualServiceConfigForTesting hook.
func init() {
	internal.EqualServiceConfigForTesting = equalServiceConfig
}
331
332// equalServiceConfig compares two configs. The rawJSONString field is ignored,
333// because they may diff in white spaces.
334//
335// If any of them is NOT *ServiceConfig, return false.
336func equalServiceConfig(a, b serviceconfig.Config) bool {
337	if a == nil && b == nil {
338		return true
339	}
340	aa, ok := a.(*ServiceConfig)
341	if !ok {
342		return false
343	}
344	bb, ok := b.(*ServiceConfig)
345	if !ok {
346		return false
347	}
348	aaRaw := aa.rawJSONString
349	aa.rawJSONString = ""
350	bbRaw := bb.rawJSONString
351	bb.rawJSONString = ""
352	defer func() {
353		aa.rawJSONString = aaRaw
354		bb.rawJSONString = bbRaw
355	}()
356	// Using reflect.DeepEqual instead of cmp.Equal because many balancer
357	// configs are unexported, and cmp.Equal cannot compare unexported fields
358	// from unexported structs.
359	return reflect.DeepEqual(aa, bb)
360}