Elasticsearch Node.js point in time: search_phase_execution_exception

const body = {
  query: {
    geo_shape: {
      geometry: {
        relation: 'within',
        shape: {
          type: 'polygon',
          coordinates: [$polygon],
        },
      },
    },
  },
  pit: {
    id: "t_yxAwEPZXNyaS1wYzYtMjAxN3IxFjZxU2RBTzNyUXhTUV9XbzhHSk9IZ3cAFjhlclRmRGFLUU5TVHZKNXZReUc3SWcAAAAAAAALmpMWQkNwYmVSeGVRaHU2aDFZZExFRjZXZwEWNnFTZEFPM3JReFNRX1dvOEdKT0hndwAA",
    keep_alive: "1m",
  },
};

The query fails with search_phase_execution_exception (thrown at onBody). Without pit the query works fine, but I need PIT to retrieve more than 10,000 hits.

There are 2 best solutions below

Answer 1

Well, using PIT with the Elasticsearch Node.js client is not obvious, or at least it is not well documented. You can open a PIT with the client like this:

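const pitRes = await elastic.openPointInTime({
  index: index,
  keep_alive: "1m"
});

// 7.x client: the payload is under .body (the 8.x client returns it directly, i.e. pitRes.id)
const pit_id = pitRes.body.id;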
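If the same code has to run against both the 7.x and 8.x versions of the @elastic/elasticsearch client, a small wrapper can hide the difference in response shape (7.x wraps the payload in body, 8.x returns it directly). This is a minimal sketch; openPit is a hypothetical helper name, not part of the client API.

```javascript
// Hypothetical helper (not part of the client): opens a PIT and returns its id.
// Assumption: 7.x clients wrap the payload in `body`, 8.x clients return it
// directly, so both response shapes are accepted.
async function openPit(client, index, keepAlive = "1m") {
  const res = await client.openPointInTime({ index, keep_alive: keepAlive });
  const payload = res && res.body !== undefined ? res.body : res;
  if (!payload || typeof payload.id !== "string") {
    throw new Error("openPointInTime response did not contain an id");
  }
  return payload.id;
}
```

Usage: const pitId = await openPit(elastic, index, "1m");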

But it is not clear how to pass that pit_id to the search method, and it is not documented properly :S

However, you can use the scroll API through the client's scrollSearch helper:

// scrollSearch returns an async iterator, so no await is needed here
const scrollSearch = elastic.helpers.scrollSearch({
  index: index,
  body: {
    "size": 10000,
    "query": {
      "query_string": {
        "fields": [ "vm_ref", "org", "vm" ],
        "query": organization + moreQuery
      }
    },
    // "sort" belongs next to "query", not inside it
    "sort": [
      { "utc_date": "desc" }
    ]
  }
});

And then read the results as follows:

let res = [];

try {
  for await (const result of scrollSearch) {
    res.push(...result.body.hits.hits);
  }
} catch (e) {
  console.log(e);
}
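For context, scrollSearch wraps the plain scroll protocol: an initial search with a scroll keep-alive, repeated scroll calls with the latest scroll id until a page comes back empty, then a clearScroll call to free the server-side context. The generator below is a simplified sketch of that loop, assuming the 7.x client's response shape (payload under body); scrollPages is a hypothetical name and this is not the helper's actual source.

```javascript
// Hypothetical, simplified version of what a scroll helper does.
// Assumes a 7.x-style client: responses carry their payload under `body`.
async function* scrollPages(client, index, query, size = 1000, keepAlive = "1m") {
  // The first request opens the scroll context on the server.
  let response = await client.search({ index, scroll: keepAlive, body: { size, query } });
  let scrollId = response.body._scroll_id;
  try {
    while (response.body.hits.hits.length > 0) {
      yield response.body.hits.hits;
      // Each follow-up request renews the keep-alive and returns the next page.
      response = await client.scroll({ body: { scroll_id: scrollId, scroll: keepAlive } });
      // The scroll id may change, so always keep the latest one.
      scrollId = response.body._scroll_id;
    }
  } finally {
    // Runs on normal completion and when the consumer breaks out early.
    if (scrollId) {
      await client.clearScroll({ body: { scroll_id: scrollId } });
    }
  }
}
```

It is consumed the same way as the helper: for await (const hits of scrollPages(elastic, index, query)) { ... }. Putting clearScroll in finally means the context is also released if the loop is exited early.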

I know that's not the exact answer to your question, but I hope it helps ;)

Answer 2

Using a point in time to paginate search results is now documented by Elasticsearch; the "Paginate search results" page of the reference explains it in reasonable detail.

Here is an example of how the workflow described in the documentation could be implemented:

const { Client } = require("@elastic/elasticsearch");

async function searchWithPointInTime(cluster, index, chunkSize, keepAlive) {
    if (!chunkSize) {
        chunkSize = 5000;
    }
    if (!keepAlive) {
        keepAlive = "1m";
    }

    const client = new Client({ node: cluster });
    let pointInTimeId = null;
    let searchAfter = null;

    try {
        // Open point in time (7.x client: the payload is under .body)
        pointInTimeId = (await client.openPointInTime({ index, keep_alive: keepAlive })).body.id;

        // Query next chunk of data
        while (true) {
            const response = await client.search({
                // Pay attention: no index here (because it will come from the point-in-time)
                body: {
                    size: chunkSize,
                    track_total_hits: false, // This will make query faster
                    query: {
                        // (1) TODO: put any filter you need here (instead of match_all)
                        match_all: {},
                    },
                    pit: {
                        id: pointInTimeId,
                        keep_alive: keepAlive,
                    },
                    // Sorting should be by _shard_doc or at least include _shard_doc
                    sort: [{ _shard_doc: "desc" }],
                    // search_after tells Elasticsearch to resume after the last hit of the previous chunk
                    ...(searchAfter !== null && { search_after: searchAfter }),
                },
            });

            // The PIT id can change between requests: always use the most recent one
            pointInTimeId = response.body.pit_id || pointInTimeId;

            const { hits } = response.body.hits;
            if (!hits || !hits.length) {
                break; // No more data
            }

            for (const hit of hits) {
                // (2) TODO: Do whatever you need with results
            }

            // Check if we are done reading the data
            if (hits.length < chunkSize) {
                break; // We finished reading all data
            }

            // The 'search after' position for the next request is
            // the full sort key (here just _shard_doc) of the last hit
            searchAfter = hits[hits.length - 1].sort;
        }
    } catch (ex) {
        console.error(ex);
    } finally {
        // Close point in time
        if (pointInTimeId) {
            await client.closePointInTime({ body: { id: pointInTimeId } });
        }
    }
}
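The same PIT + search_after loop can also be written as an async generator that takes the client as a parameter, which keeps the paging logic separate from the per-hit processing and makes it easy to exercise with a stub client. This is a sketch under the same assumptions as the example above (7.x response shape, PIT already opened); pitPages is a hypothetical name. It yields the latest PIT id with every page so the caller can close the right one.

```javascript
// Hypothetical generator: pages through a point in time with search_after.
// Assumes a 7.x-style client (payload under `body`) and an already-open PIT.
async function* pitPages(client, pitId, query, chunkSize = 5000, keepAlive = "1m") {
  let searchAfter = null;
  while (true) {
    const response = await client.search({
      // No index: it is taken from the point in time.
      body: {
        size: chunkSize,
        track_total_hits: false,
        query,
        pit: { id: pitId, keep_alive: keepAlive },
        // Any consistent order works; asc is used here.
        sort: [{ _shard_doc: "asc" }],
        ...(searchAfter !== null && { search_after: searchAfter }),
      },
    });
    // The PIT id may change between responses; reuse the latest one.
    pitId = response.body.pit_id || pitId;
    const hits = response.body.hits.hits;
    if (hits.length === 0) return;
    yield { hits, pitId };
    if (hits.length < chunkSize) return; // last (partial) page
    // Resume after the full sort key of the last hit.
    searchAfter = hits[hits.length - 1].sort;
  }
}
```

After the loop, close the PIT with the last pitId received, e.g. await client.closePointInTime({ body: { id: lastPitId } }) on the 7.x client.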