Unable to execute mtermvectors elasticsearch query from AWS EMR cluster using Spark


I am trying to execute this Elasticsearch query via Spark:

    POST /aa6/_mtermvectors  
    {
      "ids": [
        "ABC",
        "XYA",
        "RTE"
      ],
      "parameters": {
        "fields": [
          "attribute"
        ],
        "term_statistics": true,
        "offsets": false,
        "payloads": false,
        "positions": false
      }
    }

The code that I have written in Zeppelin is:

def createString(): String = {
  return s"""_mtermvectors {
    "ids": [
      "ABC",
      "XYA",
      "RTE"
    ],
    "parameters": {
      "fields": [
        "attribute"
      ],
      "term_statistics": true,
      "offsets": false,
      "payloads": false,
      "positions": false
    }
  }"""
}

import org.elasticsearch.spark._
sc.esRDD("aa6", "?q="+createString).count   

I get the error:

org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: org.elasticsearch.hadoop.rest.EsHadoopRemoteException: parse_exception: parse_exception: Encountered " <RANGE_GOOP> "["RTE","XYA","ABC" "" at line 1, column 22. Was expecting: "TO" ...

{"query":{"query_string":{"query":"_mtermvectors {\"ids\": [\"RTE\",\"ABC\",\"XYA\"], \"parameters\": {\"fields\": [\"attribute\"], \"term_statistics\": true, \"offsets\": false, \"payloads\": false, \"positions\": false } }"}}}
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:408)

This is probably something simple, but I am unable to find a way to set the request body while making the Spark call.

There is 1 answer below


I'm not sure, but I don't think this is currently supported by the es-Spark package. You can check this link to see what options are available via the esRDD method of SparkContext.
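
For context, the second argument to esRDD is only ever treated as a search query, either a URI-style query string or a full query DSL document, so there is nowhere to put a request body aimed at a different endpoint such as _mtermvectors. A minimal sketch of the two supported forms (the index name aa6 is taken from the question; the field value is illustrative):

    import org.elasticsearch.spark._

    // 1. URI-style ("q=") query string; this is what the original code
    //    produced, which is why the query parser choked on the JSON
    val byUri = sc.esRDD("aa6", "?q=attribute:something")

    // 2. Full query DSL body; note it still runs a _search, not _mtermvectors
    val byDsl = sc.esRDD("aa6", """{"query": {"ids": {"values": ["ABC", "XYA", "RTE"]}}}""")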

What you could do instead is use Elasticsearch's High Level REST Client, collect the details into a List or Seq (or a file), and then load that into a Spark RDD.

It is a roundabout way, but unfortunately I suppose it is the only one. To help, I have created the snippet below so that you at least have the data from Elasticsearch that the above query would return.

import org.apache.http.HttpHost
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestHighLevelClient
import org.elasticsearch.client.core.MultiTermVectorsRequest
import org.elasticsearch.client.core.TermVectorsRequest
import org.elasticsearch.client.core.TermVectorsResponse

object SampleSparkES {

  /**
   * Main method where the program starts
   */
  def main(args: Array[String]): Unit = {
    val termVectorsResponse = elasticRestClient
    println(termVectorsResponse.size)
  }

  /**
   * Scala client code to retrieve the response of mtermvectors
   */
  def elasticRestClient: java.util.List[TermVectorsResponse] = {

    val client = new RestHighLevelClient(
      RestClient.builder(new HttpHost("localhost", 9200, "http")))

    // Template request: the document id here is a placeholder that is
    // replaced by each id passed to MultiTermVectorsRequest below
    val tvRequestTemplate = new TermVectorsRequest("aa6", "ids")
    tvRequestTemplate.setFields("attribute")
    // Mirror the parameters of the original _mtermvectors query
    tvRequestTemplate.setTermStatistics(true)
    tvRequestTemplate.setOffsets(false)
    tvRequestTemplate.setPayloads(false)
    tvRequestTemplate.setPositions(false)

    // Set the document ids you want the term vector information for
    val ids = Array("ABC", "XYA", "RTE")
    val request = new MultiTermVectorsRequest(ids, tvRequestTemplate)
    val response = client.mtermvectors(request, RequestOptions.DEFAULT)

    // Get the per-document responses
    val termVectorsResponse = response.getTermVectorsResponses

    // Close the RestHighLevelClient
    client.close()

    // Return the List[TermVectorsResponse]
    termVectorsResponse
  }
}
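
Note that the snippet assumes Elasticsearch is reachable at localhost:9200; from your EMR cluster you would point the HttpHost at your actual Elasticsearch endpoint instead.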

As an example, you can get the sumDocFreq of the first field of the first document in the following manner:

println(termVectorsResponse.iterator.next.getTermVectorsList.iterator.next.getFieldStatistics.getSumDocFreq)

All you would need now is a way to convert the collection into a Seq that can be loaded into an RDD.
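
As a rough sketch of that last step (assuming a SparkContext named sc is in scope and using Scala's JavaConverters; the tuple layout is just an illustration), you could flatten the responses into plain values before parallelizing, since the response objects are not guaranteed to be serializable:

    import scala.collection.JavaConverters._

    // Flatten each document's term vectors into plain (docId, term, docFreq)
    // tuples; docFreq is populated because term_statistics was requested above
    val rows: Seq[(String, String, Integer)] =
      termVectorsResponse.asScala.toSeq.flatMap { docResponse =>
        docResponse.getTermVectorsList.asScala.flatMap { termVector =>
          termVector.getTerms.asScala.map { term =>
            (docResponse.getId, term.getTerm, term.getDocFreq)
          }
        }
      }

    // Plain tuples can be loaded into an RDD directly
    val rdd = sc.parallelize(rows)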