1
0
Fork 0
forked from forgejo/forgejo

Issue search support elasticsearch (#9428)

* Issue search support elasticsearch

* Fix lint

* Add indexer name on app.ini

* add a warnning on SearchIssuesByKeyword

* improve code
This commit is contained in:
Lunny Xiao 2020-02-13 14:06:17 +08:00 committed by GitHub
parent 17656021f1
commit 5dbf36f356
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
286 changed files with 57032 additions and 25 deletions

View file

@ -170,7 +170,7 @@ func NewBleveIndexer(indexDir string) *BleveIndexer {
}
}
// Init will initial the indexer
// Init will initialize the indexer
func (b *BleveIndexer) Init() (bool, error) {
var err error
b.indexer, err = openIndexer(b.indexDir, issueIndexerLatestVersion)

View file

@ -0,0 +1,230 @@
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package issues
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
)
var (
_ Indexer = &ElasticSearchIndexer{}
)
// ElasticSearchIndexer implements Indexer interface
type ElasticSearchIndexer struct {
client *elastic.Client
indexerName string
}
type elasticLogger struct {
*log.Logger
}
func (l elasticLogger) Printf(format string, args ...interface{}) {
_ = l.Logger.Log(2, l.Logger.GetLevel(), format, args...)
}
// NewElasticSearchIndexer creates a new elasticsearch indexer
func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, error) {
opts := []elastic.ClientOptionFunc{
elastic.SetURL(url),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10 * time.Second),
elastic.SetGzip(false),
}
logger := elasticLogger{log.GetLogger(log.DEFAULT)}
if logger.GetLevel() == log.TRACE || logger.GetLevel() == log.DEBUG {
opts = append(opts, elastic.SetTraceLog(logger))
} else if logger.GetLevel() == log.ERROR || logger.GetLevel() == log.CRITICAL || logger.GetLevel() == log.FATAL {
opts = append(opts, elastic.SetErrorLog(logger))
} else if logger.GetLevel() == log.INFO || logger.GetLevel() == log.WARN {
opts = append(opts, elastic.SetInfoLog(logger))
}
client, err := elastic.NewClient(opts...)
if err != nil {
return nil, err
}
return &ElasticSearchIndexer{
client: client,
indexerName: indexerName,
}, nil
}
const (
defaultMapping = `{
"mappings": {
"properties": {
"id": {
"type": "integer",
"index": true
},
"repo_id": {
"type": "integer",
"index": true
},
"title": {
"type": "text",
"index": true
},
"content": {
"type": "text",
"index": true
},
"comments": {
"type" : "text",
"index": true
}
}
}
}`
)
// Init will initialize the indexer
func (b *ElasticSearchIndexer) Init() (bool, error) {
ctx := context.Background()
exists, err := b.client.IndexExists(b.indexerName).Do(ctx)
if err != nil {
return false, err
}
if !exists {
var mapping = defaultMapping
createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx)
if err != nil {
return false, err
}
if !createIndex.Acknowledged {
return false, errors.New("init failed")
}
return false, nil
}
return true, nil
}
// Index will save the index data
func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error {
if len(issues) == 0 {
return nil
} else if len(issues) == 1 {
issue := issues[0]
_, err := b.client.Index().
Index(b.indexerName).
Id(fmt.Sprintf("%d", issue.ID)).
BodyJson(map[string]interface{}{
"id": issue.ID,
"repo_id": issue.RepoID,
"title": issue.Title,
"content": issue.Content,
"comments": issue.Comments,
}).
Do(context.Background())
return err
}
reqs := make([]elastic.BulkableRequest, 0)
for _, issue := range issues {
reqs = append(reqs,
elastic.NewBulkIndexRequest().
Index(b.indexerName).
Id(fmt.Sprintf("%d", issue.ID)).
Doc(map[string]interface{}{
"id": issue.ID,
"repo_id": issue.RepoID,
"title": issue.Title,
"content": issue.Content,
"comments": issue.Comments,
}),
)
}
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
Do(context.Background())
return err
}
// Delete deletes indexes by ids
func (b *ElasticSearchIndexer) Delete(ids ...int64) error {
if len(ids) == 0 {
return nil
} else if len(ids) == 1 {
_, err := b.client.Delete().
Index(b.indexerName).
Id(fmt.Sprintf("%d", ids[0])).
Do(context.Background())
return err
}
reqs := make([]elastic.BulkableRequest, 0)
for _, id := range ids {
reqs = append(reqs,
elastic.NewBulkDeleteRequest().
Index(b.indexerName).
Id(fmt.Sprintf("%d", id)),
)
}
_, err := b.client.Bulk().
Index(b.indexerName).
Add(reqs...).
Do(context.Background())
return err
}
// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
kwQuery := elastic.NewMultiMatchQuery(keyword, "title", "content", "comments")
query := elastic.NewBoolQuery()
query = query.Must(kwQuery)
if len(repoIDs) > 0 {
var repoStrs = make([]interface{}, 0, len(repoIDs))
for _, repoID := range repoIDs {
repoStrs = append(repoStrs, repoID)
}
repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
query = query.Must(repoQuery)
}
searchResult, err := b.client.Search().
Index(b.indexerName).
Query(query).
Sort("id", true).
From(start).Size(limit).
Do(context.Background())
if err != nil {
return nil, err
}
hits := make([]Match, 0, limit)
for _, hit := range searchResult.Hits.Hits {
id, _ := strconv.ParseInt(hit.Id, 10, 64)
hits = append(hits, Match{
ID: id,
})
}
return &SearchResult{
Total: searchResult.TotalHits(),
Hits: hits,
}, nil
}
// Close implements indexer
func (b *ElasticSearchIndexer) Close() {}

View file

@ -21,13 +21,13 @@ import (
// IndexerData data stored in the issue indexer
type IndexerData struct {
ID int64
RepoID int64
Title string
Content string
Comments []string
IsDelete bool
IDs []int64
ID int64 `json:"id"`
RepoID int64 `json:"repo_id"`
Title string `json:"title"`
Content string `json:"content"`
Comments []string `json:"comments"`
IsDelete bool `json:"is_delete"`
IDs []int64 `json:"ids"`
}
// Match represents on search result
@ -100,7 +100,7 @@ func InitIssueIndexer(syncReindex bool) {
// Create the Queue
switch setting.Indexer.IssueType {
case "bleve":
case "bleve", "elasticsearch":
handler := func(data ...queue.Data) {
indexer := holder.get()
if indexer == nil {
@ -160,6 +160,19 @@ func InitIssueIndexer(syncReindex bool) {
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
log.Debug("Created Bleve Indexer")
case "elasticsearch":
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, "gitea_issues")
if err != nil {
log.Fatal("Unable to initialize Elastic Search Issue Indexer: %v", err)
}
exist, err := issueIndexer.Init()
if err != nil {
log.Fatal("Unable to issueIndexer.Init: %v", err)
}
populate = !exist
holder.set(issueIndexer)
})
case "db":
issueIndexer := &DBIndexer{}
holder.set(issueIndexer)
@ -308,6 +321,7 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
}
// SearchIssuesByKeyword search issue ids by keywords and repo id
// WARNNING: You have to ensure user have permission to visit repoIDs' issues
func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
var issueIDs []int64
indexer := holder.get()
@ -316,7 +330,7 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) {
log.Error("SearchIssuesByKeyword(): unable to get indexer!")
return nil, fmt.Errorf("unable to get issue indexer")
}
res, err := indexer.Search(keyword, repoIDs, 1000, 0)
res, err := indexer.Search(keyword, repoIDs, 50, 0)
if err != nil {
return nil, err
}