Skip to content

Commit 95bca65

Browse files
committed
fix
1 parent 033d929 commit 95bca65

File tree

30 files changed

+182
-249
lines changed

30 files changed

+182
-249
lines changed

modules/graceful/context.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,8 @@ package graceful
55

66
import (
77
"context"
8-
"time"
98
)
109

11-
// ChannelContext is a context that wraps a channel and error as a context
12-
type ChannelContext struct {
13-
done <-chan struct{}
14-
err error
15-
}
16-
17-
// NewChannelContext creates a ChannelContext from a channel and error
18-
func NewChannelContext(done <-chan struct{}, err error) *ChannelContext {
19-
return &ChannelContext{
20-
done: done,
21-
err: err,
22-
}
23-
}
24-
25-
// Deadline returns the time when work done on behalf of this context
26-
// should be canceled. There is no Deadline for a ChannelContext
27-
func (ctx *ChannelContext) Deadline() (deadline time.Time, ok bool) {
28-
return deadline, ok
29-
}
30-
31-
// Done returns the channel provided at the creation of this context.
32-
// When closed, work done on behalf of this context should be canceled.
33-
func (ctx *ChannelContext) Done() <-chan struct{} {
34-
return ctx.done
35-
}
36-
37-
// Err returns nil, if Done is not closed. If Done is closed,
38-
// Err returns the error provided at the creation of this context
39-
func (ctx *ChannelContext) Err() error {
40-
select {
41-
case <-ctx.done:
42-
return ctx.err
43-
default:
44-
return nil
45-
}
46-
}
47-
48-
// Value returns nil for all calls as no values are or can be associated with this context
49-
func (ctx *ChannelContext) Value(key interface{}) interface{} {
50-
return nil
51-
}
52-
5310
// ShutdownContext returns a context.Context that is Done at shutdown
5411
// Callers using this context should ensure that they are registered as a running server
5512
// in order that they are waited for.

modules/graceful/manager.go

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ const (
2323
stateTerminate
2424
)
2525

26+
type RunCanceler interface {
27+
Run()
28+
Cancel()
29+
}
30+
2631
// There are some places that could inherit sockets:
2732
//
2833
// * HTTP or HTTPS main listener
@@ -55,46 +60,18 @@ func InitManager(ctx context.Context) {
5560
})
5661
}
5762

58-
// WithCallback is a runnable to call when the caller has finished
59-
type WithCallback func(callback func())
60-
61-
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
62-
// After the callback to atShutdown is called and is complete, the main function must return.
63-
// Similarly the callback function provided to atTerminate must return once termination is complete.
64-
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
65-
// - users must therefore be careful to only call these as necessary.
66-
type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
67-
68-
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
69-
// After the callback to atShutdown is called and is complete, the main function must return.
70-
// Similarly the callback function provided to atTerminate must return once termination is complete.
71-
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
72-
// - users must therefore be careful to only call these as necessary.
73-
func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
63+
// RunWithCancel helps to run a function with a custom context, the cancel function will be called at shutdown
64+
func (g *Manager) RunWithCancel(rc RunCanceler) {
65+
g.RunAtShutdown(context.Background(), rc.Cancel)
7466
g.runningServerWaitGroup.Add(1)
7567
defer g.runningServerWaitGroup.Done()
7668
defer func() {
7769
if err := recover(); err != nil {
78-
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
70+
log.Critical("PANIC during RunWithCustomContext: %v\nStacktrace: %s", err, log.Stack(2))
7971
g.doShutdown()
8072
}
8173
}()
82-
run(func(atShutdown func()) {
83-
g.lock.Lock()
84-
defer g.lock.Unlock()
85-
g.toRunAtShutdown = append(g.toRunAtShutdown,
86-
func() {
87-
defer func() {
88-
if err := recover(); err != nil {
89-
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
90-
g.doShutdown()
91-
}
92-
}()
93-
atShutdown()
94-
})
95-
}, func(atTerminate func()) {
96-
g.RunAtTerminate(atTerminate)
97-
})
74+
rc.Run()
9875
}
9976

10077
// RunWithShutdownContext takes a function that has a context to watch for shutdown.
@@ -151,21 +128,6 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
151128
})
152129
}
153130

