본문으로 바로가기

Elasticsearch Golang Client 사용하기

category Go 2023. 8. 25. 09:52
반응형

사내에서 Elasticsearch 검색엔진의 검색API를 Golang으로 운영중입니다.

Pagination Search 비교를 하기 위해서 작성했던 코드를 기록하려고 합니다.


Go의 장단점

장점

- Go는 Java와 달리 기계 코드로 변환하는 과정이 없는 컴파일 언어입니다. (Java 보다 성능이 좋습니다.)

- 고루틴(Goroutine)이라는 비동기 작업 처리용 경량 스레드가 있습니다.

- 고는 C와 유사한 문법을 가지며, 간결한 문법으로 빠르게 코드를 작성할 수 있습니다.

 

단점:

- 상대적으로 새로운 언어로, Java처럼 다양한 라이브러리와 생태계가 아직 충분히 확립되지 않았습니다.

- 모든 제너릭 기능이 완전히 구현되지 않았기 때문에 코드의 재사용성을 확보하는데 한계가 있을 수 있습니다.

(Go 언어의 초기 버전은 제너릭을 지원하지 않아 다양한 타입에 대한 코드를 중복해서 작성해야 했지만 최근 버전에서는 일부 도입되었습니다.)

Java는 다양한 플랫폼에서 안정적으로 동작하며, 대규모 애플리케이션과 시스템을 개발하기에 적합합니다. 

반면에 Go는 빠른 개발 속도와 효율적인 병렬 처리를 필요로 하는 경우에 적합합니다.

 

Go - Elasticsearch Client로 아래 두 개 라이브러리를 사용해보려고 합니다.

1. github.com/olivere/elastic/v7

 

- Search-scroll + slice

위 쿼리를 사용하기 위해 작성한 코드 입니다.

해당 라이브러리는 아래 소스처럼 문서를 색인하고 쿼리하는 기능을 제공하여 데이터 검색 및 조작이 용이합니다.

단점으로는 현재 더이상 활발하게 개발하지 않아 라이브러리의 최신 버전 및 Elasticsearch의 호환성이 업데이트 되지 않고 모든 Elasticsearch의 기능을 지원하지 않을 수 있습니다.

 

Search-scroll ID 값을 받아 Goroutine으로 worker 만큼 비동기 처리를 진행하였습니다.

 

package main

import (
	"context"
	"fmt"
	"io"
	"search/performance/setting"
	"search/performance/util"
	"sync"
	"time"

	esclient "github.com/olivere/elastic/v7"
)

var (
	wg            sync.WaitGroup
	searchResults []*esclient.SearchHit
	startTime     time.Time
)

func main() {
	setting.LoadConfig("../config/config.yml")
	setting.InitEsClient()
	elasticsearchClient := setting.EsClient

	startTime = time.Now()
	fmt.Println(startTime)

	index := setting.GetTargetIndex()
	size := 10000
	sum := 0

	worker := setting.Worker()
	wg.Add(worker)

	for i := 0; i < worker; i++ {
		sliceQuery := esclient.NewSliceQuery().Id(i).Max(worker)
		go fetchData(index, size, sliceQuery, &sum, elasticsearchClient)
	}

	wg.Wait()

	util.WriteSearchEs(searchResults)

	elapsedTime := time.Since(startTime)
	fmt.Println("Execution time: ", elapsedTime.Seconds(), sum, time.Now())
}

func fetchData(index string, size int, sliceQuery *esclient.SliceQuery, sum *int, client *esclient.Client) {
	defer wg.Done()

	sliceAllQuery := client.Scroll(index).
		Slice(sliceQuery).
		Size(size)

	for {
		searchResult, err := sliceAllQuery.Do(context.Background())
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err.Error())
			break
		}

		*sum += len(searchResult.Hits.Hits)
		printProgress(*sum)
		//searchResults = append(searchResults, searchResult.Hits.Hits...)
	}
}

func printProgress(sum int) {
	if sum%100000 == 0 {
		elapsedTime := time.Since(startTime)
		fmt.Println("sum = ", elapsedTime.Seconds(), sum, time.Now())
	}
}

 

2. github.com/elastic/go-elasticsearch/v7, v8

 

- Search-after + slice + pit

