Elasticsearch bulk no longer indexing docs, but no errors

940 Views Asked by At

6.0

Hi running ES 1.6.0

I had a bulk indexing job that used to work fine. I wanted to bulk index a few more documents in a new index. I sent about 100K+ documents but only a few thousand got indexed.

I check the logs and there was no particular errors logged on the ES nodes and my JAVA client is not returning any errors.

I also looked at Marvel Bulk Thread Pool there was no rejections. The queue size was 0.

I have 20TB of disk space of which 13TB is used by other indexes. So I am below the disk watermark anything else?

Here is the bulking code...

        BulkRequestBuilder bulkRequest = client.prepareBulk();
        final Context ctx = getVertx().currentContext();

        for(int i = 0; i < documents.size(); i++)
        {   
            final JsonObject obj = documents.get(i);
            final JsonObject indexable = new JsonObject()

            .putString("action", "index")
            .putString("_index", obj.getString("index"))
            .putString("_type", obj.getString("type"))
            .putString("_id", obj.getString("id"))
            .putString("_route", obj.getString("routing"))
            .putObject("_source", obj);                                     

            final String index = getRequiredIndex(indexable, message);
            if (index == null) {
                return;
            }

            // type is optional
            String type = indexable.getString(CONST_TYPE);
            ;

            JsonObject source = indexable.getObject(CONST_SOURCE);
            if (source == null) {
                sendError(message, CONST_SOURCE + " is required");
                return;
            }

            // id is optional
            String id = indexable.getString(CONST_ID);
            String route = indexable.getString(CONST_ROUTE);

            IndexRequestBuilder builder = client.prepareIndex(index, type, id).setSource(source.encode());

            if(!route.isEmpty())
                builder.setRouting(route);              

            bulkRequest.add(builder);
        }

        bulkRequest.execute(new ActionListener<BulkResponse>(){
            @Override
            public void onResponse(BulkResponse resp) {


                message.reply(new JsonObject().putString("status", "Took: " + resp.getTookInMillis() + ", Indexed:" + documents.size() + "," + resp.getItems().length + ", Failed: " + resp.hasFailures()));
            }

            @Override
            public void onFailure(Throwable t) {
                ctx.runOnContext(new Handler<Void>() {
                    @Override
                    public void handle(Void event) {
                        sendError(message,
                                "Index error: " + t.getMessage(),
                                new RuntimeException(t));
                    }
                });
            }
        });
0

There are 0 best solutions below