import "github.com/greenbone/opensight-golang-libraries/pkg/openSearch/openSearchClient"
Package openSearchClient provides functionality for interacting with OpenSearch.
Package openSearchClient provides a client for OpenSearch designed to allow easy mocking in tests.
Example Usage:
clientConfig, err := config.ReadOpensearchClientConfig()
if err != nil {
return err
}
opensearchProjectClient, err := NewOpenSearchProjectClient(context.Background(), clientConfig, nil)
if err != nil {
return err
}
client := NewClient(opensearchProjectClient, 10, 1)
query := `{"query":{"bool":{"filter":[{"term":{"oid":{"value":"1.3.6.1.4.1.25623.1.0.117842"}}}]}}}`
responseBody, err := client.Search(indexName, []byte(query))
if err != nil {
return err
}
searchResponse, err := UnmarshalSearchResponse[*Vulnerability](responseBody)
if err != nil {
return err
}
For further usage examples see ./client_test.go.
- func InitializeJson(timeFormats []string)
- func InjectAuthenticationIntoClient(client *opensearchapi.Client, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) error
- func NewOpenSearchProjectClient(ctx context.Context, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) (*opensearchapi.Client, error)
- func SerializeDocumentsForBulkUpdate[T any](indexName string, documents []T) ([]byte, error)
- func StartOpensearchTestContainer(ctx context.Context) (testcontainers.Container, config.OpensearchClientConfig, error)
- func Unmarshal(data []byte, v any) error
- func UnmarshalWithoutValidation(data []byte, v any) error
- type Authenticator
- type Bucket
- type BulkResponse
- type ByCreationDate
- type Client
- func NewClient(openSearchProjectClient *opensearchapi.Client, updateMaxRetries int, updateRetryDelay time.Duration) *Client
- func (c *Client) AsyncDeleteByQuery(indexName string, requestBody []byte) error
- func (c *Client) BulkUpdate(indexName string, requestBody []byte) error
- func (c *Client) Close()
- func (c *Client) CompositeAggStream(indexName string, requestBody []byte, ctx context.Context) (io.Reader, error)
- func (c *Client) Count(indexName string, requestBody []byte) (count int64, err error)
- func (c *Client) DeleteByQuery(indexName string, requestBody []byte) error
- func (c *Client) Search(indexName string, requestBody []byte) (responseBody []byte, err error)
- func (c *Client) SearchStream(indexName string, requestBody []byte, scrollTimeout time.Duration, ctx context.Context) (io.Reader, error)
- func (c *Client) SyncUpdate(indexName string, requestBody []byte) (responseBody []byte, err error)
- func (c *Client) Update(indexName string, requestBody []byte) (responseBody []byte, err error)
- type CountReq
- type CountResp
- type CreatedResponse
- type DocumentError
- type DocumentErrorType
- type DynamicAggregation
- type DynamicAggregationHits
- type IndexError
- type IndexFunction
- func NewIndexFunction(openSearchProjectClient *opensearchapi.Client) *IndexFunction
- func (i *IndexFunction) AliasExists(aliasName string) (bool, error)
- func (i *IndexFunction) CreateIndex(indexName string, indexSchema []byte) error
- func (i *IndexFunction) CreateOrPutAlias(aliasName string, indexNames ...string) error
- func (i *IndexFunction) DeleteAliasFromIndex(indexName string, aliasName string) error
- func (i *IndexFunction) DeleteIndex(indexName string) error
- func (i *IndexFunction) ForceMerge(index string, maximumNumberOfSegments int) error
- func (i *IndexFunction) GetIndexSettings(index string) (map[string]interface{}, error)
- func (i *IndexFunction) GetIndexes(pattern string) ([]string, error)
- func (i *IndexFunction) GetIndexesForAlias(aliasName string) ([]string, error)
- func (i *IndexFunction) IndexExists(indexName string) (bool, error)
- func (i *IndexFunction) IndexHasAlias(indexNames []string, aliasNames []string) (bool, error)
- func (i *IndexFunction) RefreshIndex(index string) error
- func (i *IndexFunction) RemoveIndexesFromAlias(indexesToRemove []string, aliasName string) error
- func (i *IndexFunction) SetIndexSettings(index string, settingsBody io.Reader) error
- type IndexInfo
- type KeepJsonAsString
- type OpenSearchError
- type OpenSearchErrorResponse
- type OpenSearchErrors
- type OpenSearchHealth
- type OpenSearchResourceAlreadyExists
- type OpenSearchResourceNotFound
- type OpenSearchRootCause
- type OpensearchTestContainer
- type Request
- type Response
- type SearchResponse
- type SearchResponseAggregation
- type SearchResponseAggregations
- type SearchResponseHit
- type SearchResponseHits
- type SearchResponseHitsTotal
- type ShardStats
- type SyncUpdateClient
- type TokenReceiver
- type UpdateQueue
func InitializeJson(timeFormats []string)func InjectAuthenticationIntoClient(client *opensearchapi.Client, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) errorInjectAuthenticationIntoClient is a function that sets up the authentication method for the OpenSearch client. client is the OpenSearch client to inject the authentication into. config is the configuration for the OpenSearch client. tokenReceiver is the token receiver for OpenID authentication and must implement the GetClientAccessToken function. It can be nil for basic authentication.
func NewOpenSearchProjectClient(ctx context.Context, config config.OpensearchClientConfig, tokenReceiver TokenReceiver) (*opensearchapi.Client, error)NewOpenSearchProjectClient creates a new official OpenSearch client (package github.com/opensearch-project/opensearch-go) for use with NewClient. It returns an error if the client couldn't be created or the connection couldn't be established.
ctx is the context to use for the connection. config is the configuration for the client.
func SerializeDocumentsForBulkUpdate[T any](indexName string, documents []T) ([]byte, error)SerializeDocumentsForBulkUpdate serializes documents for bulk update. Can be used in conjunction with BulkUpdate. It returns the serialized documents or an error in case something went wrong.
indexName is the name of the index to update. documents are the documents to update.
func StartOpensearchTestContainer(ctx context.Context) (testcontainers.Container, config.OpensearchClientConfig, error)StartOpensearchTestContainer starts a test container with opensearch and returns the container and the config for the opensearch client. It returns an error if the container couldn't be created or started.
ctx is the context to use for the container.
func Unmarshal(data []byte, v any) errorUnmarshal unmarshalls data into v. It returns an error if the data is invalid.
func UnmarshalWithoutValidation(data []byte, v any) errorUnmarshalWithoutValidation unmarshalls data into v. It returns an error if the data cannot be parsed.
Authenticator is a struct that holds the necessary information for authenticating with OpenSearch.
type Authenticator struct {
// contains filtered or unexported fields
}func (a *Authenticator) Perform(req *http.Request) (*http.Response, error)Perform is a method that implements the opensearchtransport.Interface interface. It injects the authentication header into the request and then performs the request.
type Bucket struct {
Key any `json:"key"`
KeyAsString string `json:"key_as_string"`
DocCount uint `json:"doc_count"`
Aggs map[string]DynamicAggregation
}func (bucket *Bucket) UnmarshalJSON(bytes []byte) errorBulkResponse bulk response
type BulkResponse struct {
Took uint `json:"took"`
HasError bool `json:"errors"`
Errors []IndexError `json:"items"`
}type ByCreationDate []IndexInfofunc (a ByCreationDate) Len() intfunc (a ByCreationDate) Less(i, j int) boolfunc (a ByCreationDate) Swap(i, j int)Client is a client for OpenSearch designed to allow easy mocking in tests. It is a wrapper around the official OpenSearch client github.com/opensearch-project/opensearch-go .
type Client struct {
// contains filtered or unexported fields
}func NewClient(openSearchProjectClient *opensearchapi.Client, updateMaxRetries int, updateRetryDelay time.Duration) *ClientNewClient creates a new OpenSearch client.
openSearchProjectClient is the official OpenSearch client to wrap. Use NewOpenSearchProjectClient to create it. updateMaxRetries is the number of retries for update requests. updateRetryDelay is the delay between retries.
func (c *Client) AsyncDeleteByQuery(indexName string, requestBody []byte) errorAsyncDeleteByQuery deletes documents in the given index asynchronously. It does not wait for the deletion to finish before returning. It returns an error in case something went wrong.
indexName is the name of the index to delete from. requestBody is the request body to send to OpenSearch to identify the documents to be deleted.
func (c *Client) BulkUpdate(indexName string, requestBody []byte) errorBulkUpdate performs a bulk update in the given index. It returns an error in case something went wrong.
indexName is the name of the index to update. requestBody is the request body to send to OpenSearch specifying the bulk update.
func (c *Client) Close()Close stops the underlying UpdateQueue allowing a graceful shutdown.
func (c *Client) CompositeAggStream(indexName string, requestBody []byte, ctx context.Context) (io.Reader, error)func (c *Client) Count(indexName string, requestBody []byte) (count int64, err error)func (c *Client) DeleteByQuery(indexName string, requestBody []byte) errorDeleteByQuery deletes documents in the given index. It waits for the deletion to finish before returning. It returns an error in case something went wrong.
indexName is the name of the index to delete from. requestBody is the request body to send to OpenSearch to identify the documents to be deleted.
func (c *Client) Search(indexName string, requestBody []byte) (responseBody []byte, err error)Search searches for documents in the given index.
indexName is the name of the index to search in. requestBody is the request body to send to OpenSearch. It returns the response body or an error in case something went wrong.
func (c *Client) SearchStream(indexName string, requestBody []byte, scrollTimeout time.Duration, ctx context.Context) (io.Reader, error)func (c *Client) SyncUpdate(indexName string, requestBody []byte) (responseBody []byte, err error)SyncUpdate updates documents in the given index synchronously.
func (c *Client) Update(indexName string, requestBody []byte) (responseBody []byte, err error)Update updates documents in the given index using UpdateQueue (which is also part of this package). It does not wait for the update to finish before returning. It returns the response body or an error in case something went wrong.
indexName is the name of the index to update. requestBody is the request body to send to OpenSearch specifying the update.
type CountReq struct {
Indices []string
Body io.Reader
Header http.Header
Params map[string]string
}func (r CountReq) GetRequest() (*http.Request, error)GetRequest returns the *http.Request that gets executed by the client
type CountResp struct {
Count int64 `json:"count"`
Shards ShardStats `json:"_shards"`
}func (r CountResp) Inspect() opensearchapi.Inspecttype CreatedResponse struct {
Id string `json:"_id"`
Result string `json:"result"`
}type DocumentError struct {
IndexName string `json:"_index"`
IndexType string `json:"_type"`
DocumentId string `json:"_id"`
StatusCode uint `json:"status"`
Error DocumentErrorType `json:"error"`
}type DocumentErrorType struct {
Type string `json:"type"`
Reason string `json:"reason"`
}type DynamicAggregation struct {
DocCountErrorUpperBound int `json:"doc_count_error_upper_bound"`
SumOtherDocCount uint `json:"sum_other_doc_count"`
Buckets []Bucket `json:"buckets"`
Value any `json:"value"`
ValueAsString any `json:"value_as_string"`
Hits DynamicAggregationHits `json:"hits"`
}type DynamicAggregationHits struct {
Total SearchResponseHitsTotal `json:"total"`
SearchHits KeepJsonAsString `json:"hits"`
}type IndexError struct {
Index DocumentError `json:"index"`
}type IndexFunction struct {
// contains filtered or unexported fields
}func NewIndexFunction(openSearchProjectClient *opensearchapi.Client) *IndexFunctionfunc (i *IndexFunction) AliasExists(aliasName string) (bool, error)func (i *IndexFunction) CreateIndex(indexName string, indexSchema []byte) errorCreateIndex creates an index
func (i *IndexFunction) CreateOrPutAlias(aliasName string, indexNames ...string) errorfunc (i *IndexFunction) DeleteAliasFromIndex(indexName string, aliasName string) errorfunc (i *IndexFunction) DeleteIndex(indexName string) errorfunc (i *IndexFunction) ForceMerge(index string, maximumNumberOfSegments int) errorfunc (i *IndexFunction) GetIndexSettings(index string) (map[string]interface{}, error)func (i *IndexFunction) GetIndexes(pattern string) ([]string, error)func (i *IndexFunction) GetIndexesForAlias(aliasName string) ([]string, error)previously AliasPointsToIndex
func (i *IndexFunction) IndexExists(indexName string) (bool, error)func (i *IndexFunction) IndexHasAlias(indexNames []string, aliasNames []string) (bool, error)func (i *IndexFunction) RefreshIndex(index string) errorfunc (i *IndexFunction) RemoveIndexesFromAlias(indexesToRemove []string, aliasName string) errorfunc (i *IndexFunction) SetIndexSettings(index string, settingsBody io.Reader) errortype IndexInfo struct {
Name string
CreationDate int // Store Unix timestamp
}func ConvertToIndexInfo(indices []opensearchapi.CatIndexResp) []IndexInfofunc SortIndexInfoByCreationDate(indexes []IndexInfo) []IndexInfotype KeepJsonAsString []bytefunc (k *KeepJsonAsString) UnmarshalJSON(data []byte) errorOpenSearchError openSearch error
type OpenSearchError struct {
Message string
}func NewOpenSearchError(message string) *OpenSearchErrorfunc (o *OpenSearchError) Error() stringtype OpenSearchErrorResponse struct {
Error OpenSearchErrors
Status int
}type OpenSearchErrors struct {
Reasons []OpenSearchRootCause
Type string
Reason string
}type OpenSearchHealth struct {
// contains filtered or unexported fields
}func NewOpenSearchHealth(openSearchProjectClient *opensearchapi.Client) *OpenSearchHealthfunc (h *OpenSearchHealth) GetDiskAllocationPercentage() (int, error)func (h *OpenSearchHealth) GetIndexesWithCreationDate(pattern string) ([]IndexInfo, error)OpenSearchResourceAlreadyExists openSearch resource already exists
type OpenSearchResourceAlreadyExists struct {
Message string
}func NewOpenSearchResourceAlreadyExists(message string) *OpenSearchResourceAlreadyExistsfunc (o *OpenSearchResourceAlreadyExists) Error() stringOpenSearchResourceNotFound openSearch resource not found
type OpenSearchResourceNotFound struct {
Message string
}func NewOpenSearchResourceNotFound(message string) *OpenSearchResourceNotFoundfunc (o *OpenSearchResourceNotFound) Error() stringtype OpenSearchRootCause struct {
Type string
Reason string
}OpensearchTestContainer represents the opensearch container
type OpensearchTestContainer struct {
testcontainers.Container
}type Request struct {
IndexName string
RequestBody []byte
Response chan Response // Use the new Response type
}type Response struct {
Body []byte
Err error
}type SearchResponse[T any] struct {
Took uint `json:"took"`
TimedOut bool `json:"timed_out"`
Hits SearchResponseHits[T] `json:"hits"`
Aggregations SearchResponseAggregations `json:"aggregations"`
}func UnmarshalSearchResponse[T any](data []byte) (*SearchResponse[T], error)func (s SearchResponse[T]) GetResults() []TGetResults returns list of documents
func (s SearchResponse[T]) GetSearchHits() []SearchResponseHit[T]type SearchResponseAggregation struct {
DocCountErrorUpperBound int `json:"doc_count_error_upper_bound"`
SumOtherDocCount uint `json:"sum_other_doc_count"`
Buckets []Bucket `json:"buckets"`
Value uint64 `json:"value"`
}type SearchResponseAggregations map[string]SearchResponseAggregationtype SearchResponseHit[T any] struct {
Id string `json:"_id"`
Type string `json:"_type"`
Content T `json:"_source"`
}type SearchResponseHits[T any] struct {
Total SearchResponseHitsTotal
SearchHits []SearchResponseHit[T] `json:"hits"`
}type SearchResponseHitsTotal struct {
Value uint
Relation string
}type ShardStats struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
}type SyncUpdateClient struct {
// contains filtered or unexported fields
}func NewSyncUpdateClient(osClient *opensearchapi.Client, maxRetries int, retryDelay time.Duration) *SyncUpdateClientfunc (s *SyncUpdateClient) Update(indexName string, requestBody []byte) ([]byte, error)TokenReceiver is an interface for receiving client access tokens.
type TokenReceiver interface {
GetClientAccessToken(clientName, clientSecret string) (string, error)
ClearClientAccessToken()
}UpdateQueue is a queue for OpenSearch update requests.
type UpdateQueue struct {
// contains filtered or unexported fields
}func NewRequestQueue(openSearchClient *opensearchapi.Client, updateMaxRetries int, updateRetryDelay time.Duration) *UpdateQueueNewRequestQueue creates a new update queue.
openSearchClient is the official OpenSearch client. Use NewOpenSearchProjectClient to create it. updateMaxRetries is the number of retries for update requests. updateRetryDelay is the delay between retries.
func (q *UpdateQueue) Stop()func (q *UpdateQueue) Update(indexName string, requestBody []byte) ([]byte, error)Update queues an update for an index and returns the response body or an error
Is called from pkg/openSearch/open_search_client/client.go: func (c *Client) Update(indexName string, requestBody []byte) (responseBody []byte, err error) and tested in pkg/openSearch/open_search_client/client_test.go
indexName: The name of the index to update. requestBody: The request body to send to the index.
Returns: The response body or an error
Generated by gomarkdoc
Copyright (C) 2022-2023 Greenbone AG
Licensed under the GNU General Public License v3.0 or later.