Skip to content

Rewrite queue #24505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"code.gitea.io/gitea/modules/private"
repo_module "code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"

"github.com/urfave/cli"
)
Expand Down Expand Up @@ -141,7 +140,7 @@ func (d *delayWriter) Close() error {
if d == nil {
return nil
}
stopped := util.StopTimer(d.timer)
stopped := d.timer.Stop()
if stopped || d.buf == nil {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions models/unittest/testdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func MainTest(m *testing.M, testOpts *TestOptions) {
}

if err = CreateTestEngine(opts); err != nil {
_, _ = fmt.Fprintln(os.Stderr, `sqlite3 requires: import _ "github.com/mattn/go-sqlite3" or -tags sqlite,sqlite_unlock_notify`)
fatalTestError("Error creating test engine: %v\n", err)
}

Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,6 @@ func (b *BleveIndexer) Close() {
log.Info("PID: %d Repository Indexer closed", os.Getpid())
}

// SetAvailabilityChangeCallback does nothing
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
Expand Down
22 changes: 5 additions & 17 deletions modules/indexer/code/elastic_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerAliasName string
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
client *elastic.Client
indexerAliasName string
available bool
stopTimer chan struct{}
lock sync.RWMutex
}

type elasticLogger struct {
Expand Down Expand Up @@ -198,13 +197,6 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
return exists, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
Expand Down Expand Up @@ -529,8 +521,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}

b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
56 changes: 26 additions & 30 deletions modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type SearchResultLanguages struct {
// Indexer defines an interface to index and search code contents
type Indexer interface {
Ping() bool
SetAvailabilityChangeCallback(callback func(bool))
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
Expand Down Expand Up @@ -81,7 +80,7 @@ type IndexerData struct {
RepoID int64
}

var indexerQueue queue.UniqueQueue
var indexerQueue *queue.WorkerPoolQueue[*IndexerData]

func index(ctx context.Context, indexer Indexer, repoID int64) error {
repo, err := repo_model.GetRepositoryByID(ctx, repoID)
Expand Down Expand Up @@ -137,37 +136,46 @@ func Init() {
// Create the Queue
switch setting.Indexer.RepoType {
case "bleve", "elasticsearch":
handler := func(data ...queue.Data) []queue.Data {
handler := func(items ...*IndexerData) []*IndexerData {
idx, err := indexer.get()
if idx == nil || err != nil {
log.Error("Codes indexer handler: unable to get indexer!")
return data
return items
}
if !idx.Ping() {
log.Error("Code indexer handler: indexer is unavailable.")
return items
}

unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
continue
}
// the old logic did: if indexer.Ping() { return nil }, skip all failed items

for _, indexerData := range items {
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)

// FIXME: it seems there is a bug in `CatFileBatch` or `nio.Pipe`, which will cause the process to hang forever in rare cases
/*
sync.(*Cond).Wait(cond.go:70)
github.com/djherbis/nio/v3.(*PipeReader).Read(sync.go:106)
bufio.(*Reader).fill(bufio.go:106)
bufio.(*Reader).ReadSlice(bufio.go:372)
bufio.(*Reader).collectFragments(bufio.go:447)
bufio.(*Reader).ReadString(bufio.go:494)
code.gitea.io/gitea/modules/git.ReadBatchLine(batch_reader.go:149)
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).addUpdate(bleve.go:214)
code.gitea.io/gitea/modules/indexer/code.(*BleveIndexer).Index(bleve.go:296)
code.gitea.io/gitea/modules/indexer/code.(*wrappedIndexer).Index(wrapped.go:74)
code.gitea.io/gitea/modules/indexer/code.index(indexer.go:105)
*/
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
if !setting.IsInTesting {
log.Error("indexer index error for repo %v: %v", indexerData.RepoID, err)
}
if indexer.Ping() {
continue
}
// Add back to queue
unhandled = append(unhandled, datum)
}
}
return unhandled
return nil
}

indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
if indexerQueue == nil {
log.Fatal("Unable to create codes indexer queue")
}
Expand Down Expand Up @@ -224,18 +232,6 @@ func Init() {

indexer.set(rIndexer)

if queue, ok := indexerQueue.(queue.Pausable); ok {
rIndexer.SetAvailabilityChangeCallback(func(available bool) {
if !available {
log.Info("Code index queue paused")
queue.Pause()
} else {
log.Info("Code index queue resumed")
queue.Resume()
}
})
}

// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)

Expand Down
10 changes: 0 additions & 10 deletions modules/indexer/code/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,6 @@ func (w *wrappedIndexer) get() (Indexer, error) {
return w.internal, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
indexer, err := w.get()
if err != nil {
log.Error("Failed to get indexer: %v", err)
return
}
indexer.SetAvailabilityChangeCallback(callback)
}

// Ping checks if elastic is available
func (w *wrappedIndexer) Ping() bool {
indexer, err := w.get()
Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/issues/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,6 @@ func (b *BleveIndexer) Init() (bool, error) {
return false, err
}

// SetAvailabilityChangeCallback does nothing
func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping does nothing
func (b *BleveIndexer) Ping() bool {
return true
Expand Down
4 changes: 0 additions & 4 deletions modules/indexer/issues/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ func (i *DBIndexer) Init() (bool, error) {
return false, nil
}

// SetAvailabilityChangeCallback dummy function
func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
}

// Ping checks if database is available
func (i *DBIndexer) Ping() bool {
return db.GetEngine(db.DefaultContext).Ping() != nil
Expand Down
22 changes: 5 additions & 17 deletions modules/indexer/issues/elastic_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ var _ Indexer = &ElasticSearchIndexer{}

// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerName string
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
client *elastic.Client
indexerName string
available bool
stopTimer chan struct{}
lock sync.RWMutex
}

type elasticLogger struct {
Expand Down Expand Up @@ -138,13 +137,6 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
return true, nil
}

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
Expand Down Expand Up @@ -305,8 +297,4 @@ func (b *ElasticSearchIndexer) setAvailability(available bool) {
}

b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
69 changes: 17 additions & 52 deletions modules/indexer/issues/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type SearchResult struct {
type Indexer interface {
Init() (bool, error)
Ping() bool
SetAvailabilityChangeCallback(callback func(bool))
Index(issue []*IndexerData) error
Delete(ids ...int64) error
Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
Expand Down Expand Up @@ -94,7 +93,7 @@ func (h *indexerHolder) get() Indexer {

var (
// issueIndexerQueue queue of issue ids to be updated
issueIndexerQueue queue.Queue
issueIndexerQueue *queue.WorkerPoolQueue[*IndexerData]
holder = newIndexerHolder()
)

Expand All @@ -108,62 +107,43 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve", "elasticsearch", "meilisearch":
handler := func(data ...queue.Data) []queue.Data {
handler := func(items ...*IndexerData) []*IndexerData {
indexer := holder.get()
if indexer == nil {
log.Error("Issue indexer handler: unable to get indexer!")
return data
log.Error("Issue indexer handler: unable to get indexer.")
return items
}
if !indexer.Ping() {
log.Error("Issue indexer handler: indexer is unavailable.")
return items
}

iData := make([]*IndexerData, 0, len(data))
unhandled := make([]queue.Data, 0, len(data))
for _, datum := range data {
indexerData, ok := datum.(*IndexerData)
if !ok {
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
continue
}
// the old logic did: if indexer.Ping() { return nil }, skip all failed items

toIndex := make([]*IndexerData, 0, len(items))
for _, indexerData := range items {
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
if indexerData.IsDelete {
if err := indexer.Delete(indexerData.IDs...); err != nil {
log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err)
if indexer.Ping() {
continue
}
// Add back to queue
unhandled = append(unhandled, datum)
}
continue
}
iData = append(iData, indexerData)
toIndex = append(toIndex, indexerData)
}
if len(unhandled) > 0 {
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
}
if err := indexer.Index(iData); err != nil {
log.Error("Error whilst indexing: %v Error: %v", iData, err)
if indexer.Ping() {
return nil
}
// Add back to queue
for _, indexerData := range iData {
unhandled = append(unhandled, indexerData)
}
return unhandled
if err := indexer.Index(toIndex); err != nil {
log.Error("Error whilst indexing: %v Error: %v", toIndex, err)
}
return nil
}

issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)

if issueIndexerQueue == nil {
log.Fatal("Unable to create issue indexer queue")
}
default:
issueIndexerQueue = &queue.DummyQueue{}
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
}

// Create the Indexer
Expand Down Expand Up @@ -240,18 +220,6 @@ func InitIssueIndexer(syncReindex bool) {
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}

if queue, ok := issueIndexerQueue.(queue.Pausable); ok {
holder.get().SetAvailabilityChangeCallback(func(available bool) {
if !available {
log.Info("Issue index queue paused")
queue.Pause()
} else {
log.Info("Issue index queue resumed")
queue.Resume()
}
})
}

// Start processing the queue
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)

Expand Down Expand Up @@ -285,9 +253,6 @@ func InitIssueIndexer(syncReindex bool) {
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
shutdownable.Terminate()
}
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
Expand Down
Loading