@@ -29,7 +29,6 @@ import (
29
29
"strings"
30
30
31
31
"cloud.google.com/go/functions/metadata"
32
- cloudevents "github.com/cloudevents/sdk-go/v2"
33
32
)
34
33
35
34
const (
@@ -38,25 +37,20 @@ const (
38
37
errorStatus = "error"
39
38
)
40
39
41
- var (
42
- handler = http .DefaultServeMux
43
- )
44
-
45
40
// RegisterHTTPFunction registers fn as an HTTP function.
46
- func RegisterHTTPFunction (path string , fn func (http.ResponseWriter , * http.Request )) error {
47
- return registerHTTPFunction (path , fn , handler )
41
+ func RegisterHTTPFunction (path string , fn interface {}) {
42
+ fnHTTP , ok := fn .(func (http.ResponseWriter , * http.Request ))
43
+ if ! ok {
44
+ panic ("expected function to have signature func(http.ResponseWriter, *http.Request)" )
45
+ }
46
+ registerHTTPFunction (path , fnHTTP , http .DefaultServeMux )
48
47
}
49
48
50
49
// RegisterEventFunction registers fn as an event function. The function must have two arguments, a
51
50
// context.Context and a struct type depending on the event, and return an error. If fn has the
52
- // wrong signature, RegisterEventFunction returns an error.
53
- func RegisterEventFunction (path string , fn interface {}) error {
54
- return registerEventFunction (path , fn , handler )
55
- }
56
-
57
- // RegisterCloudEventFunction registers fn as an cloudevent function.
58
- func RegisterCloudEventFunction (ctx context.Context , path string , fn func (context.Context , cloudevents.Event )) error {
59
- return registerCloudEventFunction (ctx , path , fn , handler )
51
+ // wrong signature, RegisterEventFunction panics.
52
+ func RegisterEventFunction (path string , fn interface {}) {
53
+ registerEventFunction (path , fn , http .DefaultServeMux )
60
54
}
61
55
62
56
// Start serves an HTTP server with registered function(s).
@@ -65,11 +59,10 @@ func Start(port string) error {
65
59
if os .Getenv ("K_SERVICE" ) == "" {
66
60
fmt .Println ("Serving function..." )
67
61
}
68
-
69
- return http .ListenAndServe (":" + port , handler )
62
+ return http .ListenAndServe (":" + port , nil )
70
63
}
71
64
72
- func registerHTTPFunction (path string , fn func (http.ResponseWriter , * http.Request ), h * http.ServeMux ) error {
65
+ func registerHTTPFunction (path string , fn func (http.ResponseWriter , * http.Request ), h * http.ServeMux ) {
73
66
h .HandleFunc (path , func (w http.ResponseWriter , r * http.Request ) {
74
67
// TODO(b/111823046): Remove following once Cloud Functions does not need flushing the logs anymore.
75
68
// Force flush of logs after every function trigger.
@@ -82,14 +75,15 @@ func registerHTTPFunction(path string, fn func(http.ResponseWriter, *http.Reques
82
75
}()
83
76
fn (w , r )
84
77
})
85
- return nil
86
78
}
87
79
88
- func registerEventFunction (path string , fn interface {}, h * http.ServeMux ) error {
89
- err := validateEventFunction (fn )
90
- if err != nil {
91
- return err
92
- }
80
+ func registerEventFunction (path string , fn interface {}, h * http.ServeMux ) {
81
+ defer func () {
82
+ if r := recover (); r != nil {
83
+ fmt .Fprintf (os .Stderr , "Validation panic: %v\n \n %s" , r , debug .Stack ())
84
+ }
85
+ }()
86
+ validateEventFunction (fn )
93
87
h .HandleFunc (path , func (w http.ResponseWriter , r * http.Request ) {
94
88
if os .Getenv ("K_SERVICE" ) != "" {
95
89
// Force flush of logs after every function trigger when running on GCF.
@@ -103,44 +97,36 @@ func registerEventFunction(path string, fn interface{}, h *http.ServeMux) error
103
97
}()
104
98
handleEventFunction (w , r , fn )
105
99
})
106
- return nil
107
- }
108
-
109
- func registerCloudEventFunction (ctx context.Context , path string , fn func (context.Context , cloudevents.Event ), h * http.ServeMux ) error {
110
- p , err := cloudevents .NewHTTP ()
111
- if err != nil {
112
- return fmt .Errorf ("failed to create protocol: %v" , err )
113
- }
114
-
115
- handleFn , err := cloudevents .NewHTTPReceiveHandler (ctx , p , fn )
116
-
117
- if err != nil {
118
- return fmt .Errorf ("failed to create handler: %v" , err )
119
- }
120
-
121
- h .Handle (path , handleFn )
122
- return nil
123
100
}
124
101
125
- func validateEventFunction (fn interface {}) error {
102
+ func validateEventFunction (fn interface {}) {
126
103
ft := reflect .TypeOf (fn )
127
104
if ft .NumIn () != 2 {
128
- return fmt .Errorf ("expected function to have two parameters, found %d" , ft .NumIn ())
105
+ panic ( fmt .Sprintf ("expected function to have two parameters, found %d" , ft .NumIn () ))
129
106
}
130
107
var err error
131
108
errorType := reflect .TypeOf (& err ).Elem ()
132
109
if ft .NumOut () != 1 || ! ft .Out (0 ).AssignableTo (errorType ) {
133
- return fmt . Errorf ("expected function to return only an error" )
110
+ panic ("expected function to return only an error" )
134
111
}
135
112
var ctx context.Context
136
113
ctxType := reflect .TypeOf (& ctx ).Elem ()
137
114
if ! ctxType .AssignableTo (ft .In (0 )) {
138
- return fmt . Errorf ("expected first parameter to be context.Context" )
115
+ panic ("expected first parameter to be context.Context" )
139
116
}
140
- return nil
141
117
}
142
118
143
- func getLegacyEvent (r * http.Request , body []byte ) (* metadata.Metadata , interface {}, error ) {
119
+ func isStructuredCloudEvent (r * http.Request ) bool {
120
+ ceReqHeaders := []string {"Ce-Type" , "Ce-Specversion" , "Ce-Source" , "Ce-Id" }
121
+ for _ , h := range ceReqHeaders {
122
+ if _ , ok := r .Header [http .CanonicalHeaderKey (h )]; ok {
123
+ return true
124
+ }
125
+ }
126
+ return false
127
+ }
128
+
129
+ func getLegacyCloudEvent (r * http.Request , body []byte ) (* metadata.Metadata , interface {}, error ) {
144
130
// Handle legacy events' "data" and "context" fields.
145
131
event := struct {
146
132
Data interface {} `json:"data"`
@@ -150,7 +136,7 @@ func getLegacyEvent(r *http.Request, body []byte) (*metadata.Metadata, interface
150
136
return nil , nil , err
151
137
}
152
138
153
- // If there is no "data" payload, this isn't a legacy event, but that's okay.
139
+ // If there is no "data" payload, this isn't a legacy cloud event, but that's okay.
154
140
if event .Data == nil {
155
141
return nil , nil , nil
156
142
}
@@ -181,12 +167,18 @@ func handleEventFunction(w http.ResponseWriter, r *http.Request, fn interface{})
181
167
return
182
168
}
183
169
184
- // Legacy events have data and an associated metadata, so parse those and run if present.
185
- if metadata , data , err := getLegacyEvent (r , body ); err != nil {
186
- writeHTTPErrorResponse (w , http .StatusBadRequest , crashStatus , fmt .Sprintf ("Error: %s, parsing legacy event: %s" , err .Error (), string (body )))
170
+ // Structured cloud events contain the context in the header, so we need to parse that out.
171
+ if isStructuredCloudEvent (r ) {
172
+ runStructuredCloudEvent (w , r , body , fn )
173
+ return
174
+ }
175
+
176
+ // Legacy cloud events (e.g. pubsub) have data and an associated metadata, so parse those and run if present.
177
+ if metadata , data , err := getLegacyCloudEvent (r , body ); err != nil {
178
+ writeHTTPErrorResponse (w , http .StatusBadRequest , crashStatus , fmt .Sprintf ("Error: %s, parsing legacy cloud event: %s" , err .Error (), string (body )))
187
179
return
188
180
} else if data != nil && metadata != nil {
189
- runLegacyEvent (w , r , metadata , data , fn )
181
+ runLegacyCloudEvent (w , r , metadata , data , fn )
190
182
return
191
183
}
192
184
@@ -195,7 +187,44 @@ func handleEventFunction(w http.ResponseWriter, r *http.Request, fn interface{})
195
187
return
196
188
}
197
189
198
- func runLegacyEvent (w http.ResponseWriter , r * http.Request , m * metadata.Metadata , data , fn interface {}) {
190
+ func runStructuredCloudEvent (w http.ResponseWriter , r * http.Request , body []byte , fn interface {}) {
191
+ // Parse the request to extract the context and the body for the data.
192
+ event := make (map [string ]interface {})
193
+ event ["data" ] = string (body )
194
+ for k , v := range r .Header {
195
+ k = strings .ToLower (k )
196
+ if ! strings .HasPrefix (k , "ce-" ) {
197
+ continue
198
+ }
199
+ k = strings .TrimPrefix (k , "ce-" )
200
+ if len (v ) != 1 {
201
+ writeHTTPErrorResponse (w , http .StatusBadRequest , crashStatus , fmt .Sprintf ("Too many header values: %s" , k ))
202
+ return
203
+ }
204
+ var mapVal map [string ]interface {}
205
+ if err := json .Unmarshal ([]byte (v [0 ]), & mapVal ); err != nil {
206
+ // If there's an error, represent the field as the string from the header. Errors will be caught by the event constructor if present.
207
+ event [k ] = v [0 ]
208
+ } else {
209
+ // Otherwise, represent the unmarshalled map value.
210
+ event [k ] = mapVal
211
+ }
212
+ }
213
+
214
+ // We don't want any escaping to happen here.
215
+ var buf bytes.Buffer
216
+ enc := json .NewEncoder (& buf )
217
+ enc .SetEscapeHTML (false )
218
+ err := enc .Encode (event )
219
+ if err != nil {
220
+ writeHTTPErrorResponse (w , http .StatusBadRequest , crashStatus , fmt .Sprintf ("Unable to construct event %v: %s" , event , err .Error ()))
221
+ return
222
+ }
223
+
224
+ runUserFunction (w , r , buf .Bytes (), fn )
225
+ }
226
+
227
+ func runLegacyCloudEvent (w http.ResponseWriter , r * http.Request , m * metadata.Metadata , data , fn interface {}) {
199
228
var buf bytes.Buffer
200
229
enc := json .NewEncoder (& buf )
201
230
enc .SetEscapeHTML (false )
0 commit comments