1// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5// +build !appengine
6// +build !go1.7
7
8package internal
9
10import (
11 "bytes"
12 "errors"
13 "fmt"
14 "io/ioutil"
15 "log"
16 "net"
17 "net/http"
18 "net/url"
19 "os"
20 "runtime"
21 "strconv"
22 "strings"
23 "sync"
24 "sync/atomic"
25 "time"
26
27 "github.com/golang/protobuf/proto"
28 netcontext "golang.org/x/net/context"
29
30 basepb "google.golang.org/appengine/internal/base"
31 logpb "google.golang.org/appengine/internal/log"
32 remotepb "google.golang.org/appengine/internal/remote_api"
33)
34
35const (
36 apiPath = "/rpc_http"
37 defaultTicketSuffix = "/default.20150612t184001.0"
38)
39
40var (
41 // Incoming headers.
42 ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
43 dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
44 traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
45 curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
46 userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
47 remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
48
49 // Outgoing headers.
50 apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
51 apiEndpointHeaderValue = []string{"app-engine-apis"}
52 apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
53 apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
54 apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
55 apiContentType = http.CanonicalHeaderKey("Content-Type")
56 apiContentTypeValue = []string{"application/octet-stream"}
57 logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
58
59 apiHTTPClient = &http.Client{
60 Transport: &http.Transport{
61 Proxy: http.ProxyFromEnvironment,
62 Dial: limitDial,
63 },
64 }
65
66 defaultTicketOnce sync.Once
67 defaultTicket string
68)
69
70func apiURL() *url.URL {
71 host, port := "appengine.googleapis.internal", "10001"
72 if h := os.Getenv("API_HOST"); h != "" {
73 host = h
74 }
75 if p := os.Getenv("API_PORT"); p != "" {
76 port = p
77 }
78 return &url.URL{
79 Scheme: "http",
80 Host: host + ":" + port,
81 Path: apiPath,
82 }
83}
84
85func handleHTTP(w http.ResponseWriter, r *http.Request) {
86 c := &context{
87 req: r,
88 outHeader: w.Header(),
89 apiURL: apiURL(),
90 }
91 stopFlushing := make(chan int)
92
93 ctxs.Lock()
94 ctxs.m[r] = c
95 ctxs.Unlock()
96 defer func() {
97 ctxs.Lock()
98 delete(ctxs.m, r)
99 ctxs.Unlock()
100 }()
101
102 // Patch up RemoteAddr so it looks reasonable.
103 if addr := r.Header.Get(userIPHeader); addr != "" {
104 r.RemoteAddr = addr
105 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
106 r.RemoteAddr = addr
107 } else {
108 // Should not normally reach here, but pick a sensible default anyway.
109 r.RemoteAddr = "127.0.0.1"
110 }
111 // The address in the headers will most likely be of these forms:
112 // 123.123.123.123
113 // 2001:db8::1
114 // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
115 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
116 // Assume the remote address is only a host; add a default port.
117 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
118 }
119
120 // Start goroutine responsible for flushing app logs.
121 // This is done after adding c to ctx.m (and stopped before removing it)
122 // because flushing logs requires making an API call.
123 go c.logFlusher(stopFlushing)
124
125 executeRequestSafely(c, r)
126 c.outHeader = nil // make sure header changes aren't respected any more
127
128 stopFlushing <- 1 // any logging beyond this point will be dropped
129
130 // Flush any pending logs asynchronously.
131 c.pendingLogs.Lock()
132 flushes := c.pendingLogs.flushes
133 if len(c.pendingLogs.lines) > 0 {
134 flushes++
135 }
136 c.pendingLogs.Unlock()
137 go c.flushLog(false)
138 w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
139
140 // Avoid nil Write call if c.Write is never called.
141 if c.outCode != 0 {
142 w.WriteHeader(c.outCode)
143 }
144 if c.outBody != nil {
145 w.Write(c.outBody)
146 }
147}
148
149func executeRequestSafely(c *context, r *http.Request) {
150 defer func() {
151 if x := recover(); x != nil {
152 logf(c, 4, "%s", renderPanic(x)) // 4 == critical
153 c.outCode = 500
154 }
155 }()
156
157 http.DefaultServeMux.ServeHTTP(c, r)
158}
159
160func renderPanic(x interface{}) string {
161 buf := make([]byte, 16<<10) // 16 KB should be plenty
162 buf = buf[:runtime.Stack(buf, false)]
163
164 // Remove the first few stack frames:
165 // this func
166 // the recover closure in the caller
167 // That will root the stack trace at the site of the panic.
168 const (
169 skipStart = "internal.renderPanic"
170 skipFrames = 2
171 )
172 start := bytes.Index(buf, []byte(skipStart))
173 p := start
174 for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
175 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
176 if p < 0 {
177 break
178 }
179 }
180 if p >= 0 {
181 // buf[start:p+1] is the block to remove.
182 // Copy buf[p+1:] over buf[start:] and shrink buf.
183 copy(buf[start:], buf[p+1:])
184 buf = buf[:len(buf)-(p+1-start)]
185 }
186
187 // Add panic heading.
188 head := fmt.Sprintf("panic: %v\n\n", x)
189 if len(head) > len(buf) {
190 // Extremely unlikely to happen.
191 return head
192 }
193 copy(buf[len(head):], buf)
194 copy(buf, head)
195
196 return string(buf)
197}
198
199var ctxs = struct {
200 sync.Mutex
201 m map[*http.Request]*context
202 bg *context // background context, lazily initialized
203 // dec is used by tests to decorate the netcontext.Context returned
204 // for a given request. This allows tests to add overrides (such as
205 // WithAppIDOverride) to the context. The map is nil outside tests.
206 dec map[*http.Request]func(netcontext.Context) netcontext.Context
207}{
208 m: make(map[*http.Request]*context),
209}
210
211// context represents the context of an in-flight HTTP request.
212// It implements the appengine.Context and http.ResponseWriter interfaces.
213type context struct {
214 req *http.Request
215
216 outCode int
217 outHeader http.Header
218 outBody []byte
219
220 pendingLogs struct {
221 sync.Mutex
222 lines []*logpb.UserAppLogLine
223 flushes int
224 }
225
226 apiURL *url.URL
227}
228
229var contextKey = "holds a *context"
230
231// fromContext returns the App Engine context or nil if ctx is not
232// derived from an App Engine context.
233func fromContext(ctx netcontext.Context) *context {
234 c, _ := ctx.Value(&contextKey).(*context)
235 return c
236}
237
238func withContext(parent netcontext.Context, c *context) netcontext.Context {
239 ctx := netcontext.WithValue(parent, &contextKey, c)
240 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
241 ctx = withNamespace(ctx, ns)
242 }
243 return ctx
244}
245
246func toContext(c *context) netcontext.Context {
247 return withContext(netcontext.Background(), c)
248}
249
250func IncomingHeaders(ctx netcontext.Context) http.Header {
251 if c := fromContext(ctx); c != nil {
252 return c.req.Header
253 }
254 return nil
255}
256
257func ReqContext(req *http.Request) netcontext.Context {
258 return WithContext(netcontext.Background(), req)
259}
260
261func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
262 ctxs.Lock()
263 c := ctxs.m[req]
264 d := ctxs.dec[req]
265 ctxs.Unlock()
266
267 if d != nil {
268 parent = d(parent)
269 }
270
271 if c == nil {
272 // Someone passed in an http.Request that is not in-flight.
273 // We panic here rather than panicking at a later point
274 // so that stack traces will be more sensible.
275 log.Panic("appengine: NewContext passed an unknown http.Request")
276 }
277 return withContext(parent, c)
278}
279
280// DefaultTicket returns a ticket used for background context or dev_appserver.
281func DefaultTicket() string {
282 defaultTicketOnce.Do(func() {
283 if IsDevAppServer() {
284 defaultTicket = "testapp" + defaultTicketSuffix
285 return
286 }
287 appID := partitionlessAppID()
288 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
289 majVersion := VersionID(nil)
290 if i := strings.Index(majVersion, "."); i > 0 {
291 majVersion = majVersion[:i]
292 }
293 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
294 })
295 return defaultTicket
296}
297
298func BackgroundContext() netcontext.Context {
299 ctxs.Lock()
300 defer ctxs.Unlock()
301
302 if ctxs.bg != nil {
303 return toContext(ctxs.bg)
304 }
305
306 // Compute background security ticket.
307 ticket := DefaultTicket()
308
309 ctxs.bg = &context{
310 req: &http.Request{
311 Header: http.Header{
312 ticketHeader: []string{ticket},
313 },
314 },
315 apiURL: apiURL(),
316 }
317
318 // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
319 go ctxs.bg.logFlusher(make(chan int))
320
321 return toContext(ctxs.bg)
322}
323
324// RegisterTestRequest registers the HTTP request req for testing, such that
325// any API calls are sent to the provided URL. It returns a closure to delete
326// the registration.
327// It should only be used by aetest package.
328func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
329 c := &context{
330 req: req,
331 apiURL: apiURL,
332 }
333 ctxs.Lock()
334 defer ctxs.Unlock()
335 if _, ok := ctxs.m[req]; ok {
336 log.Panic("req already associated with context")
337 }
338 if _, ok := ctxs.dec[req]; ok {
339 log.Panic("req already associated with context")
340 }
341 if ctxs.dec == nil {
342 ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
343 }
344 ctxs.m[req] = c
345 ctxs.dec[req] = decorate
346
347 return req, func() {
348 ctxs.Lock()
349 delete(ctxs.m, req)
350 delete(ctxs.dec, req)
351 ctxs.Unlock()
352 }
353}
354
355var errTimeout = &CallError{
356 Detail: "Deadline exceeded",
357 Code: int32(remotepb.RpcError_CANCELLED),
358 Timeout: true,
359}
360
361func (c *context) Header() http.Header { return c.outHeader }
362
363// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
364// codes do not permit a response body (nor response entity headers such as
365// Content-Length, Content-Type, etc).
366func bodyAllowedForStatus(status int) bool {
367 switch {
368 case status >= 100 && status <= 199:
369 return false
370 case status == 204:
371 return false
372 case status == 304:
373 return false
374 }
375 return true
376}
377
378func (c *context) Write(b []byte) (int, error) {
379 if c.outCode == 0 {
380 c.WriteHeader(http.StatusOK)
381 }
382 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
383 return 0, http.ErrBodyNotAllowed
384 }
385 c.outBody = append(c.outBody, b...)
386 return len(b), nil
387}
388
389func (c *context) WriteHeader(code int) {
390 if c.outCode != 0 {
391 logf(c, 3, "WriteHeader called multiple times on request.") // error level
392 return
393 }
394 c.outCode = code
395}
396
397func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
398 hreq := &http.Request{
399 Method: "POST",
400 URL: c.apiURL,
401 Header: http.Header{
402 apiEndpointHeader: apiEndpointHeaderValue,
403 apiMethodHeader: apiMethodHeaderValue,
404 apiContentType: apiContentTypeValue,
405 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
406 },
407 Body: ioutil.NopCloser(bytes.NewReader(body)),
408 ContentLength: int64(len(body)),
409 Host: c.apiURL.Host,
410 }
411 if info := c.req.Header.Get(dapperHeader); info != "" {
412 hreq.Header.Set(dapperHeader, info)
413 }
414 if info := c.req.Header.Get(traceHeader); info != "" {
415 hreq.Header.Set(traceHeader, info)
416 }
417
418 tr := apiHTTPClient.Transport.(*http.Transport)
419
420 var timedOut int32 // atomic; set to 1 if timed out
421 t := time.AfterFunc(timeout, func() {
422 atomic.StoreInt32(&timedOut, 1)
423 tr.CancelRequest(hreq)
424 })
425 defer t.Stop()
426 defer func() {
427 // Check if timeout was exceeded.
428 if atomic.LoadInt32(&timedOut) != 0 {
429 err = errTimeout
430 }
431 }()
432
433 hresp, err := apiHTTPClient.Do(hreq)
434 if err != nil {
435 return nil, &CallError{
436 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
437 Code: int32(remotepb.RpcError_UNKNOWN),
438 }
439 }
440 defer hresp.Body.Close()
441 hrespBody, err := ioutil.ReadAll(hresp.Body)
442 if hresp.StatusCode != 200 {
443 return nil, &CallError{
444 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
445 Code: int32(remotepb.RpcError_UNKNOWN),
446 }
447 }
448 if err != nil {
449 return nil, &CallError{
450 Detail: fmt.Sprintf("service bridge response bad: %v", err),
451 Code: int32(remotepb.RpcError_UNKNOWN),
452 }
453 }
454 return hrespBody, nil
455}
456
457func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
458 if ns := NamespaceFromContext(ctx); ns != "" {
459 if fn, ok := NamespaceMods[service]; ok {
460 fn(in, ns)
461 }
462 }
463
464 if f, ctx, ok := callOverrideFromContext(ctx); ok {
465 return f(ctx, service, method, in, out)
466 }
467
468 // Handle already-done contexts quickly.
469 select {
470 case <-ctx.Done():
471 return ctx.Err()
472 default:
473 }
474
475 c := fromContext(ctx)
476 if c == nil {
477 // Give a good error message rather than a panic lower down.
478 return errNotAppEngineContext
479 }
480
481 // Apply transaction modifications if we're in a transaction.
482 if t := transactionFromContext(ctx); t != nil {
483 if t.finished {
484 return errors.New("transaction context has expired")
485 }
486 applyTransaction(in, &t.transaction)
487 }
488
489 // Default RPC timeout is 60s.
490 timeout := 60 * time.Second
491 if deadline, ok := ctx.Deadline(); ok {
492 timeout = deadline.Sub(time.Now())
493 }
494
495 data, err := proto.Marshal(in)
496 if err != nil {
497 return err
498 }
499
500 ticket := c.req.Header.Get(ticketHeader)
501 // Use a test ticket under test environment.
502 if ticket == "" {
503 if appid := ctx.Value(&appIDOverrideKey); appid != nil {
504 ticket = appid.(string) + defaultTicketSuffix
505 }
506 }
507 // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
508 if ticket == "" {
509 ticket = DefaultTicket()
510 }
511 req := &remotepb.Request{
512 ServiceName: &service,
513 Method: &method,
514 Request: data,
515 RequestId: &ticket,
516 }
517 hreqBody, err := proto.Marshal(req)
518 if err != nil {
519 return err
520 }
521
522 hrespBody, err := c.post(hreqBody, timeout)
523 if err != nil {
524 return err
525 }
526
527 res := &remotepb.Response{}
528 if err := proto.Unmarshal(hrespBody, res); err != nil {
529 return err
530 }
531 if res.RpcError != nil {
532 ce := &CallError{
533 Detail: res.RpcError.GetDetail(),
534 Code: *res.RpcError.Code,
535 }
536 switch remotepb.RpcError_ErrorCode(ce.Code) {
537 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
538 ce.Timeout = true
539 }
540 return ce
541 }
542 if res.ApplicationError != nil {
543 return &APIError{
544 Service: *req.ServiceName,
545 Detail: res.ApplicationError.GetDetail(),
546 Code: *res.ApplicationError.Code,
547 }
548 }
549 if res.Exception != nil || res.JavaException != nil {
550 // This shouldn't happen, but let's be defensive.
551 return &CallError{
552 Detail: "service bridge returned exception",
553 Code: int32(remotepb.RpcError_UNKNOWN),
554 }
555 }
556 return proto.Unmarshal(res.Response, out)
557}
558
559func (c *context) Request() *http.Request {
560 return c.req
561}
562
563func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
564 // Truncate long log lines.
565 // TODO(dsymonds): Check if this is still necessary.
566 const lim = 8 << 10
567 if len(*ll.Message) > lim {
568 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
569 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
570 }
571
572 c.pendingLogs.Lock()
573 c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
574 c.pendingLogs.Unlock()
575}
576
577var logLevelName = map[int64]string{
578 0: "DEBUG",
579 1: "INFO",
580 2: "WARNING",
581 3: "ERROR",
582 4: "CRITICAL",
583}
584
585func logf(c *context, level int64, format string, args ...interface{}) {
586 if c == nil {
587 panic("not an App Engine context")
588 }
589 s := fmt.Sprintf(format, args...)
590 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
591 c.addLogLine(&logpb.UserAppLogLine{
592 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
593 Level: &level,
594 Message: &s,
595 })
596 log.Print(logLevelName[level] + ": " + s)
597}
598
599// flushLog attempts to flush any pending logs to the appserver.
600// It should not be called concurrently.
601func (c *context) flushLog(force bool) (flushed bool) {
602 c.pendingLogs.Lock()
603 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
604 n, rem := 0, 30<<20
605 for ; n < len(c.pendingLogs.lines); n++ {
606 ll := c.pendingLogs.lines[n]
607 // Each log line will require about 3 bytes of overhead.
608 nb := proto.Size(ll) + 3
609 if nb > rem {
610 break
611 }
612 rem -= nb
613 }
614 lines := c.pendingLogs.lines[:n]
615 c.pendingLogs.lines = c.pendingLogs.lines[n:]
616 c.pendingLogs.Unlock()
617
618 if len(lines) == 0 && !force {
619 // Nothing to flush.
620 return false
621 }
622
623 rescueLogs := false
624 defer func() {
625 if rescueLogs {
626 c.pendingLogs.Lock()
627 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
628 c.pendingLogs.Unlock()
629 }
630 }()
631
632 buf, err := proto.Marshal(&logpb.UserAppLogGroup{
633 LogLine: lines,
634 })
635 if err != nil {
636 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
637 rescueLogs = true
638 return false
639 }
640
641 req := &logpb.FlushRequest{
642 Logs: buf,
643 }
644 res := &basepb.VoidProto{}
645 c.pendingLogs.Lock()
646 c.pendingLogs.flushes++
647 c.pendingLogs.Unlock()
648 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
649 log.Printf("internal.flushLog: Flush RPC: %v", err)
650 rescueLogs = true
651 return false
652 }
653 return true
654}
655
656const (
657 // Log flushing parameters.
658 flushInterval = 1 * time.Second
659 forceFlushInterval = 60 * time.Second
660)
661
662func (c *context) logFlusher(stop <-chan int) {
663 lastFlush := time.Now()
664 tick := time.NewTicker(flushInterval)
665 for {
666 select {
667 case <-stop:
668 // Request finished.
669 tick.Stop()
670 return
671 case <-tick.C:
672 force := time.Now().Sub(lastFlush) > forceFlushInterval
673 if c.flushLog(force) {
674 lastFlush = time.Now()
675 }
676 }
677 }
678}
679
680func ContextForTesting(req *http.Request) netcontext.Context {
681 return toContext(&context{req: req})
682}