I am trying to verify that Elasticsearch rejects an index request whose sequence number and primary term differ from the ones currently assigned to the document. According to the documentation, if the ifSeqNo and ifPrimaryTerm values do not match the document's sequence number and primary term, ES throws a VersionConflictEngineException. In my test code, however, the document gets updated and its sequence number is incremented, even though I set an old sequence number on the index request. I expected a VersionConflictEngineException, but none is thrown. What is wrong with my code? Thanks in advance.
My Java code is attached below; the test method is the entry point. I tried the same request from the Kibana console against the same ES cluster, and there it works as expected (the exception is thrown).
The Elasticsearch version is 6.8; the Java client version is elasticsearch-6.6.1.
/**
 * only perform this indexing request if the document was last modification was assigned the given
 * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
 *
 * If the document last modification was assigned a different sequence number a
 * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
 */
public IndexRequest setIfSeqNo(long seqNo) { }
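For reference, the behaviour this Javadoc describes can be modelled in a few lines of plain Java. The sketch below is a toy in-memory model, not Elasticsearch code: `SeqNoCheckSketch`, `DocState`, `VersionConflict`, and the `UNASSIGNED` sentinel are all names invented for illustration. It only shows the expected semantics: a write that carries a stale sequence number is rejected, while a write with no condition set always succeeds.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of a seq_no / primary_term conditional write. Not Elasticsearch code. */
final class SeqNoCheckSketch {

    /** Hypothetical stand-in for VersionConflictEngineException. */
    static final class VersionConflict extends RuntimeException {
        VersionConflict(String message) {
            super(message);
        }
    }

    /** Sentinel meaning "no condition set"; chosen for this sketch. */
    static final long UNASSIGNED = -2L;

    /** Sequence number and primary term of a document's last modification. */
    static final class DocState {
        final long seqNo;
        final long primaryTerm;

        DocState(long seqNo, long primaryTerm) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
        }
    }

    private final Map<String, DocState> docs = new HashMap<>();
    private final long currentPrimaryTerm = 1L;
    private long nextSeqNo = 0L;

    /** Writes the document; the check is enforced only when ifSeqNo is set. */
    DocState index(String id, long ifSeqNo, long ifPrimaryTerm) {
        DocState current = docs.get(id);
        if (ifSeqNo != UNASSIGNED) {
            boolean matches = current != null
                    && current.seqNo == ifSeqNo
                    && current.primaryTerm == ifPrimaryTerm;
            if (!matches) {
                throw new VersionConflict("version conflict, required seqNo [" + ifSeqNo
                        + "], primary term [" + ifPrimaryTerm + "]");
            }
        }
        // Every successful write is assigned the next sequence number.
        DocState updated = new DocState(nextSeqNo++, currentPrimaryTerm);
        docs.put(id, updated);
        return updated;
    }

    public static void main(String[] args) {
        SeqNoCheckSketch store = new SeqNoCheckSketch();
        store.index("doc-1", UNASSIGNED, UNASSIGNED); // unconditional write
        store.index("doc-1", 0L, 1L);                 // condition matches the last write
        try {
            store.index("doc-1", 0L, 1L);             // stale: the document has moved on
        } catch (VersionConflict e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the third call fails because the second call already advanced the document's sequence number, which is the outcome I expected from my test.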
Kibana client example
POST /table_index/_doc/606635778254_db_dcs_int_test4_test_table_6SjooYiji3?if_seq_no=1574635&if_primary_term=6
{
  "name" : "test_table_6SjooYiji3",
  "description" : null,
  "catalogId" : "606635778254",
  "databaseName" : "db_dcs_int_test4"
}
Response from Kibana
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
        "index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
        "shard": "0",
        "index": "table_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3]: version conflict, required seqNo [1574635], primary term [6]. current document has seqNo [1574661] and primary term [6]",
    "index_uuid": "jisjayWfTTe7Rd-Ll48qWw",
    "shard": "0",
    "index": "table_index"
  },
  "status": 409
}
private void test() {
    String catalogId = "606635778254";
    String databaseName = "db_dcs_int_test4";
    String tableName = "test_table_6SjooYiji3";
    com.amazon.spektrdatacataloglambda.DBTable dbTable = new com.amazon.spektrdatacataloglambda.DBTable();
    dbTable.setName(tableName);
    dbTable.setCatalogId(catalogId);
    dbTable.setMetadata(new HashMap<String, String>() {{ put("test", "singhjfi"); }});
    elasticSearchUtils.upsertTableToTableIndexInES(catalogId, databaseName, dbTable, true, null);
}
public void upsertTableToTableIndexInES(final String catalogId, final String databaseName, final DBTable table,
                                        final Boolean isOverwriteESTableDoc,
                                        final GetResponse tableESDocument) {
    IndexRequest indexRequest = new IndexRequest(ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE);
    if (isOverwriteESTableDoc) {
        // doing to prevent any data loss because of multiple update call from other DCS clients
        // TODO: remove after testing
        indexRequest = indexRequest.setIfSeqNo(1574633);
        indexRequest = indexRequest.setIfPrimaryTerm(6);
        log.info("setting sequence number and primary term");
    }
    final Map<String, Object> payload = getMapFromObject(table);
    // Append table, database for search and actual payload information as string (no search on it).
    payload.put(ES_TABLE_INDEX_DATABASE_NAME_KEY, databaseName);
    payload.put(ES_DOC_KEY_CATALOG_ID, catalogId);
    final String tableName = table.getName();
    final String id = getESTableId(catalogId, databaseName, tableName);
    indexRequest.source(payload).id(id);
    log.info("Adding {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);
    performIndexRequest(indexRequest);
    log.info("Added {} to elasticsearch index:{} as type {} with id {}", GSON.toJson(payload),
            ES_TABLE_INDEX, ES_TABLE_INDEX_DOC_TYPE, id);
}
protected void performIndexRequest(IndexRequest indexRequest) {
    try {
        log.info("seq_num={}, primary_term={}", indexRequest.ifSeqNo(), indexRequest.ifPrimaryTerm());
        log.info("Initiating Index Request {} from ElasticSearch", indexRequest);
        IndexResponse indexResponse = elasticSearchClient.index(indexRequest, RequestOptions.DEFAULT);
        log.info("Index response {}", indexResponse);
    } catch (IOException e) {
        String ioError = String.format("I/O Error when performing index request %s on ElasticSearch.", indexRequest);
        log.error(ioError, e);
        throw new DataCatalogException(ioError + " " + e.getMessage(), e);
    }
}
Logs
[java] 11 Apr 2023 17:05:46,736 [INFO] [] (main) Utils:44 Initializing AppConfig with args:--root=build--realm=us-east-1--domain=beta
[java] 11 Apr 2023 17:05:46,742 [INFO] [] (main) AppConfigTree:1556 Using config root at build/brazil-config
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1703 Found domain = beta
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1704 Found realm = us-east-1
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1743 Determined appname = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,743 [INFO] [] (main) AppConfigTree:1759 Determined appgroup = SpektrDataCatalogLambda
[java] 11 Apr 2023 17:05:46,744 [INFO] [] (main) AppConfigTree:1952 Parsing config file: build/brazil-config/app/SpektrDataCatalogLambda.cfg
[java] 11 Apr 2023 17:05:46,749 [INFO] [] (main) MixedCaseTableUtils:116 tempDirPrefix=/tmp/beta
[java] 11 Apr 2023 17:05:46,794 [INFO] [] (main) Utils:70 creating DCS client for awsAccountId=606635778254, realm=us-east-1, dcsEndpoint=https://datacatalogv2.beta.spektr.a2z.com
[java] 11 Apr 2023 17:05:50,746 [INFO] [] (main) Utils:57 Getting your credentials from Conduit.
[java] 11 Apr 2023 17:05:52,650 [INFO] [] (main) ElasticSearchClientFactory:46 Initialized ES URL as search-spektr-es-datacatalog-6-8-cq2riup3kroeai2hokcrfxh5me.us-east-1.es.amazonaws.com
[java] 11 Apr 2023 17:05:52,909 [INFO] [] (main) ElasticSearchUtils:121 setting sequence number and primary term
[java] 11 Apr 2023 17:05:53,009 [INFO] [] (main) ElasticSearchUtils:134 Adding {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,010 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1971 seq_num=1574633, primary_term=6
[java] 11 Apr 2023 17:05:53,011 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1972 Initiating Index Request index {[table_index][gluetable][606635778254_db_dcs_int_test4_test_table_6SjooYiji3], source[{"name":"test_table_6SjooYiji3","description":null,"catalogId":"606635778254","owner":null,"partitionKeys":null,"storageDescriptor":null,"storageDetails":null,"parameters":null,"lastRefreshedTime":null,"encryptedData":null,"versioned":null,"versionColumns":null,"metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"retention":null,"autoApproveReadRequest":null,"databaseName":"db_dcs_int_test4"}]} from ElasticSearch
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchEnabledGlueDataCatalog:1974 Index response IndexResponse[index=table_index,type=gluetable,id=606635778254_db_dcs_int_test4_test_table_6SjooYiji3,version=35,result=updated,seqNo=1574661,primaryTerm=6,shards={"total":2,"successful":1,"failed":0}]
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) ElasticSearchUtils:139 Added {"name":"test_table_6SjooYiji3","catalogId":"606635778254","metadata":{"test":"singhjfi"},"retentionPeriodInDays":0,"databaseName":"db_dcs_int_test4"} to elasticsearch index:table_index as type gluetable with id 606635778254_db_dcs_int_test4_test_table_6SjooYiji3
[java] 11 Apr 2023 17:05:53,885 [INFO] [] (main) MixedCaseTableUtils:143 Done operation=test
The big difference I see is that in Kibana you're using the _doc type, whereas in your code the mapping type is gluetable, so you're not updating what you think you're updating.
In Kibana you're indexing into the _doc type, whereas in your test code you're indexing into the gluetable type, which are two different documents.
Make sure that ES_TABLE_INDEX_DOC_TYPE = "_doc" in your code and you should get the same error.
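To make the comparison concrete, here is a small sketch that prints the two request paths side by side. The `endpoint` and `pathOf` helpers are hypothetical and only mirror the `/{index}/{type}/{id}` layout of the Kibana request in the question; they are not part of the Elasticsearch client, and the sketch assumes the Java request ends up addressed by the same layout with the type taken from ES_TABLE_INDEX_DOC_TYPE.

```java
/** Illustrative only: compares the path used in Kibana with the one implied by the Java code. */
final class DocEndpointSketch {

    /** Hypothetical helper that builds "/{index}/{type}/{id}?if_seq_no=..&if_primary_term=..". */
    static String endpoint(String index, String type, String id, long ifSeqNo, long ifPrimaryTerm) {
        return "/" + index + "/" + type + "/" + id
                + "?if_seq_no=" + ifSeqNo + "&if_primary_term=" + ifPrimaryTerm;
    }

    /** Returns the endpoint without its query string. */
    static String pathOf(String endpoint) {
        int queryStart = endpoint.indexOf('?');
        return queryStart < 0 ? endpoint : endpoint.substring(0, queryStart);
    }

    public static void main(String[] args) {
        String id = "606635778254_db_dcs_int_test4_test_table_6SjooYiji3";
        // Values taken from the Kibana request and from the Java code in the question.
        String fromKibana = endpoint("table_index", "_doc", id, 1574635L, 6L);
        String fromJava = endpoint("table_index", "gluetable", id, 1574633L, 6L);

        System.out.println(fromKibana);
        System.out.println(fromJava);
        System.out.println("same path: " + pathOf(fromKibana).equals(pathOf(fromJava)));
    }
}
```

The two paths differ only in the type segment, so logging the path your client actually sends is a quick way to check whether both requests address the document you expect.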