diff --git a/pkg/plugin/datasource.go b/pkg/plugin/datasource.go index fe0f6ce..623dde4 100644 --- a/pkg/plugin/datasource.go +++ b/pkg/plugin/datasource.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" @@ -157,12 +156,7 @@ func (ds *MQTTDatasource) Query(query backend.DataQuery) backend.DataResponse { // ensure the client is subscribed to the topic. ds.Client.Subscribe(qm.Topic) - messages, ok := ds.Client.Messages(qm.Topic) - if !ok { - return response - } - - frame := ToFrame(qm.Topic, messages) + frame := data.NewFrame(qm.Topic) if qm.Topic != "" { frame.SetMeta(&data.FrameMeta{ @@ -179,13 +173,8 @@ func (ds *MQTTDatasource) SendMessage(msg mqtt.StreamMessage, req *backend.RunSt return nil } - message := mqtt.Message{ - Timestamp: time.Now(), - Value: msg.Value, - } - - frame := ToFrame(msg.Topic, []mqtt.Message{message}) + messageJSON := json.RawMessage(msg.Value) log.DefaultLogger.Debug(fmt.Sprintf("Sending message to client for topic %s", msg.Topic)) - return sender.SendFrame(frame, data.IncludeAll) + return sender.SendJSON(messageJSON) } diff --git a/test_broker.js b/test_broker.js index fbb057e..de7f8d7 100644 --- a/test_broker.js +++ b/test_broker.js @@ -12,7 +12,55 @@ const toMillis = { }; const createPublisher = ({ topic, qos }) => { - let i = 0; + if (topic == "imu") { + let i = 0; + console.log(`publishing to topic:`, topic); + publishers[topic] = setInterval(() => { + var j = { + "sample": i, + "properties": ["acceleration", "velocity"], + "sensors": ["accelerometer", "gyroscope"], + "axes": ["x", "y", "z"], + "sensor": { + "accelerometer": { + "x": Math.random(), + "y": Math.random(), + "z": Math.random() + }, + "gyroscope": { + "x": Math.random(), + "y": Math.random(), + "z": Math.random() + } + }, + "axis": { + "x": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + }, + "y": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + }, + "z": { + "accelerometer": Math.random(), + "gyroscope": Math.random() + } + } + }; + var out = JSON.stringify(j); + aedes.publish({ + topic, + cmd: 'publish', + qos, + retain: false, + payload: out, + }); + i+=1; + }, 1000); + return + } + const [duration, value] = topic.split('/'); const fn = toMillis[duration];