Skip to content

Commit 83558fe

Browse files
committed
chore: create shared lib for connections
1 parent bc33a3a commit 83558fe

File tree

10 files changed

+5135
-0
lines changed

10 files changed

+5135
-0
lines changed

spannerlib/backend/connection.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package backend
2+
3+
import (
4+
"database/sql"
5+
)
6+
7+
type SpannerConnection struct {
8+
Conn *sql.Conn
9+
}
10+
11+
func (conn *SpannerConnection) Close() error {
12+
return conn.Conn.Close()
13+
}

spannerlib/backend/db_pool.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"fmt"
8+
"sync"
9+
10+
spannerdriver "github.com/googleapis/go-sql-spanner"
11+
)
12+
13+
type Pool struct {
14+
Project string
15+
Instance string
16+
Database string
17+
18+
mu sync.Mutex
19+
entries map[string]*sql.DB
20+
}
21+
22+
func (pool *Pool) Close() (err error) {
23+
pool.mu.Lock()
24+
defer pool.mu.Unlock()
25+
for _, db := range pool.entries {
26+
err = errors.Join(err, db.Close())
27+
}
28+
return err
29+
}
30+
31+
func (pool *Pool) Conn(ctx context.Context, project, instance, database string) (*sql.Conn, error) {
32+
if project == "" {
33+
project = pool.Project
34+
}
35+
if instance == "" {
36+
instance = pool.Instance
37+
}
38+
if database == "" {
39+
database = pool.Database
40+
}
41+
key := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, database)
42+
pool.mu.Lock()
43+
defer pool.mu.Unlock()
44+
if db, ok := pool.entries[key]; ok {
45+
return db.Conn(ctx)
46+
}
47+
config := spannerdriver.ConnectorConfig{
48+
Project: project,
49+
Instance: instance,
50+
Database: database,
51+
}
52+
connector, err := spannerdriver.CreateConnector(config)
53+
if err != nil {
54+
return nil, err
55+
}
56+
db := sql.OpenDB(connector)
57+
if pool.entries == nil {
58+
pool.entries = make(map[string]*sql.DB)
59+
}
60+
pool.entries[key] = db
61+
return db.Conn(ctx)
62+
}

spannerlib/exported/connection.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package exported
2+
3+
import "C"
4+
import (
5+
"context"
6+
"database/sql"
7+
"fmt"
8+
"sync"
9+
"sync/atomic"
10+
11+
"cloud.google.com/go/spanner"
12+
"cloud.google.com/go/spanner/apiv1/spannerpb"
13+
spannerdriver "github.com/googleapis/go-sql-spanner"
14+
"google.golang.org/protobuf/proto"
15+
"spannerlib/backend"
16+
)
17+
18+
func CloseConnection(poolId, connId int64) *Message {
19+
conn, err := findConnection(poolId, connId)
20+
if err != nil {
21+
return errMessage(err)
22+
}
23+
return conn.close()
24+
}
25+
26+
func Execute(poolId, connId int64, statementBytes []byte) *Message {
27+
statement := spannerpb.ExecuteBatchDmlRequest_Statement{}
28+
if err := proto.Unmarshal(statementBytes, &statement); err != nil {
29+
return errMessage(err)
30+
}
31+
fmt.Printf("Statement: %v\n", statement.Sql)
32+
fmt.Printf("Params: %v\n", statement.Params)
33+
conn, err := findConnection(poolId, connId)
34+
if err != nil {
35+
return errMessage(err)
36+
}
37+
return conn.Execute(&statement)
38+
}
39+
40+
type Connection struct {
41+
results *sync.Map
42+
resultsIdx atomic.Int64
43+
44+
backend *backend.SpannerConnection
45+
}
46+
47+
func (conn *Connection) close() *Message {
48+
conn.results.Range(func(key, value interface{}) bool {
49+
res := value.(*rows)
50+
res.Close()
51+
return true
52+
})
53+
err := conn.backend.Close()
54+
if err != nil {
55+
return errMessage(err)
56+
}
57+
return &Message{}
58+
}
59+
60+
func (conn *Connection) Execute(statement *spannerpb.ExecuteBatchDmlRequest_Statement) *Message {
61+
paramsLen := 1
62+
if statement.Params != nil {
63+
paramsLen = 1 + len(statement.Params.Fields)
64+
}
65+
params := make([]any, paramsLen)
66+
params = append(params, spannerdriver.ExecOptions{DecodeOption: spannerdriver.DecodeOptionProto})
67+
if statement.Params != nil {
68+
if statement.ParamTypes == nil {
69+
statement.ParamTypes = make(map[string]*spannerpb.Type)
70+
}
71+
for param, value := range statement.Params.Fields {
72+
genericValue := spanner.GenericColumnValue{
73+
Value: value,
74+
Type: statement.ParamTypes[param],
75+
}
76+
params = append(params, sql.Named(param, genericValue))
77+
}
78+
}
79+
it, err := conn.backend.Conn.QueryContext(context.Background(), statement.Sql, params...)
80+
if err != nil {
81+
return errMessage(err)
82+
}
83+
id := conn.resultsIdx.Add(1)
84+
res := &rows{
85+
backend: it,
86+
}
87+
conn.results.Store(id, res)
88+
return idMessage(id)
89+
}

