Skip to content

Commit 2987a5c

Browse files
authored
Merge pull request #421 from saydamir/connection_check_in_begin_tx
Check connection liveness in beginTx
2 parents a472d1b + c9f1faf commit 2987a5c

6 files changed

+204
-29
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
2626
* pool_size - the maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
2727
* debug - enable debug output (boolean value)
2828
* compress - enable lz4 compression (integer value, default is '0')
29+
* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true')
2930

3031
SSL/TLS parameters:
3132

bootstrap.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,22 @@ func open(dsn string) (*clickhouse, error) {
8383
return nil, err
8484
}
8585
var (
86-
hosts = []string{url.Host}
87-
query = url.Query()
88-
secure = false
89-
skipVerify = false
90-
tlsConfigName = query.Get("tls_config")
91-
noDelay = true
92-
compress = false
93-
database = query.Get("database")
94-
username = query.Get("username")
95-
password = query.Get("password")
96-
blockSize = 1000000
97-
connTimeout = DefaultConnTimeout
98-
readTimeout = DefaultReadTimeout
99-
writeTimeout = DefaultWriteTimeout
100-
connOpenStrategy = connOpenRandom
86+
hosts = []string{url.Host}
87+
query = url.Query()
88+
secure = false
89+
skipVerify = false
90+
tlsConfigName = query.Get("tls_config")
91+
noDelay = true
92+
compress = false
93+
database = query.Get("database")
94+
username = query.Get("username")
95+
password = query.Get("password")
96+
blockSize = 1000000
97+
connTimeout = DefaultConnTimeout
98+
readTimeout = DefaultReadTimeout
99+
writeTimeout = DefaultWriteTimeout
100+
connOpenStrategy = connOpenRandom
101+
checkConnLiveness = true
101102
)
102103
if len(database) == 0 {
103104
database = DefaultDatabase
@@ -156,12 +157,21 @@ func open(dsn string) (*clickhouse, error) {
156157
compress = v
157158
}
158159

160+
if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
161+
checkConnLiveness = v
162+
}
163+
if secure {
164+
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
165+
checkConnLiveness = false
166+
}
167+
159168
var (
160169
ch = clickhouse{
161-
logf: func(string, ...interface{}) {},
162-
settings: settings,
163-
compress: compress,
164-
blockSize: blockSize,
170+
logf: func(string, ...interface{}) {},
171+
settings: settings,
172+
compress: compress,
173+
blockSize: blockSize,
174+
checkConnLiveness: checkConnLiveness,
165175
ServerInfo: data.ServerInfo{
166176
Timezone: time.Local,
167177
},

clickhouse.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ type clickhouse struct {
4747
sync.Mutex
4848
data.ServerInfo
4949
data.ClientInfo
50-
logf logger
51-
conn *connect
52-
block *data.Block
53-
buffer *bufio.Writer
54-
decoder *binary.Decoder
55-
encoder *binary.Encoder
56-
settings *querySettings
57-
compress bool
58-
blockSize int
59-
inTransaction bool
50+
logf logger
51+
conn *connect
52+
block *data.Block
53+
buffer *bufio.Writer
54+
decoder *binary.Decoder
55+
encoder *binary.Encoder
56+
settings *querySettings
57+
compress bool
58+
blockSize int
59+
inTransaction bool
60+
checkConnLiveness bool
6061
}
6162

6263
func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
@@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
124125
case ch.conn.closed:
125126
return nil, driver.ErrBadConn
126127
}
128+
129+
// Perform a stale connection check. We only perform this check in beginTx,
130+
// because database/sql retries driver.ErrBadConn only for first request,
131+
// but beginTx doesn't perform any other network interaction.
132+
if ch.checkConnLiveness {
133+
if err := ch.conn.connCheck(); err != nil {
134+
ch.logf("[begin] closing bad idle connection: %w", err)
135+
ch.Close()
136+
return ch, driver.ErrBadConn
137+
}
138+
}
139+
127140
if finish := ch.watchCancel(ctx); finish != nil {
128141
defer finish()
129142
}

connect_check.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
2+
3+
package clickhouse
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"io"
9+
"syscall"
10+
"time"
11+
)
12+
13+
var errUnexpectedRead = errors.New("unexpected read from socket")
14+
15+
func (conn *connect) connCheck() error {
16+
var sysErr error
17+
18+
sysConn, ok := conn.Conn.(syscall.Conn)
19+
if !ok {
20+
return nil
21+
}
22+
rawConn, err := sysConn.SyscallConn()
23+
if err != nil {
24+
return err
25+
}
26+
// If this connection has a ReadTimeout which we've been setting on
27+
// reads, reset it to zero value before we attempt a non-blocking
28+
// read, otherwise we may get os.ErrDeadlineExceeded for the cached
29+
// connection from the pool with an expired timeout.
30+
if conn.readTimeout != 0 {
31+
err = conn.SetReadDeadline(time.Time{})
32+
if err != nil {
33+
return fmt.Errorf("set read deadline: %w", err)
34+
}
35+
conn.lastReadDeadlineTime = time.Time{}
36+
}
37+
err = rawConn.Read(func(fd uintptr) bool {
38+
var buf [1]byte
39+
n, err := syscall.Read(int(fd), buf[:])
40+
switch {
41+
case n == 0 && err == nil:
42+
sysErr = io.EOF
43+
case n > 0:
44+
sysErr = errUnexpectedRead
45+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
46+
sysErr = nil
47+
default:
48+
sysErr = err
49+
}
50+
return true
51+
})
52+
if err != nil {
53+
return err
54+
}
55+
56+
return sysErr
57+
}

