I am trying to write data to Alluxio using MapReduce. I have around 11 GB of data on HDFS that I am writing to Alluxio. It works fine with the MUST_CACHE write type (the default value of alluxio.user.file.writetype.default).
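For context, this is roughly how I submit the job when overriding the write type (the jar and class names are placeholders; the driver goes through ToolRunner so that -D generic options reach the job Configuration):

    hadoop jar my-mr-app.jar com.example.MyJob \
        -Dalluxio.user.file.writetype.default=CACHE_THROUGH \
        <hdfs_input_path> alluxio://<alluxio_master>:19998/<alluxio_output_path>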
But when I try to write it using CACHE_THROUGH, it fails with the following exception:
Error: alluxio.exception.status.UnavailableException: Channel to <hostname of one of the worker>:29999: <underfs path to file> (No such file or directory)
at alluxio.client.block.stream.NettyPacketWriter.close(NettyPacketWriter.java:263)
at com.google.common.io.Closer.close(Closer.java:206)
at alluxio.client.block.stream.BlockOutStream.close(BlockOutStream.java:166)
at alluxio.client.file.FileOutStream.close(FileOutStream.java:137)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:111)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.close(MapTask.java:679)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:802)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:346)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1595)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: alluxio.exception.status.NotFoundException: Channel to <hostname of one of the worker>:29999: <underfs path to file> (No such file or directory)
at alluxio.exception.status.AlluxioStatusException.from(AlluxioStatusException.java:153)
at alluxio.util.CommonUtils.unwrapResponseFrom(CommonUtils.java:548)
at alluxio.client.block.stream.NettyPacketWriter$PacketWriteHandler.channelRead(NettyPacketWriter.java:367)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:748)
I tried the command below as well and got the same error:
./alluxio fs -Dalluxio.user.file.writetype.default=CACHE_THROUGH copyFromLocal <hdfs_input_path> <alluxio_output_path>
Any help/pointers would be appreciated. Thanks.
The copyFromLocal shell command is only able to copy files that are available on the local filesystem, so it cannot read your HDFS input path directly. To copy a file out of HDFS and into Alluxio, you can first copy the file to your local machine and then write that local copy to Alluxio.
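For example, using hypothetical file names, the two-step copy could look like this:

    # 1. copy the file from HDFS to the local filesystem
    hdfs dfs -get <hdfs_input_path>/part-00000 /tmp/part-00000
    # 2. copy the local file into Alluxio, persisting it to the under store
    ./alluxio fs -Dalluxio.user.file.writetype.default=CACHE_THROUGH copyFromLocal /tmp/part-00000 <alluxio_output_path>/part-00000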
To write directly from MapReduce to Alluxio, update your core-site.xml to contain the Alluxio filesystem configuration (a sketch is below), add the Alluxio client jar to your application classpath with -libjars /path/to/client, and write to an alluxio://master_hostname:19998/alluxio_output_path URI. See the documentation for further details.
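A minimal sketch of the core-site.xml entry, assuming the property documented for the Alluxio 1.x Hadoop client (verify against the docs for your version):

    <property>
      <name>fs.alluxio.impl</name>
      <value>alluxio.hadoop.FileSystem</value>
    </property>

The job submission could then look roughly like this (the jar and class names are placeholders, and -libjars only takes effect if the driver parses generic options via ToolRunner):

    hadoop jar my-mr-app.jar com.example.MyJob \
        -libjars /path/to/alluxio-client.jar \
        <hdfs_input_path> alluxio://master_hostname:19998/<alluxio_output_path>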