api_pre17.go

  1// Copyright 2011 Google Inc. All rights reserved.
  2// Use of this source code is governed by the Apache 2.0
  3// license that can be found in the LICENSE file.
  4
  5// +build !appengine
  6// +build !go1.7
  7
  8package internal
  9
 10import (
 11	"bytes"
 12	"errors"
 13	"fmt"
 14	"io/ioutil"
 15	"log"
 16	"net"
 17	"net/http"
 18	"net/url"
 19	"os"
 20	"runtime"
 21	"strconv"
 22	"strings"
 23	"sync"
 24	"sync/atomic"
 25	"time"
 26
 27	"github.com/golang/protobuf/proto"
 28	netcontext "golang.org/x/net/context"
 29
 30	basepb "google.golang.org/appengine/internal/base"
 31	logpb "google.golang.org/appengine/internal/log"
 32	remotepb "google.golang.org/appengine/internal/remote_api"
 33)
 34
 35const (
 36	apiPath             = "/rpc_http"
 37	defaultTicketSuffix = "/default.20150612t184001.0"
 38)
 39
 40var (
 41	// Incoming headers.
 42	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
 43	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
 44	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
 45	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
 46	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
 47	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
 48
 49	// Outgoing headers.
 50	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
 51	apiEndpointHeaderValue = []string{"app-engine-apis"}
 52	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
 53	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
 54	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
 55	apiContentType         = http.CanonicalHeaderKey("Content-Type")
 56	apiContentTypeValue    = []string{"application/octet-stream"}
 57	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
 58
 59	apiHTTPClient = &http.Client{
 60		Transport: &http.Transport{
 61			Proxy: http.ProxyFromEnvironment,
 62			Dial:  limitDial,
 63		},
 64	}
 65
 66	defaultTicketOnce sync.Once
 67	defaultTicket     string
 68)
 69
 70func apiURL() *url.URL {
 71	host, port := "appengine.googleapis.internal", "10001"
 72	if h := os.Getenv("API_HOST"); h != "" {
 73		host = h
 74	}
 75	if p := os.Getenv("API_PORT"); p != "" {
 76		port = p
 77	}
 78	return &url.URL{
 79		Scheme: "http",
 80		Host:   host + ":" + port,
 81		Path:   apiPath,
 82	}
 83}
 84
 85func handleHTTP(w http.ResponseWriter, r *http.Request) {
 86	c := &context{
 87		req:       r,
 88		outHeader: w.Header(),
 89		apiURL:    apiURL(),
 90	}
 91	stopFlushing := make(chan int)
 92
 93	ctxs.Lock()
 94	ctxs.m[r] = c
 95	ctxs.Unlock()
 96	defer func() {
 97		ctxs.Lock()
 98		delete(ctxs.m, r)
 99		ctxs.Unlock()
100	}()
101
102	// Patch up RemoteAddr so it looks reasonable.
103	if addr := r.Header.Get(userIPHeader); addr != "" {
104		r.RemoteAddr = addr
105	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
106		r.RemoteAddr = addr
107	} else {
108		// Should not normally reach here, but pick a sensible default anyway.
109		r.RemoteAddr = "127.0.0.1"
110	}
111	// The address in the headers will most likely be of these forms:
112	//	123.123.123.123
113	//	2001:db8::1
114	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
115	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
116		// Assume the remote address is only a host; add a default port.
117		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
118	}
119
120	// Start goroutine responsible for flushing app logs.
121	// This is done after adding c to ctx.m (and stopped before removing it)
122	// because flushing logs requires making an API call.
123	go c.logFlusher(stopFlushing)
124
125	executeRequestSafely(c, r)
126	c.outHeader = nil // make sure header changes aren't respected any more
127
128	stopFlushing <- 1 // any logging beyond this point will be dropped
129
130	// Flush any pending logs asynchronously.
131	c.pendingLogs.Lock()
132	flushes := c.pendingLogs.flushes
133	if len(c.pendingLogs.lines) > 0 {
134		flushes++
135	}
136	c.pendingLogs.Unlock()
137	go c.flushLog(false)
138	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
139
140	// Avoid nil Write call if c.Write is never called.
141	if c.outCode != 0 {
142		w.WriteHeader(c.outCode)
143	}
144	if c.outBody != nil {
145		w.Write(c.outBody)
146	}
147}
148
149func executeRequestSafely(c *context, r *http.Request) {
150	defer func() {
151		if x := recover(); x != nil {
152			logf(c, 4, "%s", renderPanic(x)) // 4 == critical
153			c.outCode = 500
154		}
155	}()
156
157	http.DefaultServeMux.ServeHTTP(c, r)
158}
159
160func renderPanic(x interface{}) string {
161	buf := make([]byte, 16<<10) // 16 KB should be plenty
162	buf = buf[:runtime.Stack(buf, false)]
163
164	// Remove the first few stack frames:
165	//   this func
166	//   the recover closure in the caller
167	// That will root the stack trace at the site of the panic.
168	const (
169		skipStart  = "internal.renderPanic"
170		skipFrames = 2
171	)
172	start := bytes.Index(buf, []byte(skipStart))
173	p := start
174	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
175		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
176		if p < 0 {
177			break
178		}
179	}
180	if p >= 0 {
181		// buf[start:p+1] is the block to remove.
182		// Copy buf[p+1:] over buf[start:] and shrink buf.
183		copy(buf[start:], buf[p+1:])
184		buf = buf[:len(buf)-(p+1-start)]
185	}
186
187	// Add panic heading.
188	head := fmt.Sprintf("panic: %v\n\n", x)
189	if len(head) > len(buf) {
190		// Extremely unlikely to happen.
191		return head
192	}
193	copy(buf[len(head):], buf)
194	copy(buf, head)
195
196	return string(buf)
197}
198
199var ctxs = struct {
200	sync.Mutex
201	m  map[*http.Request]*context
202	bg *context // background context, lazily initialized
203	// dec is used by tests to decorate the netcontext.Context returned
204	// for a given request. This allows tests to add overrides (such as
205	// WithAppIDOverride) to the context. The map is nil outside tests.
206	dec map[*http.Request]func(netcontext.Context) netcontext.Context
207}{
208	m: make(map[*http.Request]*context),
209}
210
211// context represents the context of an in-flight HTTP request.
212// It implements the appengine.Context and http.ResponseWriter interfaces.
213type context struct {
214	req *http.Request
215
216	outCode   int
217	outHeader http.Header
218	outBody   []byte
219
220	pendingLogs struct {
221		sync.Mutex
222		lines   []*logpb.UserAppLogLine
223		flushes int
224	}
225
226	apiURL *url.URL
227}
228
229var contextKey = "holds a *context"
230
231// fromContext returns the App Engine context or nil if ctx is not
232// derived from an App Engine context.
233func fromContext(ctx netcontext.Context) *context {
234	c, _ := ctx.Value(&contextKey).(*context)
235	return c
236}
237
238func withContext(parent netcontext.Context, c *context) netcontext.Context {
239	ctx := netcontext.WithValue(parent, &contextKey, c)
240	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
241		ctx = withNamespace(ctx, ns)
242	}
243	return ctx
244}
245
246func toContext(c *context) netcontext.Context {
247	return withContext(netcontext.Background(), c)
248}
249
250func IncomingHeaders(ctx netcontext.Context) http.Header {
251	if c := fromContext(ctx); c != nil {
252		return c.req.Header
253	}
254	return nil
255}
256
257func ReqContext(req *http.Request) netcontext.Context {
258	return WithContext(netcontext.Background(), req)
259}
260
261func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
262	ctxs.Lock()
263	c := ctxs.m[req]
264	d := ctxs.dec[req]
265	ctxs.Unlock()
266
267	if d != nil {
268		parent = d(parent)
269	}
270
271	if c == nil {
272		// Someone passed in an http.Request that is not in-flight.
273		// We panic here rather than panicking at a later point
274		// so that stack traces will be more sensible.
275		log.Panic("appengine: NewContext passed an unknown http.Request")
276	}
277	return withContext(parent, c)
278}
279
280// DefaultTicket returns a ticket used for background context or dev_appserver.
281func DefaultTicket() string {
282	defaultTicketOnce.Do(func() {
283		if IsDevAppServer() {
284			defaultTicket = "testapp" + defaultTicketSuffix
285			return
286		}
287		appID := partitionlessAppID()
288		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
289		majVersion := VersionID(nil)
290		if i := strings.Index(majVersion, "."); i > 0 {
291			majVersion = majVersion[:i]
292		}
293		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
294	})
295	return defaultTicket
296}
297
298func BackgroundContext() netcontext.Context {
299	ctxs.Lock()
300	defer ctxs.Unlock()
301
302	if ctxs.bg != nil {
303		return toContext(ctxs.bg)
304	}
305
306	// Compute background security ticket.
307	ticket := DefaultTicket()
308
309	ctxs.bg = &context{
310		req: &http.Request{
311			Header: http.Header{
312				ticketHeader: []string{ticket},
313			},
314		},
315		apiURL: apiURL(),
316	}
317
318	// TODO(dsymonds): Wire up the shutdown handler to do a final flush.
319	go ctxs.bg.logFlusher(make(chan int))
320
321	return toContext(ctxs.bg)
322}
323
324// RegisterTestRequest registers the HTTP request req for testing, such that
325// any API calls are sent to the provided URL. It returns a closure to delete
326// the registration.
327// It should only be used by aetest package.
328func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
329	c := &context{
330		req:    req,
331		apiURL: apiURL,
332	}
333	ctxs.Lock()
334	defer ctxs.Unlock()
335	if _, ok := ctxs.m[req]; ok {
336		log.Panic("req already associated with context")
337	}
338	if _, ok := ctxs.dec[req]; ok {
339		log.Panic("req already associated with context")
340	}
341	if ctxs.dec == nil {
342		ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
343	}
344	ctxs.m[req] = c
345	ctxs.dec[req] = decorate
346
347	return req, func() {
348		ctxs.Lock()
349		delete(ctxs.m, req)
350		delete(ctxs.dec, req)
351		ctxs.Unlock()
352	}
353}
354
355var errTimeout = &CallError{
356	Detail:  "Deadline exceeded",
357	Code:    int32(remotepb.RpcError_CANCELLED),
358	Timeout: true,
359}
360
361func (c *context) Header() http.Header { return c.outHeader }
362
363// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
364// codes do not permit a response body (nor response entity headers such as
365// Content-Length, Content-Type, etc).
366func bodyAllowedForStatus(status int) bool {
367	switch {
368	case status >= 100 && status <= 199:
369		return false
370	case status == 204:
371		return false
372	case status == 304:
373		return false
374	}
375	return true
376}
377
378func (c *context) Write(b []byte) (int, error) {
379	if c.outCode == 0 {
380		c.WriteHeader(http.StatusOK)
381	}
382	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
383		return 0, http.ErrBodyNotAllowed
384	}
385	c.outBody = append(c.outBody, b...)
386	return len(b), nil
387}
388
389func (c *context) WriteHeader(code int) {
390	if c.outCode != 0 {
391		logf(c, 3, "WriteHeader called multiple times on request.") // error level
392		return
393	}
394	c.outCode = code
395}
396
397func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
398	hreq := &http.Request{
399		Method: "POST",
400		URL:    c.apiURL,
401		Header: http.Header{
402			apiEndpointHeader: apiEndpointHeaderValue,
403			apiMethodHeader:   apiMethodHeaderValue,
404			apiContentType:    apiContentTypeValue,
405			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
406		},
407		Body:          ioutil.NopCloser(bytes.NewReader(body)),
408		ContentLength: int64(len(body)),
409		Host:          c.apiURL.Host,
410	}
411	if info := c.req.Header.Get(dapperHeader); info != "" {
412		hreq.Header.Set(dapperHeader, info)
413	}
414	if info := c.req.Header.Get(traceHeader); info != "" {
415		hreq.Header.Set(traceHeader, info)
416	}
417
418	tr := apiHTTPClient.Transport.(*http.Transport)
419
420	var timedOut int32 // atomic; set to 1 if timed out
421	t := time.AfterFunc(timeout, func() {
422		atomic.StoreInt32(&timedOut, 1)
423		tr.CancelRequest(hreq)
424	})
425	defer t.Stop()
426	defer func() {
427		// Check if timeout was exceeded.
428		if atomic.LoadInt32(&timedOut) != 0 {
429			err = errTimeout
430		}
431	}()
432
433	hresp, err := apiHTTPClient.Do(hreq)
434	if err != nil {
435		return nil, &CallError{
436			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
437			Code:   int32(remotepb.RpcError_UNKNOWN),
438		}
439	}
440	defer hresp.Body.Close()
441	hrespBody, err := ioutil.ReadAll(hresp.Body)
442	if hresp.StatusCode != 200 {
443		return nil, &CallError{
444			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
445			Code:   int32(remotepb.RpcError_UNKNOWN),
446		}
447	}
448	if err != nil {
449		return nil, &CallError{
450			Detail: fmt.Sprintf("service bridge response bad: %v", err),
451			Code:   int32(remotepb.RpcError_UNKNOWN),
452		}
453	}
454	return hrespBody, nil
455}
456
457func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
458	if ns := NamespaceFromContext(ctx); ns != "" {
459		if fn, ok := NamespaceMods[service]; ok {
460			fn(in, ns)
461		}
462	}
463
464	if f, ctx, ok := callOverrideFromContext(ctx); ok {
465		return f(ctx, service, method, in, out)
466	}
467
468	// Handle already-done contexts quickly.
469	select {
470	case <-ctx.Done():
471		return ctx.Err()
472	default:
473	}
474
475	c := fromContext(ctx)
476	if c == nil {
477		// Give a good error message rather than a panic lower down.
478		return errNotAppEngineContext
479	}
480
481	// Apply transaction modifications if we're in a transaction.
482	if t := transactionFromContext(ctx); t != nil {
483		if t.finished {
484			return errors.New("transaction context has expired")
485		}
486		applyTransaction(in, &t.transaction)
487	}
488
489	// Default RPC timeout is 60s.
490	timeout := 60 * time.Second
491	if deadline, ok := ctx.Deadline(); ok {
492		timeout = deadline.Sub(time.Now())
493	}
494
495	data, err := proto.Marshal(in)
496	if err != nil {
497		return err
498	}
499
500	ticket := c.req.Header.Get(ticketHeader)
501	// Use a test ticket under test environment.
502	if ticket == "" {
503		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
504			ticket = appid.(string) + defaultTicketSuffix
505		}
506	}
507	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
508	if ticket == "" {
509		ticket = DefaultTicket()
510	}
511	req := &remotepb.Request{
512		ServiceName: &service,
513		Method:      &method,
514		Request:     data,
515		RequestId:   &ticket,
516	}
517	hreqBody, err := proto.Marshal(req)
518	if err != nil {
519		return err
520	}
521
522	hrespBody, err := c.post(hreqBody, timeout)
523	if err != nil {
524		return err
525	}
526
527	res := &remotepb.Response{}
528	if err := proto.Unmarshal(hrespBody, res); err != nil {
529		return err
530	}
531	if res.RpcError != nil {
532		ce := &CallError{
533			Detail: res.RpcError.GetDetail(),
534			Code:   *res.RpcError.Code,
535		}
536		switch remotepb.RpcError_ErrorCode(ce.Code) {
537		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
538			ce.Timeout = true
539		}
540		return ce
541	}
542	if res.ApplicationError != nil {
543		return &APIError{
544			Service: *req.ServiceName,
545			Detail:  res.ApplicationError.GetDetail(),
546			Code:    *res.ApplicationError.Code,
547		}
548	}
549	if res.Exception != nil || res.JavaException != nil {
550		// This shouldn't happen, but let's be defensive.
551		return &CallError{
552			Detail: "service bridge returned exception",
553			Code:   int32(remotepb.RpcError_UNKNOWN),
554		}
555	}
556	return proto.Unmarshal(res.Response, out)
557}
558
559func (c *context) Request() *http.Request {
560	return c.req
561}
562
563func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
564	// Truncate long log lines.
565	// TODO(dsymonds): Check if this is still necessary.
566	const lim = 8 << 10
567	if len(*ll.Message) > lim {
568		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
569		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
570	}
571
572	c.pendingLogs.Lock()
573	c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
574	c.pendingLogs.Unlock()
575}
576
577var logLevelName = map[int64]string{
578	0: "DEBUG",
579	1: "INFO",
580	2: "WARNING",
581	3: "ERROR",
582	4: "CRITICAL",
583}
584
585func logf(c *context, level int64, format string, args ...interface{}) {
586	if c == nil {
587		panic("not an App Engine context")
588	}
589	s := fmt.Sprintf(format, args...)
590	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
591	c.addLogLine(&logpb.UserAppLogLine{
592		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
593		Level:         &level,
594		Message:       &s,
595	})
596	log.Print(logLevelName[level] + ": " + s)
597}
598
599// flushLog attempts to flush any pending logs to the appserver.
600// It should not be called concurrently.
601func (c *context) flushLog(force bool) (flushed bool) {
602	c.pendingLogs.Lock()
603	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
604	n, rem := 0, 30<<20
605	for ; n < len(c.pendingLogs.lines); n++ {
606		ll := c.pendingLogs.lines[n]
607		// Each log line will require about 3 bytes of overhead.
608		nb := proto.Size(ll) + 3
609		if nb > rem {
610			break
611		}
612		rem -= nb
613	}
614	lines := c.pendingLogs.lines[:n]
615	c.pendingLogs.lines = c.pendingLogs.lines[n:]
616	c.pendingLogs.Unlock()
617
618	if len(lines) == 0 && !force {
619		// Nothing to flush.
620		return false
621	}
622
623	rescueLogs := false
624	defer func() {
625		if rescueLogs {
626			c.pendingLogs.Lock()
627			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
628			c.pendingLogs.Unlock()
629		}
630	}()
631
632	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
633		LogLine: lines,
634	})
635	if err != nil {
636		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
637		rescueLogs = true
638		return false
639	}
640
641	req := &logpb.FlushRequest{
642		Logs: buf,
643	}
644	res := &basepb.VoidProto{}
645	c.pendingLogs.Lock()
646	c.pendingLogs.flushes++
647	c.pendingLogs.Unlock()
648	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
649		log.Printf("internal.flushLog: Flush RPC: %v", err)
650		rescueLogs = true
651		return false
652	}
653	return true
654}
655
656const (
657	// Log flushing parameters.
658	flushInterval      = 1 * time.Second
659	forceFlushInterval = 60 * time.Second
660)
661
662func (c *context) logFlusher(stop <-chan int) {
663	lastFlush := time.Now()
664	tick := time.NewTicker(flushInterval)
665	for {
666		select {
667		case <-stop:
668			// Request finished.
669			tick.Stop()
670			return
671		case <-tick.C:
672			force := time.Now().Sub(lastFlush) > forceFlushInterval
673			if c.flushLog(force) {
674				lastFlush = time.Now()
675			}
676		}
677	}
678}
679
680func ContextForTesting(req *http.Request) netcontext.Context {
681	return toContext(&context{req: req})
682}