Skip to content

Commit fe77434

Browse files
committed
allow overriding pinot cluster query timeout (#7398)
1 parent d6f1902 commit fe77434

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

runtime/drivers/pinot/olap.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error {
4545

4646
func (c *connection) Query(ctx context.Context, stmt *drivers.Statement) (*drivers.Result, error) {
4747
if c.logQueries {
48-
c.logger.Info("pinot query", zap.String("sql", stmt.Query), zap.Any("args", stmt.Args), observability.ZapCtx(ctx))
48+
c.logger.Info("pinot query", zap.String("sql", stmt.Query), zap.Any("args", stmt.Args), zap.Int64("timeoutMS", c.timeoutMS), observability.ZapCtx(ctx))
4949
}
5050
if stmt.DryRun {
5151
rows, err := c.db.QueryxContext(ctx, "EXPLAIN PLAN FOR "+stmt.Query, stmt.Args...)
@@ -61,6 +61,11 @@ func (c *connection) Query(ctx context.Context, stmt *drivers.Statement) (*drive
6161
ctx, cancelFunc = context.WithTimeout(ctx, stmt.ExecutionTimeout)
6262
}
6363

64+
// add timeout if configured to the sql to propagate it to the Pinot server to override the cluster timeout
65+
if c.timeoutMS > 0 {
66+
stmt.Query = fmt.Sprintf("SET timeoutMS=%d; %s", c.timeoutMS, stmt.Query)
67+
}
68+
6469
rows, err := c.db.QueryxContext(ctx, stmt.Query, stmt.Args...)
6570
if err != nil {
6671
if cancelFunc != nil {

runtime/drivers/pinot/pinot.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ type configProperties struct {
114114
LogQueries bool `mapstructure:"log_queries"`
115115
// MaxOpenConns is the maximum number of open connections to the database. Set to 0 to use the default value or -1 for unlimited.
116116
MaxOpenConns int `mapstructure:"max_open_conns"`
117+
// TimeoutMS is the timeout in milliseconds for queries. Set to 0 to use the cluster default.
118+
TimeoutMS int64 `mapstructure:"timeout_ms"`
117119
}
118120

119121
// Open a connection to Apache Pinot using HTTP API.
@@ -205,6 +207,7 @@ func (d driver) Open(instanceID string, config map[string]any, st *storage.Clien
205207
schemaURL: controller,
206208
headers: headers,
207209
logQueries: conf.LogQueries,
210+
timeoutMS: conf.TimeoutMS,
208211
logger: logger,
209212
}
210213
return conn, nil
@@ -229,6 +232,7 @@ type connection struct {
229232
schemaURL string
230233
headers map[string]string
231234
logQueries bool
235+
timeoutMS int64 // timeout in milliseconds for queries, 0 means use cluster default
232236
logger *zap.Logger
233237
}
234238

0 commit comments

Comments
 (0)