Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

plugins hooked into cmd/ocagent #70

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions cmd/ocagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/exporterparser"
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/plugins"
"github.com/census-instrumentation/opencensus-service/exporter"
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
"github.com/census-instrumentation/opencensus-service/internal"
Expand All @@ -38,26 +39,16 @@ import (

func main() {
ocInterceptorPort := flag.Int("oci-port", 55678, "The port on which the OpenCensus interceptor is run")
flag.Parse()

exportersYAMLConfigFile := flag.String("exporters-yaml", "config.yaml", "The YAML file with the configurations for the various exporters")
pluginPaths := flag.String("plugins", "", "A comma separated string specifying the paths of the various exporter plugins")

flag.Parse()

yamlBlob, err := ioutil.ReadFile(*exportersYAMLConfigFile)
if err != nil {
log.Fatalf("Cannot read the YAML file %v error: %v", exportersYAMLConfigFile, err)
}
traceExporters, _, closeFns := exporterparser.ExportersFromYAMLConfig(yamlBlob)

commonSpanReceiver := exporter.OCExportersToTraceExporter(traceExporters...)

// Add other interceptors here as they are implemented
ocInterceptorDoneFn, err := runOCInterceptor(*ocInterceptorPort, commonSpanReceiver)
if err != nil {
log.Fatal(err)
}

closeFns = append(closeFns, ocInterceptorDoneFn)

// Always cleanup finally
defer func() {
for _, closeFn := range closeFns {
Expand All @@ -66,6 +57,21 @@ func main() {
}
}
}()
inCodeExportersSpanReceiver := exporter.OCExportersToTraceExporter(traceExporters...)

// Load the various traceExporter plugins if specified
pluginsSpanReceiver, pluginsDoneFn := plugins.LoadTraceExporterPlugins(yamlBlob, *pluginPaths)
defer pluginsDoneFn()

// Combine the various spanreceiver.SpanReceiver instances into one
commonSpanReceiver := spanreceiver.Multi(inCodeExportersSpanReceiver, pluginsSpanReceiver)

// Add other interceptors here as they are implemented
ocInterceptorDoneFn, err := runOCInterceptor(*ocInterceptorPort, commonSpanReceiver)
if err != nil {
log.Fatal(err)
}
closeFns = append(closeFns, ocInterceptorDoneFn)

signalsChan := make(chan os.Signal)
signal.Notify(signalsChan, os.Interrupt)
Expand Down
113 changes: 113 additions & 0 deletions cmd/ocagent/plugins/plugins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plugins

import (
"context"
"fmt"
"log"
"path/filepath"
"plugin"
"strings"

"github.com/census-instrumentation/opencensus-service/exporter"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
)

type pluginLoadStatus struct {
absPluginPath string
pluginPath string
err error
traceExporters []exporter.TraceExporter
stopFns []func(context.Context) error
}

// loadPluginsFromYAML is a helper that takes in the YAML configuration file as well as the paths
// to the various plugins. It expects each path to be resolvable locally but that the loaded plugin
// contain the "constructor" symbol "NewTraceExporterPlugin" of type `func() exporter.TraceExporterPlugin`
func loadPluginsFromYAML(configYAML []byte, pluginPaths []string) (results []*pluginLoadStatus) {
for _, pluginPath := range pluginPaths {
absPluginPath, err := filepath.Abs(pluginPath)
status := &pluginLoadStatus{
pluginPath: pluginPath,
absPluginPath: absPluginPath,
}
if err != nil {
status.err = err
continue
}
// Firstly ensure we add the result regardless of parse errors.
results = append(results, status)
p, err := plugin.Open(absPluginPath)
if err != nil {
status.err = err
continue
}

iface, err := p.Lookup("NewTraceExporterPlugin")
if err != nil {
status.err = err
continue
}
constructor, ok := iface.(func() exporter.TraceExporterPlugin)
if !ok {
status.err = fmt.Errorf("Invalid Type for NewTraceExporterPlugin: %T", iface)
continue
}
tplugin := constructor()
status.stopFns = append(status.stopFns, tplugin.Stop)
status.traceExporters, err = tplugin.TraceExportersFromConfig(configYAML, exporter.ConfigTypeYAML)
if err != nil {
status.err = fmt.Errorf("Failed to parse YAML: %v", err)
}
}
return
}

// LoadTraceExporterPlugins handles loading TraceExporter plugins from the various
// paths. On failed loads, it logs the error and the failed plugin path.
func LoadTraceExporterPlugins(configYAML []byte, pluginPathsCSV string) (sr spanreceiver.SpanReceiver, stopFn func()) {
var exporters []exporter.TraceExporter
var stopFns []func(context.Context) error
defer func() {
stopFn = func() {
ctx := context.Background()
for _, stop := range stopFns {
_ = stop(ctx)
}
}
}()

if pluginPathsCSV == "" {
return
}
pluginPaths := strings.Split(pluginPathsCSV, ",")
results := loadPluginsFromYAML(configYAML, pluginPaths)
for _, result := range results {
if result.err != nil {
msg := "Failed to load the plugin from " + result.pluginPath
if result.pluginPath != result.absPluginPath {
msg += " (" + result.absPluginPath + ")"
}
log.Printf("%s Error: %v", msg, result.err)
continue
}
exporters = append(exporters, result.traceExporters...)
stopFns = append(stopFns, result.stopFns...)
}

sr = exporter.TraceExportersToSpanReceiver(exporters...)
return
}
106 changes: 106 additions & 0 deletions cmd/ocagent/plugins/plugins_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
This test serves as an end-to-end canonical example of consuming a TraceExporter
plugin to ensure that the code for parsing out plugins never regresses.
*/
package plugins_test

import (
"bytes"
"context"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"testing"
"time"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/cmd/ocagent/plugins"
)

func TestCreatePlugin(t *testing.T) {
t.Skip("Skipping for now as making Go versions match on Travis CI is a hassle")

// Compile the plugin and make a .so
tmpDir, err := ioutil.TempDir("", "plugin_end_to_end")
if err != nil {
t.Fatalf("Failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tmpDir)

goBinaryPath := properGoBinaryPath()
pluginPath := filepath.Join(tmpDir, "sample.so")
cmd := exec.Command(goBinaryPath, "build", "-buildmode=plugin", "-o", pluginPath, "./testdata/sample_plugin.go")
output, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("Failed to compile and create shared object file %q: %v\n%s\nGoBinPath: %q", pluginPath, err, output, goBinaryPath)
}

configYAML := `
counter:
count: 1
`

// Given that plugins.LoadTraceExporterPlugins writes errors to log.Output
// we'll hijack that writers and ensure that we get NO output
checkWriter := new(bytes.Buffer)
log.SetOutput(checkWriter)
// Before we exit this test, revert the log output writer to stderr.
defer log.SetOutput(os.Stderr)

// Now that we have the plugin written to disk, let's now load it.
sr, stopFn := plugins.LoadTraceExporterPlugins([]byte(configYAML), pluginPath)
if sr == nil {
t.Fatal("Failed to create a spanreceiver")
}
defer stopFn()

node := &commonpb.Node{}
span := &tracepb.Span{
TraceId: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10},
}
sr.ReceiveSpans(context.Background(), node, span)
<-time.After(5 * time.Millisecond)

// Now check if we got any output
if g := checkWriter.String(); g != "" {
t.Errorf("Unexpected output: %s", g)
}
}

// This helper function is necessary to ensure that we use
// the same Go binary to compile plugins as well as to run
// this test.
// Otherwise we'll run into such failed tests:
// https://travis-ci.org/census-instrumentation/opencensus-service/builds/438157975
func properGoBinaryPath() string {
goSuffix := "go"
if runtime.GOOS == "windows" {
goSuffix += ".exe"
}
// Firstly check if we are using $GOROOT/bin/go*
goBinPath := filepath.Join(runtime.GOROOT(), "bin", goSuffix)
if _, err := os.Stat(goBinPath); err == nil {
return goBinPath
}
// If that has failed, now trying looking it from the environment
binPath, _ := exec.LookPath(goSuffix)
return binPath
}
99 changes: 99 additions & 0 deletions cmd/ocagent/plugins/testdata/sample_plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
This file serves as a canonical example of writing a plugin
for TraceExporter. It is consumed in plugins_test.go
*/
package main

import (
"context"
"errors"
"fmt"
"sync"

"go.opencensus.io/trace"
yaml "gopkg.in/yaml.v2"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
"github.com/census-instrumentation/opencensus-service/exporter"
)

type eachExporter struct {
mu sync.Mutex
stopped bool
spandata []*trace.SpanData
}

func NewTraceExporterPlugin() exporter.TraceExporterPlugin {
return new(parser)
}

type parser struct {
es []*eachExporter
}

type config struct {
Counter *struct {
Count int `yaml:"count"`
} `yaml:"counter"`
}

func (p *parser) TraceExportersFromConfig(blob []byte, configType exporter.ConfigType) (eachExporters []exporter.TraceExporter, err error) {
if configType != exporter.ConfigTypeYAML {
return nil, fmt.Errorf("Expected ConfigTypeYAML instead of %v", configType)
}
cfg := new(config)
if err := yaml.Unmarshal(blob, cfg); err != nil {
return nil, err
}
cnt := cfg.Counter
if cnt == nil {
return nil, errors.New("Failed to parse a counter")
}

for i := 0; i < cnt.Count; i++ {
ex := new(eachExporter)
eachExporters = append(eachExporters, ex)
p.es = append(p.es, ex)
}
return
}

func (p *parser) Stop(ctx context.Context) error {
for _, e := range p.es {
e.Stop(ctx)
}
return nil
}

func (e *eachExporter) ExportSpanData(node *commonpb.Node, spandata ...*trace.SpanData) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.stopped {
return fmt.Errorf("Already stopped")
}
e.spandata = append(e.spandata, spandata...)
return nil
}

func (e *eachExporter) Stop(ctx context.Context) {
e.mu.Lock()
if !e.stopped {
e.stopped = true
}
e.mu.Unlock()
}
Loading