connect_check_dummy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
2+
3+
package clickhouse
4+
5+
import "net"
6+
7+
func connCheck(conn net.Conn) error {
8+
return nil
9+
}

connect_check_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_ConnCheck(t *testing.T) {
14+
const (
15+
ddl = `
16+
CREATE TABLE clickhouse_test_conncheck (
17+
Value String
18+
) Engine = Memory
19+
`
20+
dml = `
21+
INSERT INTO clickhouse_test_conncheck
22+
VALUES (?)
23+
`
24+
)
25+
26+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) {
27+
// We can only change the settings at the connection level.
28+
// If we have only one connection, we change the settings specifically for that connection.
29+
connect.SetMaxOpenConns(1)
30+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) {
31+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
32+
_, err = connect.Exec("set idle_connection_timeout=1")
33+
assert.NoError(t, err)
34+
35+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
36+
assert.NoError(t, err)
37+
38+
time.Sleep(1100 * time.Millisecond)
39+
ctx := context.Background()
40+
tx, err := connect.BeginTx(ctx, nil)
41+
assert.NoError(t, err)
42+
43+
_, err = tx.PrepareContext(ctx, dml)
44+
assert.NoError(t, err)
45+
}
46+
}
47+
}
48+
}
49+
50+
func Test_ConnCheckNegative(t *testing.T) {
51+
const (
52+
ddl = `
53+
CREATE TABLE clickhouse_test_conncheck_negative (
54+
Value String
55+
) Engine = Memory
56+
`
57+
dml = `
58+
INSERT INTO clickhouse_test_conncheck_negative
59+
VALUES (?)
60+
`
61+
)
62+
63+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) {
64+
// We can only change the settings at the connection level.
65+
// If we have only one connection, we change the settings specifically for that connection.
66+
connect.SetMaxOpenConns(1)
67+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) {
68+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
69+
_, err = connect.Exec("set idle_connection_timeout=1")
70+
assert.NoError(t, err)
71+
72+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
73+
assert.NoError(t, err)
74+
75+
time.Sleep(1100 * time.Millisecond)
76+
ctx := context.Background()
77+
tx, err := connect.BeginTx(ctx, nil)
78+
assert.NoError(t, err)
79+
80+
_, err = tx.PrepareContext(ctx, dml)
81+
assert.Equal(t, driver.ErrBadConn, err)
82+
}
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)