Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

[Federation] Send typing events #572

Merged
merged 11 commits into from
Aug 10, 2018
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"context"
"encoding/json"

"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/queue"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/dendrite/typingserver/dummy/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
"gopkg.in/Shopify/sarama.v1"
)

// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}

// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. Call Start() to begin consuming from typing servers.
func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
}
c := &OutputTypingEventConsumer{
consumer: &consumer,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
}
consumer.ProcessMessage = c.onMessage

return c
}

// Start consuming from typing servers
func (t *OutputTypingEventConsumer) Start() error {
return t.consumer.Start()
}

func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Extract the typing event from msg.
var ote api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &ote); err != nil {
// Skip this msg but continue processing messages.
log.WithError(err).Errorf("typingserver output log: message parse failed")
return nil
}

joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID())
if err != nil {
return err
}

names := make([]gomatrixserverlib.ServerName, len(joined))
for i := range joined {
names[i] = joined[i].ServerName
}

return t.queues.SendEvent(&ote.Event, t.ServerName, names)
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ func SetupFederationSenderComponent(

queues := queue.NewOutgoingQueues(base.Cfg.Matrix.ServerName, federation)

consumer := consumers.NewOutputRoomEventConsumer(
rsConsumer := consumers.NewOutputRoomEventConsumer(
base.Cfg, base.KafkaConsumer, queues,
federationSenderDB, queryAPI,
)
if err = consumer.Start(); err != nil {
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}

tsConsumer := consumers.NewOutputTypingEventConsumer(
base.Cfg, base.KafkaConsumer, queues, federationSenderDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,22 @@ func (s *joinedHostsStatements) deleteJoinedHosts(
return err
}

func (s *joinedHostsStatements) selectJoinedHosts(
func (s *joinedHostsStatements) selectJoinedHostsWithTx(
ctx context.Context, txn *sql.Tx, roomID string,
) ([]types.JoinedHost, error) {
stmt := common.TxStmt(txn, s.selectJoinedHostsStmt)
return joinedHostsFromStmt(ctx, stmt, roomID)
}

func (s *joinedHostsStatements) selectJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return joinedHostsFromStmt(ctx, s.selectJoinedHostsStmt, roomID)
}

func joinedHostsFromStmt(
ctx context.Context, stmt *sql.Stmt, roomID string,
) ([]types.JoinedHost, error) {
rows, err := stmt.QueryContext(ctx, roomID)
if err != nil {
return nil, err
Expand All @@ -118,5 +130,6 @@ func (s *joinedHostsStatements) selectJoinedHosts(
ServerName: gomatrixserverlib.ServerName(serverName),
})
}

return result, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Database) UpdateRoom(
}
}

joinedHosts, err = d.selectJoinedHosts(ctx, txn, roomID)
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
Expand All @@ -110,3 +110,12 @@ func (d *Database) UpdateRoom(
})
return
}

// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import "github.com/matrix-org/gomatrixserverlib"

// TODO: Remove this package after, typingserver/api is updated to contain a gomatrixserverlib.Event
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably move this TODO out of dummy package now that the plan is no longer to change to using Event


// OutputTypingEvent is an entry in typing server output kafka log.
type OutputTypingEvent struct {
// The Event for the typing edu event.
Event gomatrixserverlib.Event `json:"event"`
}