1// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5// +build !appengine
6// +build go1.7
7
8package internal
9
10import (
11 "bytes"
12 "errors"
13 "fmt"
14 "io/ioutil"
15 "log"
16 "net"
17 "net/http"
18 "net/url"
19 "os"
20 "runtime"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "github.com/golang/protobuf/proto"
28 netcontext "golang.org/x/net/context"
29
30 basepb "google.golang.org/appengine/internal/base"
31 logpb "google.golang.org/appengine/internal/log"
32 remotepb "google.golang.org/appengine/internal/remote_api"
33)
34
const (
	// apiPath is the URL path of the API server endpoint; apiURL uses it as
	// the path component of the URL it builds.
	apiPath = "/rpc_http"
	// defaultTicketSuffix is appended to an app ID to form a ticket when no
	// request ticket is available (see DefaultTicket and Call).
	defaultTicketSuffix = "/default.20150612t184001.0"
)
39
var (
	// Incoming headers.
	// These are read from the inbound request (see handleHTTP, withContext,
	// post and Call). All names are stored in canonical form so they can be
	// used directly as http.Header map keys.
	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")

	// Outgoing headers.
	// The api* names and values are attached to every request that post sends
	// to the API server; logFlushHeader is set on the response by handleHTTP.
	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
	apiEndpointHeaderValue = []string{"app-engine-apis"}
	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
	apiContentType         = http.CanonicalHeaderKey("Content-Type")
	apiContentTypeValue    = []string{"application/octet-stream"}
	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

	// apiHTTPClient is the client post uses to talk to the API server.
	// post type-asserts its Transport to *http.Transport in order to cancel
	// requests on timeout, so the Transport must remain of that type.
	apiHTTPClient = &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			Dial:  limitDial,
		},
	}

	// defaultTicket is computed at most once, guarded by defaultTicketOnce
	// (see DefaultTicket).
	defaultTicketOnce sync.Once
	defaultTicket     string
	// backgroundContext is created at most once, guarded by
	// backgroundContextOnce (see BackgroundContext).
	backgroundContextOnce sync.Once
	backgroundContext     netcontext.Context
)
71
72func apiURL() *url.URL {
73 host, port := "appengine.googleapis.internal", "10001"
74 if h := os.Getenv("API_HOST"); h != "" {
75 host = h
76 }
77 if p := os.Getenv("API_PORT"); p != "" {
78 port = p
79 }
80 return &url.URL{
81 Scheme: "http",
82 Host: host + ":" + port,
83 Path: apiPath,
84 }
85}
86
87func handleHTTP(w http.ResponseWriter, r *http.Request) {
88 c := &context{
89 req: r,
90 outHeader: w.Header(),
91 apiURL: apiURL(),
92 }
93 r = r.WithContext(withContext(r.Context(), c))
94 c.req = r
95
96 stopFlushing := make(chan int)
97
98 // Patch up RemoteAddr so it looks reasonable.
99 if addr := r.Header.Get(userIPHeader); addr != "" {
100 r.RemoteAddr = addr
101 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
102 r.RemoteAddr = addr
103 } else {
104 // Should not normally reach here, but pick a sensible default anyway.
105 r.RemoteAddr = "127.0.0.1"
106 }
107 // The address in the headers will most likely be of these forms:
108 // 123.123.123.123
109 // 2001:db8::1
110 // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
111 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
112 // Assume the remote address is only a host; add a default port.
113 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
114 }
115
116 // Start goroutine responsible for flushing app logs.
117 // This is done after adding c to ctx.m (and stopped before removing it)
118 // because flushing logs requires making an API call.
119 go c.logFlusher(stopFlushing)
120
121 executeRequestSafely(c, r)
122 c.outHeader = nil // make sure header changes aren't respected any more
123
124 stopFlushing <- 1 // any logging beyond this point will be dropped
125
126 // Flush any pending logs asynchronously.
127 c.pendingLogs.Lock()
128 flushes := c.pendingLogs.flushes
129 if len(c.pendingLogs.lines) > 0 {
130 flushes++
131 }
132 c.pendingLogs.Unlock()
133 go c.flushLog(false)
134 w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
135
136 // Avoid nil Write call if c.Write is never called.
137 if c.outCode != 0 {
138 w.WriteHeader(c.outCode)
139 }
140 if c.outBody != nil {
141 w.Write(c.outBody)
142 }
143}
144
145func executeRequestSafely(c *context, r *http.Request) {
146 defer func() {
147 if x := recover(); x != nil {
148 logf(c, 4, "%s", renderPanic(x)) // 4 == critical
149 c.outCode = 500
150 }
151 }()
152
153 http.DefaultServeMux.ServeHTTP(c, r)
154}
155
// renderPanic formats the recovered panic value x followed by the current
// goroutine's stack trace, as "panic: <x>\n\n<stack>".
//
// It is meant to be called from the deferred recover closure of the caller.
// The frames belonging to renderPanic itself and to that closure are removed
// so that the trace is rooted at the site of the panic. If the renderPanic
// frame cannot be located in the trace, the trace is returned untrimmed.
func renderPanic(x interface{}) string {
	buf := make([]byte, 16<<10) // 16 KB should be plenty
	buf = buf[:runtime.Stack(buf, false)]

	// Remove the first few stack frames:
	//   this func
	//   the recover closure in the caller
	const (
		skipStart  = "internal.renderPanic"
		skipFrames = 2
	)
	if start := bytes.Index(buf, []byte(skipStart)); start >= 0 {
		// The marker sits in the middle of a line (after the package path);
		// back up to the beginning of that line so whole lines are removed.
		start = bytes.LastIndexByte(buf[:start], '\n') + 1

		// Each frame occupies two lines: the function and its file position.
		end := start
		for i := 0; i < skipFrames*2; i++ {
			nl := bytes.IndexByte(buf[end:], '\n')
			if nl < 0 {
				// Truncated trace: drop everything that is left.
				end = len(buf)
				break
			}
			end += nl + 1
		}
		buf = append(buf[:start], buf[end:]...)
	}

	// Prepend the panic heading without sacrificing the tail of the trace.
	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
}
194
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the request being served. Its headers supply the API ticket
	// and trace information used when making API calls.
	req *http.Request

	// outCode is the response status recorded by WriteHeader; it is zero
	// until a status has been set.
	outCode int
	// outHeader is the header map returned by Header. handleHTTP sets it to
	// nil once the handler has returned.
	outHeader http.Header
	// outBody accumulates the bytes passed to Write.
	outBody []byte

	// pendingLogs holds log lines that have not been flushed yet. The
	// embedded mutex guards both fields.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int // number of Flush RPCs attempted by flushLog
	}

	// apiURL is the endpoint that post sends API requests to.
	apiURL *url.URL
}
212
// contextKey's address (not its value) is the key under which withContext
// stores the *context and fromContext looks it up; the string content is
// purely descriptive.
var contextKey = "holds a *context"
214
215// jointContext joins two contexts in a superficial way.
216// It takes values and timeouts from a base context, and only values from another context.
217type jointContext struct {
218 base netcontext.Context
219 valuesOnly netcontext.Context
220}
221
222func (c jointContext) Deadline() (time.Time, bool) {
223 return c.base.Deadline()
224}
225
226func (c jointContext) Done() <-chan struct{} {
227 return c.base.Done()
228}
229
230func (c jointContext) Err() error {
231 return c.base.Err()
232}
233
234func (c jointContext) Value(key interface{}) interface{} {
235 if val := c.base.Value(key); val != nil {
236 return val
237 }
238 return c.valuesOnly.Value(key)
239}
240
241// fromContext returns the App Engine context or nil if ctx is not
242// derived from an App Engine context.
243func fromContext(ctx netcontext.Context) *context {
244 c, _ := ctx.Value(&contextKey).(*context)
245 return c
246}
247
248func withContext(parent netcontext.Context, c *context) netcontext.Context {
249 ctx := netcontext.WithValue(parent, &contextKey, c)
250 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
251 ctx = withNamespace(ctx, ns)
252 }
253 return ctx
254}
255
256func toContext(c *context) netcontext.Context {
257 return withContext(netcontext.Background(), c)
258}
259
260func IncomingHeaders(ctx netcontext.Context) http.Header {
261 if c := fromContext(ctx); c != nil {
262 return c.req.Header
263 }
264 return nil
265}
266
267func ReqContext(req *http.Request) netcontext.Context {
268 return req.Context()
269}
270
271func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
272 return jointContext{
273 base: parent,
274 valuesOnly: req.Context(),
275 }
276}
277
278// DefaultTicket returns a ticket used for background context or dev_appserver.
279func DefaultTicket() string {
280 defaultTicketOnce.Do(func() {
281 if IsDevAppServer() {
282 defaultTicket = "testapp" + defaultTicketSuffix
283 return
284 }
285 appID := partitionlessAppID()
286 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
287 majVersion := VersionID(nil)
288 if i := strings.Index(majVersion, "."); i > 0 {
289 majVersion = majVersion[:i]
290 }
291 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
292 })
293 return defaultTicket
294}
295
296func BackgroundContext() netcontext.Context {
297 backgroundContextOnce.Do(func() {
298 // Compute background security ticket.
299 ticket := DefaultTicket()
300
301 c := &context{
302 req: &http.Request{
303 Header: http.Header{
304 ticketHeader: []string{ticket},
305 },
306 },
307 apiURL: apiURL(),
308 }
309 backgroundContext = toContext(c)
310
311 // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
312 go c.logFlusher(make(chan int))
313 })
314
315 return backgroundContext
316}
317
318// RegisterTestRequest registers the HTTP request req for testing, such that
319// any API calls are sent to the provided URL. It returns a closure to delete
320// the registration.
321// It should only be used by aetest package.
322func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
323 c := &context{
324 req: req,
325 apiURL: apiURL,
326 }
327 ctx := withContext(decorate(req.Context()), c)
328 req = req.WithContext(ctx)
329 c.req = req
330 return req, func() {}
331}
332
// errTimeout is the error post returns when its timeout timer fired before
// the API request completed.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
338
339func (c *context) Header() http.Header { return c.outHeader }
340
// bodyAllowedForStatus reports whether a response with the given status code
// may carry a body. Informational (1xx), 204 and 304 responses permit
// neither a body nor entity headers such as Content-Length or Content-Type.
// The logic mirrors $GOROOT/src/pkg/net/http/transfer.go.
func bodyAllowedForStatus(status int) bool {
	if status == 204 || status == 304 {
		return false
	}
	informational := status >= 100 && status <= 199
	return !informational
}
355
356func (c *context) Write(b []byte) (int, error) {
357 if c.outCode == 0 {
358 c.WriteHeader(http.StatusOK)
359 }
360 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
361 return 0, http.ErrBodyNotAllowed
362 }
363 c.outBody = append(c.outBody, b...)
364 return len(b), nil
365}
366
367func (c *context) WriteHeader(code int) {
368 if c.outCode != 0 {
369 logf(c, 3, "WriteHeader called multiple times on request.") // error level
370 return
371 }
372 c.outCode = code
373}
374
375func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
376 hreq := &http.Request{
377 Method: "POST",
378 URL: c.apiURL,
379 Header: http.Header{
380 apiEndpointHeader: apiEndpointHeaderValue,
381 apiMethodHeader: apiMethodHeaderValue,
382 apiContentType: apiContentTypeValue,
383 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
384 },
385 Body: ioutil.NopCloser(bytes.NewReader(body)),
386 ContentLength: int64(len(body)),
387 Host: c.apiURL.Host,
388 }
389 if info := c.req.Header.Get(dapperHeader); info != "" {
390 hreq.Header.Set(dapperHeader, info)
391 }
392 if info := c.req.Header.Get(traceHeader); info != "" {
393 hreq.Header.Set(traceHeader, info)
394 }
395
396 tr := apiHTTPClient.Transport.(*http.Transport)
397
398 var timedOut int32 // atomic; set to 1 if timed out
399 t := time.AfterFunc(timeout, func() {
400 atomic.StoreInt32(&timedOut, 1)
401 tr.CancelRequest(hreq)
402 })
403 defer t.Stop()
404 defer func() {
405 // Check if timeout was exceeded.
406 if atomic.LoadInt32(&timedOut) != 0 {
407 err = errTimeout
408 }
409 }()
410
411 hresp, err := apiHTTPClient.Do(hreq)
412 if err != nil {
413 return nil, &CallError{
414 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
415 Code: int32(remotepb.RpcError_UNKNOWN),
416 }
417 }
418 defer hresp.Body.Close()
419 hrespBody, err := ioutil.ReadAll(hresp.Body)
420 if hresp.StatusCode != 200 {
421 return nil, &CallError{
422 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
423 Code: int32(remotepb.RpcError_UNKNOWN),
424 }
425 }
426 if err != nil {
427 return nil, &CallError{
428 Detail: fmt.Sprintf("service bridge response bad: %v", err),
429 Code: int32(remotepb.RpcError_UNKNOWN),
430 }
431 }
432 return hrespBody, nil
433}
434
// Call performs the API call service.method, sending in as the request and
// decoding the reply into out.
//
// The steps, in order:
//   - if ctx carries a namespace and a modifier is registered for service in
//     NamespaceMods, the modifier is applied to in;
//   - if ctx carries a call override, the call is delegated to it entirely;
//   - an already-done ctx returns ctx.Err() immediately;
//   - a ctx not derived from an App Engine context yields
//     errNotAppEngineContext;
//   - transaction state from ctx, if any, is applied to in;
//   - the request is wrapped in a remote_api Request and posted to the API
//     server with a timeout taken from ctx's deadline (60s by default).
//
// RPC-level failures are returned as *CallError (with Timeout set for
// CANCELLED and DEADLINE_EXCEEDED), application-level failures as *APIError.
func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
	if ns := NamespaceFromContext(ctx); ns != "" {
		if fn, ok := NamespaceMods[service]; ok {
			fn(in, ns)
		}
	}

	// Note: ctx is deliberately shadowed here; the override is invoked with
	// the context returned by callOverrideFromContext.
	if f, ctx, ok := callOverrideFromContext(ctx); ok {
		return f(ctx, service, method, in, out)
	}

	// Handle already-done contexts quickly.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	c := fromContext(ctx)
	if c == nil {
		// Give a good error message rather than a panic lower down.
		return errNotAppEngineContext
	}

	// Apply transaction modifications if we're in a transaction.
	if t := transactionFromContext(ctx); t != nil {
		if t.finished {
			return errors.New("transaction context has expired")
		}
		applyTransaction(in, &t.transaction)
	}

	// Default RPC timeout is 60s.
	timeout := 60 * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}

	data, err := proto.Marshal(in)
	if err != nil {
		return err
	}

	// Ticket selection: the incoming request's ticket, then a test ticket
	// derived from an app ID override, then the default ticket.
	ticket := c.req.Header.Get(ticketHeader)
	// Use a test ticket under test environment.
	if ticket == "" {
		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
			ticket = appid.(string) + defaultTicketSuffix
		}
	}
	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
	if ticket == "" {
		ticket = DefaultTicket()
	}
	req := &remotepb.Request{
		ServiceName: &service,
		Method:      &method,
		Request:     data,
		RequestId:   &ticket,
	}
	hreqBody, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	hrespBody, err := c.post(hreqBody, timeout)
	if err != nil {
		return err
	}

	res := &remotepb.Response{}
	if err := proto.Unmarshal(hrespBody, res); err != nil {
		return err
	}
	// The response carries at most one kind of failure; check them in order
	// of severity before decoding the payload.
	if res.RpcError != nil {
		ce := &CallError{
			Detail: res.RpcError.GetDetail(),
			Code:   *res.RpcError.Code,
		}
		switch remotepb.RpcError_ErrorCode(ce.Code) {
		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
			ce.Timeout = true
		}
		return ce
	}
	if res.ApplicationError != nil {
		return &APIError{
			Service: *req.ServiceName,
			Detail:  res.ApplicationError.GetDetail(),
			Code:    *res.ApplicationError.Code,
		}
	}
	if res.Exception != nil || res.JavaException != nil {
		// This shouldn't happen, but let's be defensive.
		return &CallError{
			Detail: "service bridge returned exception",
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return proto.Unmarshal(res.Response, out)
}
536
537func (c *context) Request() *http.Request {
538 return c.req
539}
540
541func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
542 // Truncate long log lines.
543 // TODO(dsymonds): Check if this is still necessary.
544 const lim = 8 << 10
545 if len(*ll.Message) > lim {
546 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
547 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
548 }
549
550 c.pendingLogs.Lock()
551 c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
552 c.pendingLogs.Unlock()
553}
554
// logLevelName maps the numeric log levels accepted by logf to the names
// used as a prefix when logf echoes a message through the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
562
563func logf(c *context, level int64, format string, args ...interface{}) {
564 if c == nil {
565 panic("not an App Engine context")
566 }
567 s := fmt.Sprintf(format, args...)
568 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
569 c.addLogLine(&logpb.UserAppLogLine{
570 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
571 Level: &level,
572 Message: &s,
573 })
574 log.Print(logLevelName[level] + ": " + s)
575}
576
577// flushLog attempts to flush any pending logs to the appserver.
578// It should not be called concurrently.
579func (c *context) flushLog(force bool) (flushed bool) {
580 c.pendingLogs.Lock()
581 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
582 n, rem := 0, 30<<20
583 for ; n < len(c.pendingLogs.lines); n++ {
584 ll := c.pendingLogs.lines[n]
585 // Each log line will require about 3 bytes of overhead.
586 nb := proto.Size(ll) + 3
587 if nb > rem {
588 break
589 }
590 rem -= nb
591 }
592 lines := c.pendingLogs.lines[:n]
593 c.pendingLogs.lines = c.pendingLogs.lines[n:]
594 c.pendingLogs.Unlock()
595
596 if len(lines) == 0 && !force {
597 // Nothing to flush.
598 return false
599 }
600
601 rescueLogs := false
602 defer func() {
603 if rescueLogs {
604 c.pendingLogs.Lock()
605 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
606 c.pendingLogs.Unlock()
607 }
608 }()
609
610 buf, err := proto.Marshal(&logpb.UserAppLogGroup{
611 LogLine: lines,
612 })
613 if err != nil {
614 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
615 rescueLogs = true
616 return false
617 }
618
619 req := &logpb.FlushRequest{
620 Logs: buf,
621 }
622 res := &basepb.VoidProto{}
623 c.pendingLogs.Lock()
624 c.pendingLogs.flushes++
625 c.pendingLogs.Unlock()
626 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
627 log.Printf("internal.flushLog: Flush RPC: %v", err)
628 rescueLogs = true
629 return false
630 }
631 return true
632}
633
const (
	// Log flushing parameters.

	// flushInterval is how often logFlusher wakes up to flush pending logs.
	flushInterval = 1 * time.Second
	// forceFlushInterval is how long logFlusher lets pass since the last
	// successful flush before it forces one even with nothing pending.
	forceFlushInterval = 60 * time.Second
)
639
640func (c *context) logFlusher(stop <-chan int) {
641 lastFlush := time.Now()
642 tick := time.NewTicker(flushInterval)
643 for {
644 select {
645 case <-stop:
646 // Request finished.
647 tick.Stop()
648 return
649 case <-tick.C:
650 force := time.Now().Sub(lastFlush) > forceFlushInterval
651 if c.flushLog(force) {
652 lastFlush = time.Now()
653 }
654 }
655 }
656}
657
658func ContextForTesting(req *http.Request) netcontext.Context {
659 return toContext(&context{req: req})
660}