154-
// RunAtHammer creates a go-routine to run the provided function at shutdown
155-
func (g *Manager) RunAtHammer(hammer func()) {
156-
g.lock.Lock()
157-
defer g.lock.Unlock()
158-
g.toRunAtHammer = append(g.toRunAtHammer,
159-
func() {
160-
defer func() {
161-
if err := recover(); err != nil {
162-
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
163-
}
164-
}()
165-
hammer()
166-
})
167-
}
168-
169131
func (g *Manager) doShutdown() {
170132
if !g.setStateTransition(stateRunning, stateShuttingDown) {
171133
g.DoImmediateHammer()
@@ -206,9 +168,6 @@ func (g *Manager) doHammerTime(d time.Duration) {
206168
g.hammerCtxCancel()
207169
atHammerCtx := pprof.WithLabels(g.terminateCtx, pprof.Labels("graceful-lifecycle", "post-hammer"))
208170
pprof.SetGoroutineLabels(atHammerCtx)
209-
for _, fn := range g.toRunAtHammer {
210-
go fn()
211-
}
212171
}
213172
g.lock.Unlock()
214173
}

modules/graceful/manager_unix.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ type Manager struct {
4141
terminateWaitGroup sync.WaitGroup
4242

4343
toRunAtShutdown []func()
44-
toRunAtHammer []func()
4544
toRunAtTerminate []func()
4645
}
4746

modules/graceful/manager_windows.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ type Manager struct {
5050
shutdownRequested chan struct{}
5151

5252
toRunAtShutdown []func()
53-
toRunAtHammer []func()
5453
toRunAtTerminate []func()
5554
}
5655

modules/indexer/code/indexer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func Init() {
166166
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
167167
idx, err := indexer.get()
168168
if idx == nil || err != nil {
169-
log.Error("Codes indexer handler: unable to get indexer!")
169+
log.Warn("Codes indexer handler: indexer is not ready, retry later.")
170170
return items
171171
}
172172

@@ -201,7 +201,7 @@ func Init() {
201201
return unhandled
202202
}
203203

204-
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
204+
indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler)
205205
if indexerQueue == nil {
206206
log.Fatal("Unable to create codes indexer queue")
207207
}
@@ -259,7 +259,7 @@ func Init() {
259259
indexer.set(rIndexer)
260260

261261
// Start processing the queue
262-
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
262+
go graceful.GetManager().RunWithCancel(indexerQueue)
263263

264264
if populate {
265265
go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)

modules/indexer/issues/indexer.go

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,15 @@ var (
102102
func InitIssueIndexer(syncReindex bool) {
103103
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
104104

105-
waitChannel := make(chan time.Duration, 1)
105+
indexerInitWaitChannel := make(chan time.Duration, 1)
106106

107107
// Create the Queue
108108
switch setting.Indexer.IssueType {
109109
case "bleve", "elasticsearch", "meilisearch":
110110
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
111111
indexer := holder.get()
112112
if indexer == nil {
113-
log.Error("Issue indexer handler: unable to get indexer.")
113+
log.Warn("Issue indexer handler: indexer is not ready, retry later.")
114114
return items
115115
}
116116
toIndex := make([]*IndexerData, 0, len(items))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
138138
return unhandled
139139
}
140140

141-
issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
141+
issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler)
142142

143143
if issueIndexerQueue == nil {
144144
log.Fatal("Unable to create issue indexer queue")
145145
}
146146
default:
147-
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
147+
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
148148
}
149149

150+
graceful.GetManager().RunAtTerminate(finished)
151+
150152
// Create the Indexer
151153
go func() {
152154
pprof.SetGoroutineLabels(ctx)
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
178180
if issueIndexer != nil {
179181
issueIndexer.Close()
180182
}
181-
finished()
182183
log.Info("PID: %d Issue Indexer closed", os.Getpid())
183184
})
184185
log.Debug("Created Bleve Indexer")
185186
case "elasticsearch":
186-
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
187-
pprof.SetGoroutineLabels(ctx)
188-
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
189-
if err != nil {
190-
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
191-
}
192-
exist, err := issueIndexer.Init()
193-
if err != nil {
194-
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
195-
}
196-
populate = !exist
197-
holder.set(issueIndexer)
198-
atTerminate(finished)
199-
})
187+
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
188+
if err != nil {
189+
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
190+
}
191+
exist, err := issueIndexer.Init()
192+
if err != nil {
193+
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
194+
}
195+
populate = !exist
196+
holder.set(issueIndexer)
200197
case "db":
201198
issueIndexer := &DBIndexer{}
202199
holder.set(issueIndexer)
203-
graceful.GetManager().RunAtTerminate(finished)
204200
case "meilisearch":
205-
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
206-
pprof.SetGoroutineLabels(ctx)
207-
issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
208-
if err != nil {
209-
log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
210-
}
211-
exist, err := issueIndexer.Init()
212-
if err != nil {
213-
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
214-
}
215-
populate = !exist
216-
holder.set(issueIndexer)
217-
atTerminate(finished)
218-
})
201+
issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
202+
if err != nil {
203+
log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
204+
}
205+
exist, err := issueIndexer.Init()
206+
if err != nil {
207+
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
208+
}
209+
populate = !exist
210+
holder.set(issueIndexer)
219211
default:
220212
holder.cancel()
221213
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
222214
}
223215