spannerlib/exported/message.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package exported
2+
3+
import (
4+
"unsafe"
5+
6+
"github.com/google/uuid"
7+
"google.golang.org/grpc/status"
8+
)
9+
10+
type Message struct {
11+
Code int32
12+
ObjectId int64
13+
Res []byte
14+
}
15+
16+
func (m *Message) Length() int32 {
17+
if m.Res == nil {
18+
return 0
19+
}
20+
return int32(len(m.Res))
21+
}
22+
23+
func (m *Message) ResPointer() unsafe.Pointer {
24+
if m.Res == nil {
25+
return nil
26+
}
27+
return unsafe.Pointer(&(m.Res[0]))
28+
}
29+
30+
func generateId() (string, error) {
31+
id, err := uuid.NewRandom()
32+
if err != nil {
33+
return "", err
34+
}
35+
return id.String(), nil
36+
}
37+
38+
type BaseMsg struct {
39+
Allowed bool
40+
}
41+
42+
type OtherMsg struct {
43+
BaseMsg
44+
Foo string
45+
}
46+
47+
func idMessage(id int64) *Message {
48+
return &Message{ObjectId: id}
49+
}
50+
51+
func errMessage(err error) *Message {
52+
errCode := status.Code(err)
53+
b := []byte(err.Error())
54+
return &Message{Code: int32(errCode), Res: b}
55+
}

spannerlib/exported/pool.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package exported
2+
3+
import "C"
4+
import (
5+
"context"
6+
"fmt"
7+
"sync"
8+
"sync/atomic"
9+
10+
"spannerlib/backend"
11+
)
12+
13+
var pools = sync.Map{}
14+
var poolsIdx = atomic.Int64{}
15+
16+
type Pool struct {
17+
backend *backend.Pool
18+
connections *sync.Map
19+
connectionsIdx atomic.Int64
20+
}
21+
22+
func CreatePool() *Message {
23+
backendPool := &backend.Pool{}
24+
id := poolsIdx.Add(1)
25+
pool := &Pool{
26+
backend: backendPool,
27+
connections: &sync.Map{},
28+
}
29+
pools.Store(id, pool)
30+
return idMessage(id)
31+
}
32+
33+
func ClosePool(id int64) *Message {
34+
p, ok := pools.LoadAndDelete(id)
35+
if !ok {
36+
return errMessage(fmt.Errorf("pool %v not found", id))
37+
}
38+
pool := p.(*Pool)
39+
pool.connections.Range(func(key, value interface{}) bool {
40+
conn := value.(*Connection)
41+
conn.close()
42+
return true
43+
})
44+
return &Message{}
45+
}
46+
47+
func CreateConnection(poolId int64, project, instance, database string) *Message {
48+
p, ok := pools.Load(poolId)
49+
if !ok {
50+
return errMessage(fmt.Errorf("pool %v not found", poolId))
51+
}
52+
pool := p.(*Pool)
53+
sqlConn, err := pool.backend.Conn(context.Background(), project, instance, database)
54+
if err != nil {
55+
return errMessage(err)
56+
}
57+
id := poolsIdx.Add(1)
58+
conn := &Connection{
59+
backend: &backend.SpannerConnection{Conn: sqlConn},
60+
results: &sync.Map{},
61+
}
62+
pool.connections.Store(id, conn)
63+
64+
return idMessage(id)
65+
}
66+
67+
func findConnection(poolId, connId int64) (*Connection, error) {
68+
p, ok := pools.Load(poolId)
69+
if !ok {
70+
return nil, fmt.Errorf("pool %v not found", poolId)
71+
}
72+
pool := p.(*Pool)
73+
c, ok := pool.connections.Load(connId)
74+
if !ok {
75+
return nil, fmt.Errorf("connection %v not found", connId)
76+
}
77+
conn := c.(*Connection)
78+
return conn, nil
79+
}
80+
81+
func findRows(poolId, connId, rowsId int64) (*rows, error) {
82+
conn, err := findConnection(poolId, connId)
83+
if err != nil {
84+
return nil, err
85+
}
86+
r, ok := conn.results.Load(rowsId)
87+
if !ok {
88+
return nil, fmt.Errorf("rows %v not found", rowsId)
89+
}
90+
res := r.(*rows)
91+
return res, nil
92+
}

spannerlib/exported/pool_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package exported
2+
3+
import (
4+
"cloud.google.com/go/spanner/apiv1/spannerpb"
5+
"fmt"
6+
"google.golang.org/protobuf/proto"
7+
"google.golang.org/protobuf/types/known/structpb"
8+
"testing"
9+
)
10+
11+
func TestExecute(t *testing.T) {
12+
pool := CreatePool()
13+
conn := CreateConnection(pool.ObjectId, "appdev-soda-spanner-staging", "knut-test-ycsb", "knut-test-db")
14+
stmt := spannerpb.ExecuteBatchDmlRequest_Statement{
15+
Sql: "select * from all_types where col_varchar=$1 /*and col_bigint=@id*/ limit 10",
16+
Params: &structpb.Struct{
17+
Fields: map[string]*structpb.Value{"p1": {Kind: &structpb.Value_StringValue{StringValue: "61763b0e7feb3ea8fc9e734a6700f6a4"}}},
18+
},
19+
}
20+
stmtBytes, _ := proto.Marshal(&stmt)
21+
rows := Execute(pool.ObjectId, conn.ObjectId, stmtBytes)
22+
for {
23+
row := Next(pool.ObjectId, conn.ObjectId, rows.ObjectId)
24+
rowValue := structpb.ListValue{}
25+
_ = proto.Unmarshal(row.Res, &rowValue)
26+
if row.Length() == 0 {
27+
break
28+
}
29+
fmt.Printf("row: %v\n", rowValue.Values)
30+
}
31+
CloseRows(pool.ObjectId, conn.ObjectId, rows.ObjectId)
32+
CloseConnection(pool.ObjectId, conn.ObjectId)
33+
ClosePool(pool.ObjectId)
34+
}

0 commit comments

Comments
 (0)