Skip to content

Commit dd7dd30

Browse files
authored
Merge pull request #80 from blinklabs-io/feat/webhook
feat: webhook
2 parents fa163ae + b5ac9fd commit dd7dd30

File tree

4 files changed

+220
-0
lines changed

4 files changed

+220
-0
lines changed

output/output.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ package output
1818
import (
1919
_ "github.com/blinklabs-io/snek/output/log"
2020
_ "github.com/blinklabs-io/snek/output/notify"
21+
_ "github.com/blinklabs-io/snek/output/webhook"
2122
)

output/webhook/options.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package webhook
16+
17+
// import "github.com/blinklabs-io/snek/event"
18+
19+
type WebhookOptionFunc func(*WebhookOutput)
20+
21+
// WithUrl specifies the webhook URL
22+
func WithUrl(url string) WebhookOptionFunc {
23+
return func(o *WebhookOutput) {
24+
o.url = url
25+
}
26+
}

output/webhook/plugin.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package webhook
16+
17+
import (
18+
"github.com/blinklabs-io/snek/plugin"
19+
)
20+
21+
var cmdlineOptions struct {
22+
url string
23+
}
24+
25+
func init() {
26+
plugin.Register(
27+
plugin.PluginEntry{
28+
Type: plugin.PluginTypeOutput,
29+
Name: "webhook",
30+
Description: "send events via HTTP POST to a webhook server",
31+
NewFromOptionsFunc: NewFromCmdlineOptions,
32+
Options: []plugin.PluginOption{
33+
{
34+
Name: "url",
35+
Type: plugin.PluginOptionTypeString,
36+
Description: "specifies the url to use",
37+
DefaultValue: "http://localhost:3000",
38+
Dest: &(cmdlineOptions.url),
39+
},
40+
},
41+
},
42+
)
43+
}
44+
45+
func NewFromCmdlineOptions() plugin.Plugin {
46+
p := New(
47+
WithUrl(cmdlineOptions.url),
48+
)
49+
return p
50+
}

output/webhook/webhook.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package webhook
16+
17+
import (
18+
"bytes"
19+
"encoding/json"
20+
"fmt"
21+
"io"
22+
"net/http"
23+
24+
"github.com/blinklabs-io/snek/event"
25+
"github.com/blinklabs-io/snek/input/chainsync"
26+
"github.com/blinklabs-io/snek/internal/logging"
27+
"github.com/blinklabs-io/snek/internal/version"
28+
)
29+
30+
type WebhookOutput struct {
31+
errorChan chan error
32+
eventChan chan event.Event
33+
url string
34+
}
35+
36+
func New(options ...WebhookOptionFunc) *WebhookOutput {
37+
w := &WebhookOutput{
38+
errorChan: make(chan error),
39+
eventChan: make(chan event.Event, 10),
40+
url: "http://localhost:3000",
41+
}
42+
for _, option := range options {
43+
option(w)
44+
}
45+
return w
46+
}
47+
48+
// Start the webhook output
49+
func (w *WebhookOutput) Start() error {
50+
logger := logging.GetLogger()
51+
logger.Infof("starting webhook server")
52+
go func() {
53+
for {
54+
evt, ok := <-w.eventChan
55+
// Channel has been closed, which means we're shutting down
56+
if !ok {
57+
return
58+
}
59+
payload := evt.Payload
60+
if payload == nil {
61+
panic(fmt.Errorf("ERROR: %v", payload))
62+
}
63+
logger.Infof("debug: type: %s", evt.Type)
64+
switch evt.Type {
65+
case "chainsync.block":
66+
be := payload.(chainsync.BlockEvent)
67+
evt.Payload = be
68+
case "chainsync.rollback":
69+
re := payload.(chainsync.RollbackEvent)
70+
evt.Payload = re
71+
case "chainsync.transaction":
72+
te := payload.(chainsync.TransactionEvent)
73+
evt.Payload = te
74+
default:
75+
logger.Errorf("unknown event type: %s", evt.Type)
76+
return
77+
}
78+
// TODO: error handle
79+
err := SendWebhook(&evt, w.url)
80+
if err != nil {
81+
logger.Errorf("ERROR: %s", err)
82+
}
83+
}
84+
}()
85+
return nil
86+
}
87+
88+
func SendWebhook(e *event.Event, url string) error {
89+
logger := logging.GetLogger()
90+
logger.Infof("sending event %s to %s", e.Type, url)
91+
data, err := json.Marshal(e)
92+
if err != nil {
93+
return fmt.Errorf("%s", err)
94+
}
95+
// Setup request
96+
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(data))
97+
if err != nil {
98+
return fmt.Errorf("%s", err)
99+
}
100+
req.Header.Add("Content-Type", "application/json")
101+
req.Header.Add("User-Agent", fmt.Sprintf("Snek/%s", version.GetVersionString()))
102+
// Send payload
103+
resp, err := http.DefaultClient.Do(req)
104+
if err != nil {
105+
return fmt.Errorf("%s", err)
106+
}
107+
respBody, err := io.ReadAll(resp.Body)
108+
if err != nil {
109+
return fmt.Errorf("%s", err)
110+
}
111+
defer resp.Body.Close()
112+
113+
logger.Infof("sent: %s, payload: %s, body: %s, response: %s, status: %d",
114+
url,
115+
string(data),
116+
string(respBody),
117+
resp.Status,
118+
resp.StatusCode,
119+
)
120+
return nil
121+
}
122+
123+
// Stop the embedded output
124+
func (w *WebhookOutput) Stop() error {
125+
close(w.eventChan)
126+
close(w.errorChan)
127+
return nil
128+
}
129+
130+
// ErrorChan returns the input error channel
131+
func (w *WebhookOutput) ErrorChan() chan error {
132+
return w.errorChan
133+
}
134+
135+
// InputChan returns the input event channel
136+
func (w *WebhookOutput) InputChan() chan<- event.Event {
137+
return w.eventChan
138+
}
139+
140+
// OutputChan always returns nil
141+
func (w *WebhookOutput) OutputChan() <-chan event.Event {
142+
return nil
143+
}

0 commit comments

Comments
 (0)