diff --git a/pyproject.toml b/pyproject.toml
index 59df17f..0ee58c0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,6 +48,11 @@ oauth2-api-client = [
     "oauthlib>=3.2.2",
     "requests_oauthlib>=1.3.1"
 ]
+opensearch-client = [
+    "boto3>=1.26.5",
+    "botocore>=1.29.5",
+    "opensearch-py>=2.1.0"
+]
 postgresql-client = [
     "psycopg[binary]>=3.1.6"
 ]
@@ -81,7 +86,7 @@ research-catalog-identifier-helper = [
     "requests>=2.28.1"
 ]
 development = [
-    "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper,log_helper]",
+    "nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,opensearch-client,postgresql-client,redshift-client,s3-client,secrets-manager-client,sftp-client,config-helper,obfuscation-helper,patron-data-helper,research-catalog-identifier-helper,log_helper]",
     "flake8>=6.0.0",
     "freezegun>=1.2.2",
     "mock>=4.0.3",
diff --git a/src/nypl_py_utils/classes/opensearch_client.py b/src/nypl_py_utils/classes/opensearch_client.py
new file mode 100644
index 0000000..702a138
--- /dev/null
+++ b/src/nypl_py_utils/classes/opensearch_client.py
@@ -0,0 +1,161 @@
+import boto3
+import os
+
+from botocore.exceptions import BotoCoreError, ClientError
+from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
+from nypl_py_utils.functions.log_helper import create_log
+
+
+class OpenSearchClient:
+    """
+    Client for interacting with an AWS OpenSearch Service domain.
+
+    Takes as input the OpenSearch domain endpoint (without the https:// scheme)
+    and an optional AWS region. Authentication is performed via AWS IAM using
+    SigV4 request signing.
+
+    If no region is given, the AWS_REGION environment variable is used,
+    defaulting to us-east-1. Missing AWS credentials and boto errors raised
+    while building the client are re-raised as OpenSearchClientError.
+    """
+
+    def __init__(self, host, region=None):
+        self.logger = create_log('opensearch_client')
+        self.host = host
+        self.region = region or os.environ.get('AWS_REGION', 'us-east-1')
+
+        try:
+            credentials = boto3.Session().get_credentials()
+            if credentials is None:
+                # boto3 returns None instead of raising when no credentials
+                # can be found, so fail here with a clear message
+                raise ValueError('No AWS credentials found')
+            auth = AWSV4SignerAuth(credentials, self.region, 'es')
+            self.client = OpenSearch(
+                hosts=[{'host': self.host, 'port': 443}],
+                http_auth=auth,
+                use_ssl=True,
+                verify_certs=True,
+                connection_class=RequestsHttpConnection,
+                pool_maxsize=10
+            )
+        except (BotoCoreError, ClientError, ValueError) as e:
+            self.logger.error(
+                'Could not create OpenSearch client: {err}'.format(err=e))
+            raise OpenSearchClientError(
+                'Could not create OpenSearch client: {err}'.format(err=e)
+            ) from None
+
+    def create_index(self, index, body=None):
+        """
+        Creates an OpenSearch index with optional mappings and settings.
+
+        Parameters
+        ----------
+        index: str
+            The name of the index to create
+        body: dict, optional
+            The index settings and/or mappings
+        """
+        self.logger.info('Creating OpenSearch index {}'.format(index))
+        try:
+            return self.client.indices.create(index=index, body=body)
+        except Exception as e:
+            self.logger.error(
+                'Error creating OpenSearch index {name}: {error}'.format(
+                    name=index, error=e))
+            raise OpenSearchClientError(
+                'Error creating OpenSearch index {name}: {error}'.format(
+                    name=index, error=e)) from None
+
+    def index_document(self, index, document, document_id=None):
+        """
+        Indexes a document in the given OpenSearch index.
+
+        Parameters
+        ----------
+        index: str
+            The name of the index
+        document: dict
+            The document to index
+        document_id: str, optional
+            The ID to assign to the document. If not provided, OpenSearch
+            will auto-generate one.
+        """
+        self.logger.info(
+            'Indexing document in OpenSearch index {}'.format(index))
+        try:
+            return self.client.index(
+                index=index, body=document, id=document_id)
+        except Exception as e:
+            self.logger.error(
+                'Error indexing document in OpenSearch index {name}: '
+                '{error}'.format(name=index, error=e))
+            raise OpenSearchClientError(
+                'Error indexing document in OpenSearch index {name}: '
+                '{error}'.format(name=index, error=e)) from None
+
+    def delete_document(self, index, document_id):
+        """
+        Deletes a document from the given OpenSearch index by ID.
+
+        Parameters
+        ----------
+        index: str
+            The name of the index
+        document_id: str
+            The ID of the document to delete
+        """
+        self.logger.info(
+            'Deleting document {id} from OpenSearch index {index}'.format(
+                id=document_id, index=index))
+        try:
+            return self.client.delete(index=index, id=document_id)
+        except Exception as e:
+            self.logger.error(
+                'Error deleting document {id} from OpenSearch index '
+                '{name}: {error}'.format(
+                    id=document_id, name=index, error=e))
+            raise OpenSearchClientError(
+                'Error deleting document {id} from OpenSearch index '
+                '{name}: {error}'.format(
+                    id=document_id, name=index, error=e)) from None
+
+    def search(self, index, query):
+        """
+        Executes a search query against the given OpenSearch index.
+
+        Parameters
+        ----------
+        index: str
+            The name of the index to search
+        query: dict
+            The OpenSearch query body
+
+        Returns
+        -------
+        dict
+            The OpenSearch response containing hits and metadata
+        """
+        self.logger.info('Searching OpenSearch index {}'.format(index))
+        self.logger.debug('Executing query {}'.format(query))
+        try:
+            return self.client.search(index=index, body=query)
+        except Exception as e:
+            self.logger.error(
+                'Error searching OpenSearch index {name}: {error}'.format(
+                    name=index, error=e))
+            raise OpenSearchClientError(
+                'Error searching OpenSearch index {name}: {error}'.format(
+                    name=index, error=e)) from None
+
+    def close_connection(self):
+        """Closes the OpenSearch connection"""
+        self.logger.debug(
+            'Closing OpenSearch connection to {}'.format(self.host))
+        self.client.close()
+
+
+class OpenSearchClientError(Exception):
+    def __init__(self, message=None):
+        self.message = message
diff --git a/tests/test_opensearch_client.py b/tests/test_opensearch_client.py
new file mode 100644
index 0000000..50a4f28
--- /dev/null
+++ b/tests/test_opensearch_client.py
@@ -0,0 +1,90 @@
+import pytest
+
+from nypl_py_utils.classes.opensearch_client import (
+    OpenSearchClient, OpenSearchClientError)
+
+_TEST_HOST = 'test-domain.us-east-1.es.amazonaws.com'
+_TEST_INDEX = 'test-index'
+_TEST_DOC_ID = 'test-doc-id'
+_TEST_DOCUMENT = {'title': 'Test Document', 'body': 'Test body content'}
+_TEST_QUERY = {'query': {'match': {'title': 'test'}}}
+_TEST_RESPONSE = {
+    'hits': {
+        'total': {'value': 1, 'relation': 'eq'},
+        'hits': [{'_index': _TEST_INDEX, '_id': _TEST_DOC_ID,
+                  '_source': _TEST_DOCUMENT}]
+    }
+}
+
+
+class TestOpenSearchClient:
+
+    @pytest.fixture
+    def test_instance(self, mocker):
+        mocker.patch('boto3.Session')
+        mocker.patch('nypl_py_utils.classes.opensearch_client.AWSV4SignerAuth')
+        mocker.patch('nypl_py_utils.classes.opensearch_client.OpenSearch')
+        return OpenSearchClient(_TEST_HOST)
+
+    def test_init_without_credentials(self, mocker):
+        mock_session = mocker.patch('boto3.Session')
+        mock_session.return_value.get_credentials.return_value = None
+        with pytest.raises(OpenSearchClientError):
+            OpenSearchClient(_TEST_HOST)
+
+    def test_create_index(self, test_instance):
+        test_instance.create_index(_TEST_INDEX)
+        test_instance.client.indices.create.assert_called_once_with(
+            index=_TEST_INDEX, body=None)
+
+    def test_create_index_with_body(self, test_instance):
+        body = {'mappings': {'properties': {'title': {'type': 'text'}}}}
+        test_instance.create_index(_TEST_INDEX, body=body)
+        test_instance.client.indices.create.assert_called_once_with(
+            index=_TEST_INDEX, body=body)
+
+    def test_create_index_error(self, test_instance):
+        test_instance.client.indices.create.side_effect = Exception('error')
+        with pytest.raises(OpenSearchClientError):
+            test_instance.create_index(_TEST_INDEX)
+
+    def test_index_document(self, test_instance):
+        test_instance.index_document(_TEST_INDEX, _TEST_DOCUMENT, _TEST_DOC_ID)
+        test_instance.client.index.assert_called_once_with(
+            index=_TEST_INDEX, body=_TEST_DOCUMENT, id=_TEST_DOC_ID)
+
+    def test_index_document_without_id(self, test_instance):
+        test_instance.index_document(_TEST_INDEX, _TEST_DOCUMENT)
+        test_instance.client.index.assert_called_once_with(
+            index=_TEST_INDEX, body=_TEST_DOCUMENT, id=None)
+
+    def test_index_document_error(self, test_instance):
+        test_instance.client.index.side_effect = Exception('error')
+        with pytest.raises(OpenSearchClientError):
+            test_instance.index_document(_TEST_INDEX, _TEST_DOCUMENT)
+
+    def test_delete_document(self, test_instance):
+        test_instance.delete_document(_TEST_INDEX, _TEST_DOC_ID)
+        test_instance.client.delete.assert_called_once_with(
+            index=_TEST_INDEX, id=_TEST_DOC_ID)
+
+    def test_delete_document_error(self, test_instance):
+        test_instance.client.delete.side_effect = Exception('error')
+        with pytest.raises(OpenSearchClientError):
+            test_instance.delete_document(_TEST_INDEX, _TEST_DOC_ID)
+
+    def test_search(self, test_instance):
+        test_instance.client.search.return_value = _TEST_RESPONSE
+        result = test_instance.search(_TEST_INDEX, _TEST_QUERY)
+        test_instance.client.search.assert_called_once_with(
+            index=_TEST_INDEX, body=_TEST_QUERY)
+        assert result == _TEST_RESPONSE
+
+    def test_search_error(self, test_instance):
+        test_instance.client.search.side_effect = Exception('error')
+        with pytest.raises(OpenSearchClientError):
+            test_instance.search(_TEST_INDEX, _TEST_QUERY)
+
+    def test_close_connection(self, test_instance):
+        test_instance.close_connection()
+        test_instance.client.close.assert_called_once()