PIT는 7.10.1버전 이상부터 사용가능했고 1번에서 사용한 라이브러리에서는 Search-after와 slice를 동시에 사용하지 못하여 2번쨰 라이브러리로 사용하였습니다.

 

해당 라이브러리는 Elasticsearch 공식 지원 라이브러리이기 때문에 호환성과 업데이트가 보장됩니다. 또한 최신 기능을 빠르게 적용하는 장점이 있습니다.

 

단점으로는 아래 코드처럼 Query Dsl로 쿼리를 작성해야 합니다.

package main

import (
	"encoding/json"
	"fmt"
	"search/performance/setting"
	"search/performance/util"
	"strings"
	"sync"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
)

var (
	wg            sync.WaitGroup
	searchResults []interface{}
	startTime     time.Time
	r             map[string]interface{}
)

func main() {
	setting.LoadConfig("../config/config.yml")
	setting.InitEsClientV2()
	elasticsearchClient := setting.EsClientV2

	startTime = time.Now()
	fmt.Println(startTime)

	index := setting.GetTargetIndex()
	size := 10000
	sum := 0

	pitID, err := getPointInTimeID(elasticsearchClient, index)
	if err != nil {
		fmt.Println("Error getting Point In Time ID: ", err)
		return
	}

	worker := setting.Worker()
	wg.Add(worker)

	for i := 0; i < worker; i++ {
		go fetchSearchResults(i, worker, size, &sum, pitID, elasticsearchClient)
	}

	wg.Wait()

	util.WriteSearchEsV2(searchResults)

	elapsedTime := time.Since(startTime)
	fmt.Println("Execution time: ", elapsedTime.Seconds(), sum, time.Now())
}

func getPointInTimeID(client *elasticsearch.Client, index string) (string, error) {
	res, err := client.OpenPointInTime([]string{index}, "1m")
	if err != nil {
		return "", err
	}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		return "", err
	}

	return r["id"].(string), nil
}

const baseQuery = `
	"slice" : {"id" : %d, "max" : %d},
	"pit" : {"id" : "%s", "keep_alive" : "1m"},
	"sort" : ["_doc"]
`

func fetchSearchResults(i int, worker int, size int, sum *int, pitID string, client *elasticsearch.Client) {
	defer wg.Done()

	var resultLen int
	var after []interface{}

	for {
		var b strings.Builder
		b.WriteString("{\n")
		b.WriteString(fmt.Sprintf(baseQuery, i, worker, pitID))
		if len(after) > 0 && after[0] != "" && after[0] != "null" {
			b.WriteString(", \n")
			b.WriteString(fmt.Sprintf(`	"search_after": [%f]`, after[0]))
			after = []interface{}{}
		}
		b.WriteString("\n}")

		res, err := client.Search(
			client.Search.WithSize(10000),
			client.Search.WithBody(strings.NewReader(b.String())),
		)
		if err != nil {
			fmt.Println("Error Query: ", err)
		}
		if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
			fmt.Println("Error BaseQuery Parsing: ", err)
		}

		resultLen = len(r["hits"].(map[string]interface{})["hits"].([]interface{}))

		for _, sort := range r["hits"].(map[string]interface{})["hits"].([]interface{})[resultLen-1].(map[string]interface{})["sort"].([]interface{}) {
			after = append(after, sort)
		}

		*sum += resultLen
		printProgress(*sum)
		//searchResults = append(searchResults, r["hits"].(map[string]interface{})["hits"].([]interface{})...)

		if resultLen < size {
			return
		}
	}
}

func printProgress(sum int) {
	if sum%100000 == 0 {
		elapsedTime := time.Since(startTime)
		fmt.Println("sum = ", elapsedTime.Seconds(), sum, time.Now())
	}
}

 

 

간단한 기능을 사용할 경우에는 olivere/elastic 라이브러리를 사용하면 간편하겠지만 복잡하거나 최신 기능을 사용할경우에는 Elastic사의 공식 라이브러리인 go-elasticsearch 사용이 적합할꺼 같습니다.

반응형

'Go' 카테고리의 다른 글

Golang RabbitMQ 클라이언트 사용해보기  (0) 2022.08.19
Go Gin 사용해보기  (0) 2022.08.16
Golang 설치  (0) 2022.08.16