Skip to content

Commit 4b544d8

Browse files
committed
internal/frontend: recycle database connections every 5m
In order to avoid imbalance between pkgsite's two database instances, recycle connections every 5 minutes. Change-Id: I9ca1e686a90f8c61619fd76454ec66163e501ee1 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/680175 kokoro-CI: kokoro <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 041c7c0 commit 4b544d8

File tree

10 files changed

+309
-17
lines changed

10 files changed

+309
-17
lines changed

cmd/frontend/main.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"golang.org/x/pkgsite/internal/proxy"
3434
"golang.org/x/pkgsite/internal/queue"
3535
"golang.org/x/pkgsite/internal/queue/gcpqueue"
36+
"golang.org/x/pkgsite/internal/resource"
3637
"golang.org/x/pkgsite/internal/source"
3738
"golang.org/x/pkgsite/internal/static"
3839
"golang.org/x/pkgsite/internal/trace"
@@ -70,7 +71,7 @@ func main() {
7071
}
7172

7273
var (
73-
dsg func(context.Context) internal.DataSource
74+
dsg func(context.Context) (internal.DataSource, func())
7475
fetchQueue queue.Queue
7576
)
7677
if *bypassLicenseCheck {
@@ -96,14 +97,19 @@ func main() {
9697
ProxyClientForLatest: proxyClient,
9798
BypassLicenseCheck: *bypassLicenseCheck,
9899
}.New()
99-
dsg = func(context.Context) internal.DataSource { return ds }
100+
dsg = func(context.Context) (internal.DataSource, func()) { return ds, func() {} }
100101
} else {
101-
db, err := cmdconfig.OpenDB(ctx, cfg, *bypassLicenseCheck)
102-
if err != nil {
103-
log.Fatalf(ctx, "%v", err)
102+
dbResource := resource.New(func() *postgres.DB {
103+
db, err := cmdconfig.OpenDB(ctx, cfg, *bypassLicenseCheck)
104+
if err != nil {
105+
log.Fatalf(ctx, "%v", err)
106+
}
107+
return db
108+
}, 5*time.Minute)
109+
110+
dsg = func(ctx context.Context) (internal.DataSource, func()) {
111+
return dbResource.Get()
104112
}
105-
defer db.Close()
106-
dsg = func(context.Context) internal.DataSource { return db }
107113
sourceClient := source.NewClient(&http.Client{
108114
Transport: new(ochttp.Transport),
109115
Timeout: config.SourceTimeout,
@@ -113,6 +119,8 @@ func main() {
113119
// per-request connection.
114120
fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg,
115121
func(ctx context.Context, modulePath, version string) (int, error) {
122+
db, release := dbResource.Get()
123+
defer release()
116124
return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
117125
})
118126
if err != nil {

cmd/internal/pkgsite/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ type ServerConfig struct {
4747
}
4848

4949
// BuildServer builds a *frontend.Server using the given configuration.
50+
//
51+
// TODO(rfindley): move to the cmd/pkgsite package, its only caller.
5052
func BuildServer(ctx context.Context, serverCfg ServerConfig) (*frontend.Server, error) {
5153
if len(serverCfg.Paths) == 0 && !serverCfg.UseCache && serverCfg.Proxy == nil {
5254
serverCfg.Paths = []string{"."}
@@ -288,7 +290,7 @@ func newServer(getters []fetch.ModuleGetter, localModules []frontend.LocalModule
288290
go lds.GetUnitMeta(context.Background(), "", "std", "latest")
289291

290292
server, err := frontend.NewServer(frontend.ServerConfig{
291-
DataSourceGetter: func(context.Context) internal.DataSource { return lds },
293+
DataSourceGetter: func(context.Context) (internal.DataSource, func()) { return lds, func() {} },
292294
TemplateFS: template.TrustedFSFromEmbed(static.FS),
293295
StaticFS: staticFS,
294296
DevMode: devMode,

internal/frontend/fetchserver/fetch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func newTestServerWithFetch(t *testing.T, proxyModules []*proxytest.Module, cach
6262

6363
s, err := frontend.NewServer(frontend.ServerConfig{
6464
FetchServer: f,
65-
DataSourceGetter: func(context.Context) internal.DataSource { return testDB },
65+
DataSourceGetter: func(context.Context) (internal.DataSource, func()) { return testDB, func() {} },
6666
Queue: q,
6767
TemplateFS: template.TrustedFSFromEmbed(static.FS),
6868
// Use the embedded FSs here to make sure they're tested.

internal/frontend/frontend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func newTestServer(t *testing.T, cacher Cacher) (*Server, http.Handler) {
3939
t.Helper()
4040

4141
s, err := NewServer(ServerConfig{
42-
DataSourceGetter: func(context.Context) internal.DataSource { return fakedatasource.New() },
42+
DataSourceGetter: func(context.Context) (internal.DataSource, func()) { return fakedatasource.New(), func() {} },
4343
TemplateFS: template.TrustedFSFromEmbed(static.FS),
4444
// Use the embedded FSs here to make sure they're tested.
4545
// Integration tests will use the actual directories.

internal/frontend/latest_version.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ func (s *Server) GetLatestInfo(ctx context.Context, unitPath, modulePath string,
2626

2727
// It is okay to use a different DataSource (DB connection) than the rest of the
2828
// request, because this makes self-contained calls on the DB.
29-
ds := s.getDataSource(ctx)
29+
ds, release := s.getDataSource(ctx)
30+
defer release()
3031

3132
latest, err := ds.GetLatestInfo(ctx, unitPath, modulePath, latestUnitMeta)
3233
if err != nil {

internal/frontend/latest_version_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func TestLatestMinorVersion(t *testing.T) {
5959
}
6060
ctx := context.Background()
6161
insertTestModules(ctx, t, fds, persistedModules)
62-
svr := &Server{getDataSource: func(context.Context) internal.DataSource { return fds }}
62+
svr := &Server{getDataSource: func(context.Context) (internal.DataSource, func()) { return fds, func() {} }}
6363
for _, tc := range tt {
6464
t.Run(tc.name, func(t *testing.T) {
6565
got := svr.GetLatestInfo(ctx, tc.fullPath, tc.modulePath, nil)

internal/frontend/server.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import (
4242
type Server struct {
4343
fetchServer FetchServerInterface
4444
// getDataSource should never be called from a handler. It is called only in Server.errorHandler.
45-
getDataSource func(context.Context) internal.DataSource
45+
getDataSource func(context.Context) (internal.DataSource, func())
4646
queue queue.Queue
4747
templateFS template.TrustedFS
4848
staticFS fs.FS
@@ -82,9 +82,9 @@ type ServerConfig struct {
8282
Config *config.Config
8383
// Note that FetchServer may be nil.
8484
FetchServer FetchServerInterface
85-
// DataSourceGetter should return a DataSource on each call.
85+
// DataSourceGetter should return a DataSource and a release function on each call.
8686
// It should be goroutine-safe.
87-
DataSourceGetter func(context.Context) internal.DataSource
87+
DataSourceGetter func(context.Context) (internal.DataSource, func())
8888
Queue queue.Queue
8989
TemplateFS template.TrustedFS // for loading templates safely
9090
StaticFS fs.FS // for static/ directory
@@ -503,7 +503,8 @@ func (s *Server) PanicHandler() (_ http.HandlerFunc, err error) {
503503
func (s *Server) errorHandler(f func(w http.ResponseWriter, r *http.Request, ds internal.DataSource) error) http.HandlerFunc {
504504
return func(w http.ResponseWriter, r *http.Request) {
505505
// Obtain a DataSource to use for this request.
506-
ds := s.getDataSource(r.Context())
506+
ds, release := s.getDataSource(r.Context())
507+
defer release()
507508
if err := f(w, r, ds); err != nil {
508509
s.serveError(w, r, err)
509510
}

internal/resource/resource.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2025 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// The resource package defines types to track the lifecycle of transient
6+
// resources, such as a database connection, that should be renewed at some
7+
// fixed interval.
8+
package resource
9+
10+
import (
11+
"io"
12+
"sync"
13+
"sync/atomic"
14+
"time"
15+
)
16+
17+
type instance[T io.Closer] struct {
18+
refs atomic.Int64
19+
v T
20+
}
21+
22+
func (i *instance[T]) acquire(initial bool) (T, func()) {
23+
if i.refs.Add(1) <= 1 && !initial {
24+
panic("acquire on released instance")
25+
}
26+
return i.v, func() {
27+
if i.refs.Add(-1) == 0 {
28+
i.v.Close() // ignore error
29+
var zero T
30+
i.v = zero // aid GC
31+
}
32+
}
33+
}
34+
35+
// A Resource is a container for a transient resource of type T that should be
36+
// periodically renewed, such as a database connection.
37+
//
38+
// Its Get method returns an instance of the resource, along with a release
39+
// function that the caller must invoke when it is done with the resource.
40+
//
41+
// The first call to Get creates a new resource instance. This instance is
42+
// cached and returned by subsequent calls to Get for a fixed duration. After
43+
// this duration expires, the next call to Get will create a new instance. The
44+
// expired instance is closed once all its users have released it.
45+
//
46+
// A Resource is safe for concurrent use.
47+
type Resource[T io.Closer] struct {
48+
get func() T
49+
validFor time.Duration
50+
after func(func()) // for testing; fakes time.AfterFunc(validFor, f)
51+
52+
mu sync.Mutex
53+
cur *instance[T]
54+
}
55+
56+
// New creates a new Resource that is valid for the given duration. The get
57+
// function is called to create a new resource instance when one is needed.
58+
func New[T io.Closer](get func() T, validFor time.Duration) *Resource[T] {
59+
r := newAfter(get, func(f func()) {
60+
time.AfterFunc(validFor, f)
61+
})
62+
r.validFor = validFor
63+
return r
64+
}
65+
66+
// newAfter returns a new resource with a fake after func, for testing.
67+
func newAfter[T io.Closer](get func() T, after func(func())) *Resource[T] {
68+
return &Resource[T]{get: get, after: after}
69+
}
70+
71+
// Get returns the current resource instance and a function to release it.
72+
// The release function must be called when the caller is done with the
73+
// resource.
74+
func (r *Resource[T]) Get() (T, func()) {
75+
r.mu.Lock()
76+
defer r.mu.Unlock()
77+
if r.cur == nil {
78+
r.cur = &instance[T]{v: r.get()}
79+
// Acquire one ref for the new instance that lasts the duration.
80+
//
81+
// This is distinct from the ref acquired below; it ensures that the
82+
// resource is not released until the duration has expired.
83+
_, release := r.cur.acquire(true)
84+
r.after(func() {
85+
r.mu.Lock()
86+
defer r.mu.Unlock()
87+
release()
88+
r.cur = nil
89+
})
90+
}
91+
return r.cur.acquire(false)
92+
}

0 commit comments

Comments
 (0)