224216
// Start processing the queue
225-
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
217+
go graceful.GetManager().RunWithCancel(issueIndexerQueue)
226218

227219
// Populate the index
228220
if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
232224
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
233225
}
234226
}
235-
waitChannel <- time.Since(start)
236-
close(waitChannel)
227+
228+
indexerInitWaitChannel <- time.Since(start)
229+
close(indexerInitWaitChannel)
237230
}()
238231

239232
if syncReindex {
240233
select {
241-
case <-waitChannel:
234+
case <-indexerInitWaitChannel:
242235
case <-graceful.GetManager().IsShutdown():
243236
}
244237
} else if setting.Indexer.StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
249242
timeout += setting.GracefulHammerTime
250243
}
251244
select {
252-
case duration := <-waitChannel:
245+
case duration := <-indexerInitWaitChannel:
253246
log.Info("Issue Indexer Initialization took %v", duration)
254247
case <-graceful.GetManager().IsShutdown():
255248
log.Warn("Shutdown occurred before issue index initialisation was complete")

modules/indexer/stats/queue.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ func handler(items ...int64) []int64 {
2929
}
3030

3131
func initStatsQueue() error {
32-
statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler)
32+
statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler)
3333
if statsQueue == nil {
34-
return fmt.Errorf("Unable to create repo_stats_update Queue")
34+
return fmt.Errorf("unable to create repo_stats_update queue")
3535
}
36-
37-
go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)
38-
36+
go graceful.GetManager().RunWithCancel(statsQueue)
3937
return nil
4038
}
4139

modules/log/event_writer_base.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"regexp"
11+
"runtime/pprof"
1112
"time"
1213
)
1314

@@ -57,6 +58,8 @@ func (b *EventWriterBaseImpl) GetLevel() Level {
5758

5859
// Run is the default implementation for EventWriter.Run
5960
func (b *EventWriterBaseImpl) Run(ctx context.Context) {
61+
pprof.SetGoroutineLabels(ctx)
62+
6063
defer b.OutputWriteCloser.Close()
6164

6265
var exprRegexp *regexp.Regexp
@@ -143,9 +146,16 @@ func eventWriterStartGo(ctx context.Context, w EventWriter, shared bool) {
143146
}
144147
w.Base().shared = shared
145148
w.Base().stopped = make(chan struct{})
149+
150+
ctxDesc := "Logger: EventWriter: " + w.GetWriterName()
151+
if shared {
152+
ctxDesc = "Logger: EventWriter (shared): " + w.GetWriterName()
153+
}
154+
writerCtx, writerCancel := newContext(ctx, ctxDesc)
146155
go func() {
156+
defer writerCancel()
147157
defer close(w.Base().stopped)
148-
w.Run(ctx)
158+
w.Run(writerCtx)
149159
}()
150160
}
151161

modules/log/event_writer_conn_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestConnLogger(t *testing.T) {
4040
level := INFO
4141
flags := LstdFlags | LUTC | Lfuncname
4242

43-
logger := NewLoggerWithWriters(context.Background(), NewEventWriterConn("test-conn", WriterMode{
43+
logger := NewLoggerWithWriters(context.Background(), "test", NewEventWriterConn("test-conn", WriterMode{
4444
Level: level,
4545
Prefix: prefix,
4646
Flags: FlagsFromBits(flags),

0 commit comments

Comments
 (0)