Failed to use map reduce with riak

I am new to Riak, so please excuse any misunderstanding.

I am able to add new entries and query them by key and by index. However, I need to implement more complex queries, so I am trying to use MapReduce.

I have an application-level entity named Volume that, for now, has only plain fields:

public class Volume implements Comparable<Volume>, Serializable {
    @RiakIndex(name = "id")
    @JsonProperty("id")
    private Integer id;

    @RiakIndex(name = "name")
    @RiakKey
    private String name;

    @RiakIndex(name = "created_at")
    @JsonProperty("created_at")
    private long createdAt;


    // setters, getters....
}

Here is how I add Volume instances to the Riak DB:

IRiakClient riakClient = RiakFactory.httpClient();
Bucket bucket = riakClient.fetchBucket(bucketName).execute();
for (int i = 0; i < n; i++) {
    int id = i;
    ManagedVolume volume = new ManagedVolume();
    volume.setCreatedAt(System.currentTimeMillis());
    volume.setId(id);
    volume.setName("volume" + i);
    bucket.store(volume).execute();
}

Now I can retrieve the instances by index without any problem, as follows:

Collection<String> col = bucket.fetchIndex(IntIndex.named("id")).from(3).to(5).execute();

But all attempts to use MapReduce fail:

String str = riakClient.mapReduce(bucketName, "name: volume1")
    .addMapPhase(new NamedJSFunction("Riak.mapValuesJson"))
    .execute().getResultRaw();

I tried it without adding Riak.mapValuesJson, tried changing the query to use id instead of name, and tried wrapping volume in quotes ("name: \"volume\"", "name: \'volume\'", etc.), but nothing helps. I always get HTTP status 500 and the following error: {"error":"map_reduce_error"}

Here is the stack trace:

Exception in thread "main" com.basho.riak.client.RiakException: java.io.IOException: {"error":"map_reduce_error"}
    at com.basho.riak.client.query.MapReduce.execute(MapReduce.java:81)
    at com.infinidat.riak.TryRiak.search(TryRiak.java:288)
    at com.infinidat.riak.TryRiak.main(TryRiak.java:66)
Caused by: java.io.IOException: {"error":"map_reduce_error"}
    at com.basho.riak.client.raw.http.ConversionUtil.convert(ConversionUtil.java:589)
    at com.basho.riak.client.raw.http.HTTPClientAdapter.mapReduce(HTTPClientAdapter.java:386)
    at com.basho.riak.client.query.MapReduce.execute(MapReduce.java:79)
    ... 2 more

I found the following record in Riak's error.log and a similar record in console.log:

2013-07-23 19:14:12.451 [error] <0.194.0> Supervisor riak_pipe_builder_sup had child undefined started with {riak_pipe_builder,start_link,undefined} at <0.18058.4> exit with reason {{modfun,riak_search,mapred_search,[<<"VolumeBucket">>,<<"name: 1">>]},error,badarg,[{ets,lookup,[schema_table,<<"VolumeBucket">>],[]},{riak_search_config,get_schema,1,[{file,"src/riak_search_config.erl"},{line,69}]},{riak_search_client,parse_query,3,[{file,"src/riak_search_client.erl"},{line,40}]},{riak_search,parse_query,3,[{file,"src/riak_search.erl"},{line,59}]},{riak_search,mapred_search,3,[{file,"src/riak_search.erl"},{line,46}]},{riak_kv_mrc_pipe,send_inputs,3,[{file,"src/riak_kv_mrc..."},...]},...]} in context child_terminated

I believe I am missing something here. Perhaps it is a configuration issue? This is a very simple query, and I want to move on to more complicated queries once this one works.

There is 1 best solution below

Your MapReduce job specifies a Riak Search query as its input, and this will fail if Riak Search is not enabled in your cluster. The Java client documentation has examples that show how different types of input can be specified.
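To make the difference between input types concrete, here is a minimal sketch in plain Java that builds the JSON body of a MapReduce job as Riak's HTTP interface accepts it. It does not use the Riak client library. The class and method names (MapReduceJobSpec, indexRangeJob, searchJob) are hypothetical, and the index name id_int assumes the Java client stored the integer index "id" with the usual _int suffix. The search form mirrors the {modfun, riak_search, mapred_search, ...} input visible in the error log from the question; the index form takes its inputs from a secondary index range, as in the working fetchIndex call, and should not depend on Riak Search.

```java
// Sketch only: builds MapReduce job bodies as JSON strings (no Riak client needed).
final class MapReduceJobSpec {

    // Single map phase using the built-in JavaScript function from the question.
    private static final String MAP_VALUES_JSON =
        "[{\"map\":{\"language\":\"javascript\",\"name\":\"Riak.mapValuesJson\"}}]";

    /** Job whose inputs come from a secondary-index range query. */
    static String indexRangeJob(String bucket, String index, long start, long end) {
        return "{\"inputs\":{\"bucket\":\"" + escape(bucket) + "\","
            + "\"index\":\"" + escape(index) + "\","
            + "\"start\":" + start + ",\"end\":" + end + "},"
            + "\"query\":" + MAP_VALUES_JSON + "}";
    }

    /** Job whose inputs come from a Riak Search query (requires Riak Search). */
    static String searchJob(String bucket, String query) {
        return "{\"inputs\":{\"module\":\"riak_search\",\"function\":\"mapred_search\","
            + "\"arg\":[\"" + escape(bucket) + "\",\"" + escape(query) + "\"]},"
            + "\"query\":" + MAP_VALUES_JSON + "}";
    }

    // Escapes backslashes and double quotes so the value is a valid JSON string.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Assumed names: bucket "VolumeBucket" (from the log), index "id_int".
        System.out.println(indexRangeJob("VolumeBucket", "id_int", 3, 5));
        System.out.println(searchJob("VolumeBucket", "name: volume1"));
    }
}
```

Comparing the two printed bodies shows that only the "inputs" member changes; the map phase stays the same. Whether the index form suits your data depends on which indexes were actually written when the objects were stored.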

Having said that, Riak MapReduce was not designed to be a real-time query tool, so I am not sure it is the right tool for what you are trying to accomplish. Compared to a direct key-value lookup it adds significantly more load to the system as a large number of nodes/partitions need to participate in each request. This leads to higher latencies and also means it tends to not scale as well as direct key access.

When data modelling for Riak and other key-value stores, data access patterns and query patterns need to be considered up front together with the structure of the data, which is quite different compared to working with relational models. Some blog posts and presentations related to data modelling in Riak are available here: