Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 466203d

Browse files
committed
plugins hooked into cmd/ocagent
* Also added a canonical example as well as regression test to ensure that plugins work as expected. This change finishes up the work started in #46. * plugins_test: skip TestCreatePlugin On Travis CI, trying to retrieve the same version of Go as is being used at runtime is quite the hassle. Since this test is a nice-to-have and more of an integration test that runs properly locally, skip it for now until we have enough time to expend investigating Travis CI e.g. https://travis-ci.org/census-instrumentation/opencensus-service/builds/438157975 Fixes #47
1 parent fd2e429 commit 466203d

File tree

7 files changed

+383
-14
lines changed

7 files changed

+383
-14
lines changed

cmd/ocagent/main.go

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3030
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/exporterparser"
31+
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/plugins"
3132
"github.com/census-instrumentation/opencensus-service/exporter"
3233
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
3334
"github.com/census-instrumentation/opencensus-service/internal"
@@ -38,26 +39,16 @@ import (
3839

3940
func main() {
4041
ocInterceptorPort := flag.Int("oci-port", 55678, "The port on which the OpenCensus interceptor is run")
41-
flag.Parse()
42-
4342
exportersYAMLConfigFile := flag.String("exporters-yaml", "config.yaml", "The YAML file with the configurations for the various exporters")
43+
pluginPaths := flag.String("plugins", "", "A comma separated string specifying the paths of the various exporter plugins")
44+
45+
flag.Parse()
4446

4547
yamlBlob, err := ioutil.ReadFile(*exportersYAMLConfigFile)
4648
if err != nil {
4749
log.Fatalf("Cannot read the YAML file %v error: %v", exportersYAMLConfigFile, err)
4850
}
4951
traceExporters, _, closeFns := exporterparser.ExportersFromYAMLConfig(yamlBlob)
50-
51-
commonSpanReceiver := exporter.OCExportersToTraceExporter(traceExporters...)
52-
53-
// Add other interceptors here as they are implemented
54-
ocInterceptorDoneFn, err := runOCInterceptor(*ocInterceptorPort, commonSpanReceiver)
55-
if err != nil {
56-
log.Fatal(err)
57-
}
58-
59-
closeFns = append(closeFns, ocInterceptorDoneFn)
60-
6152
// Always cleanup finally
6253
defer func() {
6354
for _, closeFn := range closeFns {
@@ -66,6 +57,21 @@ func main() {
6657
}
6758
}
6859
}()
60+
inCodeExportersSpanReceiver := exporter.OCExportersToTraceExporter(traceExporters...)
61+
62+
// Load the various traceExporter plugins if specified
63+
pluginsSpanReceiver, pluginsDoneFn := plugins.LoadTraceExporterPlugins(yamlBlob, *pluginPaths)
64+
defer pluginsDoneFn()
65+
66+
// Combine the various spanreceiver.SpanReceiver instances into one
67+
commonSpanReceiver := spanreceiver.Multi(inCodeExportersSpanReceiver, pluginsSpanReceiver)
68+
69+
// Add other interceptors here as they are implemented
70+
ocInterceptorDoneFn, err := runOCInterceptor(*ocInterceptorPort, commonSpanReceiver)
71+
if err != nil {
72+
log.Fatal(err)
73+
}
74+
closeFns = append(closeFns, ocInterceptorDoneFn)
6975

7076
signalsChan := make(chan os.Signal)
7177
signal.Notify(signalsChan, os.Interrupt)

cmd/ocagent/plugins/plugins.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package plugins
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"log"
21+
"path/filepath"
22+
"plugin"
23+
"strings"
24+
25+
"github.com/census-instrumentation/opencensus-service/exporter"
26+
"github.com/census-instrumentation/opencensus-service/spanreceiver"
27+
)
28+
29+
type pluginLoadStatus struct {
30+
absPluginPath string
31+
pluginPath string
32+
err error
33+
traceExporters []exporter.TraceExporter
34+
stopFns []func(context.Context) error
35+
}
36+
37+
// loadPluginsFromYAML is a helper that takes in the YAML configuration file as well as the paths
38+
// to the various plugins. It expects each path to be resolvable locally but that the loaded plugin
39+
// contain the "constructor" symbol "NewTraceExporterPlugin" of type `func() exporter.TraceExporterPlugin`
40+
func loadPluginsFromYAML(configYAML []byte, pluginPaths []string) (results []*pluginLoadStatus) {
41+
for _, pluginPath := range pluginPaths {
42+
absPluginPath, err := filepath.Abs(pluginPath)
43+
status := &pluginLoadStatus{
44+
pluginPath: pluginPath,
45+
absPluginPath: absPluginPath,
46+
}
47+
if err != nil {
48+
status.err = err
49+
continue
50+
}
51+
// Firstly ensure we add the result regardless of parse errors.
52+
results = append(results, status)
53+
p, err := plugin.Open(absPluginPath)
54+
if err != nil {
55+
status.err = err
56+
continue
57+
}
58+
59+
iface, err := p.Lookup("NewTraceExporterPlugin")
60+
if err != nil {
61+
status.err = err
62+
continue
63+
}
64+
constructor, ok := iface.(func() exporter.TraceExporterPlugin)
65+
if !ok {
66+
status.err = fmt.Errorf("Invalid Type for NewTraceExporterPlugin: %T", iface)
67+
continue
68+
}
69+
tplugin := constructor()
70+
status.stopFns = append(status.stopFns, tplugin.Stop)
71+
status.traceExporters, err = tplugin.TraceExportersFromConfig(configYAML, exporter.ConfigTypeYAML)
72+
if err != nil {
73+
status.err = fmt.Errorf("Failed to parse YAML: %v", err)
74+
}
75+
}
76+
return
77+
}
78+
79+
// LoadTraceExporterPlugins handles loading TraceExporter plugins from the various
80+
// paths. On failed loads, it logs the error and the failed plugin path.
81+
func LoadTraceExporterPlugins(configYAML []byte, pluginPathsCSV string) (sr spanreceiver.SpanReceiver, stopFn func()) {
82+
var exporters []exporter.TraceExporter
83+
var stopFns []func(context.Context) error
84+
defer func() {
85+
stopFn = func() {
86+
ctx := context.Background()
87+
for _, stop := range stopFns {
88+
_ = stop(ctx)
89+
}
90+
}
91+
}()
92+
93+
if pluginPathsCSV == "" {
94+
return
95+
}
96+
pluginPaths := strings.Split(pluginPathsCSV, ",")
97+
results := loadPluginsFromYAML(configYAML, pluginPaths)
98+
for _, result := range results {
99+
if result.err != nil {
100+
msg := "Failed to load the plugin from " + result.pluginPath
101+
if result.pluginPath != result.absPluginPath {
102+
msg += " (" + result.absPluginPath + ")"
103+
}
104+
log.Printf("%s Error: %v", msg, result.err)
105+
continue
106+
}
107+
exporters = append(exporters, result.traceExporters...)
108+
stopFns = append(stopFns, result.stopFns...)
109+
}
110+
111+
sr = exporter.TraceExportersToSpanReceiver(exporters...)
112+
return
113+
}

cmd/ocagent/plugins/plugins_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/*
16+
This test serves as an end-to-end canonical example of consuming a TraceExporter
17+
plugin to ensure that the code for parsing out plugins never regresses.
18+
*/
19+
package plugins_test
20+
21+
import (
22+
"bytes"
23+
"context"
24+
"io/ioutil"
25+
"log"
26+
"os"
27+
"os/exec"
28+
"path/filepath"
29+
"runtime"
30+
"testing"
31+
"time"
32+
33+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
34+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
35+
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/plugins"
36+
)
37+
38+
func TestCreatePlugin(t *testing.T) {
39+
t.Skip("Skipping for now as making Go versions match on Travis CI is a hassle")
40+
41+
// Compile the plugin and make a .so
42+
tmpDir, err := ioutil.TempDir("", "plugin_end_to_end")
43+
if err != nil {
44+
t.Fatalf("Failed to create temporary directory: %v", err)
45+
}
46+
defer os.RemoveAll(tmpDir)
47+
48+
goBinaryPath := properGoBinaryPath()
49+
pluginPath := filepath.Join(tmpDir, "sample.so")
50+
cmd := exec.Command(goBinaryPath, "build", "-buildmode=plugin", "-o", pluginPath, "./testdata/sample_plugin.go")
51+
output, err := cmd.CombinedOutput()
52+
if err != nil {
53+
t.Fatalf("Failed to compile and create shared object file %q: %v\n%s\nGoBinPath: %q", pluginPath, err, output, goBinaryPath)
54+
}
55+
56+
configYAML := `
57+
counter:
58+
count: 1
59+
`
60+
61+
// Given that plugins.LoadTraceExporterPlugins writes errors to log.Output
62+
// we'll hijack that writers and ensure that we get NO output
63+
checkWriter := new(bytes.Buffer)
64+
log.SetOutput(checkWriter)
65+
// Before we exit this test, revert the log output writer to stderr.
66+
defer log.SetOutput(os.Stderr)
67+
68+
// Now that we have the plugin written to disk, let's now load it.
69+
sr, stopFn := plugins.LoadTraceExporterPlugins([]byte(configYAML), pluginPath)
70+
if sr == nil {
71+
t.Fatal("Failed to create a spanreceiver")
72+
}
73+
defer stopFn()
74+
75+
node := &commonpb.Node{}
76+
span := &tracepb.Span{
77+
TraceId: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10},
78+
}
79+
sr.ReceiveSpans(context.Background(), node, span)
80+
<-time.After(5 * time.Millisecond)
81+
82+
// Now check if we got any output
83+
if g := checkWriter.String(); g != "" {
84+
t.Errorf("Unexpected output: %s", g)
85+
}
86+
}
87+
88+
// This helper function is necessary to ensure that we use
89+
// the same Go binary to compile plugins as well as to run
90+
// this test.
91+
// Otherwise we'll run into such failed tests:
92+
// https://travis-ci.org/census-instrumentation/opencensus-service/builds/438157975
93+
func properGoBinaryPath() string {
94+
goSuffix := "go"
95+
if runtime.GOOS == "windows" {
96+
goSuffix += ".exe"
97+
}
98+
// Firstly check if we are using $GOROOT/bin/go*
99+
goBinPath := filepath.Join(runtime.GOROOT(), "bin", goSuffix)
100+
if _, err := os.Stat(goBinPath); err == nil {
101+
return goBinPath
102+
}
103+
// If that has failed, now trying looking it from the environment
104+
binPath, _ := exec.LookPath(goSuffix)
105+
return binPath
106+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/*
16+
This file serves as a canonical example of writing a plugin
17+
for TraceExporter. It is consumed in plugins_test.go
18+
*/
19+
package main
20+
21+
import (
22+
"context"
23+
"errors"
24+
"fmt"
25+
"sync"
26+
27+
"go.opencensus.io/trace"
28+
yaml "gopkg.in/yaml.v2"
29+
30+
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
31+
"github.com/census-instrumentation/opencensus-service/exporter"
32+
)
33+
34+
type eachExporter struct {
35+
mu sync.Mutex
36+
stopped bool
37+
spandata []*trace.SpanData
38+
}
39+
40+
func NewTraceExporterPlugin() exporter.TraceExporterPlugin {
41+
return new(parser)
42+
}
43+
44+
type parser struct {
45+
es []*eachExporter
46+
}
47+
48+
type config struct {
49+
Counter *struct {
50+
Count int `yaml:"count"`
51+
} `yaml:"counter"`
52+
}
53+
54+
func (p *parser) TraceExportersFromConfig(blob []byte, configType exporter.ConfigType) (eachExporters []exporter.TraceExporter, err error) {
55+
if configType != exporter.ConfigTypeYAML {
56+
return nil, fmt.Errorf("Expected ConfigTypeYAML instead of %v", configType)
57+
}
58+
cfg := new(config)
59+
if err := yaml.Unmarshal(blob, cfg); err != nil {
60+
return nil, err
61+
}
62+
cnt := cfg.Counter
63+
if cnt == nil {
64+
return nil, errors.New("Failed to parse a counter")
65+
}
66+
67+
for i := 0; i < cnt.Count; i++ {
68+
ex := new(eachExporter)
69+
eachExporters = append(eachExporters, ex)
70+
p.es = append(p.es, ex)
71+
}
72+
return
73+
}
74+
75+
func (p *parser) Stop(ctx context.Context) error {
76+
for _, e := range p.es {
77+
e.Stop(ctx)
78+
}
79+
return nil
80+
}
81+
82+
func (e *eachExporter) ExportSpanData(node *commonpb.Node, spandata ...*trace.SpanData) error {
83+
e.mu.Lock()
84+
defer e.mu.Unlock()
85+
86+
if e.stopped {
87+
return fmt.Errorf("Already stopped")
88+
}
89+
e.spandata = append(e.spandata, spandata...)
90+
return nil
91+
}
92+
93+
func (e *eachExporter) Stop(ctx context.Context) {
94+
e.mu.Lock()
95+
if !e.stopped {
96+
e.stopped = true
97+
}
98+
e.mu.Unlock()
99+
}

0 commit comments

Comments
 (0)