Spring XD: Can partitionPath and hdfs-dataset coexist?


I defined several streams, using the new partitionPath option so that files end up in per-day directories in Hadoop:

stream create --name XXXX --definition "http --port=8300|hdfs-dataset --format=avro --idleTimeout=100000 --partitionPath=dateFormat('yyyy/MM/dd/')" --deploy

stream create --name YYYY --definition "http --port=8301|hdfs --idleTimeout=100000 --partitionPath=dateFormat('yyyy/MM/dd/')" --deploy

All of the streams were created and deployed except XXXX, which failed:

17:42:49,102  INFO Deployer server.StreamDeploymentListener - Deploying stream Stream{name='XXXX'}
17:42:50,948  INFO Deployer server.StreamDeploymentListener - Deployment status for stream 'XXXX': DeploymentStatus{state=failed,error(s)=java.lang.IllegalArgumentException: Cannot instantiate 'IntegrationConfigurationInitializer': org.springframework.integration.jmx.config.JmxIntegrationConfigurationInitializer}
17:42:50,951  INFO Deployer server.StreamDeploymentListener - Stream Stream{name='XXXX'} deployment attempt complete

Note that its data gets processed and written in Avro format. Also, FWIW, whereas the other streams write to /xd/<NAME>/<rest of path>, the hdfs-dataset --format=avro combination writes its files to /xd/<NAME>/string.

I redefined it without the partitionPath option, and the stream deployed.

Do we have a bug here, or am I doing something wrong?


The hdfs-dataset sink is intended for writing serialized POJOs to HDFS. We use the kite-data functionality of the Kite SDK for this, so take a look at that project for more information.

The partitioning expressions for hdfs and hdfs-dataset are different, so the single-argument dateFormat('yyyy/MM/dd/') that works with the hdfs sink does not carry over to hdfs-dataset. The hdfs-dataset sink follows the Kite SDK syntax, and you need to specify the POJO field that holds the partition value. For a timestamp (long) field the expression would look like this: dateFormat('timestamp', 'YM', 'yyyyMM'), where timestamp is the name of the field, 'YM' is the partition name that prefixes the partition directory (for example YM=201411), and 'yyyyMM' is the format of the partition value. If you want a year/month/day directory structure for the partition, you could use year('timestamp')/month('timestamp')/day('timestamp'). The Kite SDK Partitioned Datasets docs cover this in more detail.
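To make the difference concrete, here is a plain-Java sketch (no Spring XD or Kite classes) of the directory names each style of expression would produce. It assumes a hypothetical `Event` POJO with a `long timestamp` field in epoch milliseconds, formats everything in UTC, and mimics Kite's `name=value` directory naming with zero-padded month and day. The real path resolution happens inside Spring XD and the Kite SDK, so treat this as an illustration of the idea, not their implementation.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class PartitionPaths {

    // Hypothetical POJO; the field name must match the one named in a Kite expression.
    static class Event {
        final long timestamp; // epoch milliseconds
        final String message;

        Event(long timestamp, String message) {
            this.timestamp = timestamp;
            this.message = message;
        }
    }

    // hdfs sink style, e.g. dateFormat('yyyy/MM/dd/'): only a pattern, no POJO field
    // is named (the timestamp is simply passed in here). Formatted in UTC by assumption.
    static String hdfsStyle(long epochMillis, String pattern) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    // Kite style dateFormat('timestamp', 'YM', 'yyyyMM'): reads the named field and
    // yields a single "name=value" directory.
    static String kiteDateFormat(Event e, String partitionName, String pattern) {
        return partitionName + "=" + hdfsStyle(e.timestamp, pattern);
    }

    // Kite style year('timestamp')/month('timestamp')/day('timestamp'): three nested
    // directories derived from the same field, month and day zero-padded.
    static String kiteYearMonthDay(Event e) {
        ZonedDateTime t = Instant.ofEpochMilli(e.timestamp).atZone(ZoneOffset.UTC);
        return String.format("year=%d/month=%02d/day=%02d",
                t.getYear(), t.getMonthValue(), t.getDayOfMonth());
    }

    public static void main(String[] args) {
        Event e = new Event(1416830400000L, "hello"); // 2014-11-24T12:00:00Z
        System.out.println(hdfsStyle(e.timestamp, "yyyy/MM/dd/")); // 2014/11/24/
        System.out.println(kiteDateFormat(e, "YM", "yyyyMM"));     // YM=201411
        System.out.println(kiteYearMonthDay(e));                   // year=2014/month=11/day=24
    }
}
```

The point of the contrast is the input: the hdfs-style pattern never refers to the payload, while both Kite-style expressions cannot be evaluated without a `timestamp` field on the object being written.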

For your example, partitioning doesn't make much sense, since you are persisting a simple String value that has no field to partition on. If you add a processor that transforms the data into a POJO, partitioning makes more sense, and we have some examples in the XD docs.
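As a sketch of that transformation step, the plain Java below (an illustration under assumptions, not the example from the XD docs) maps each raw String payload to a hypothetical `LogEvent` POJO stamped with its arrival time. That gives a Kite expression such as year('timestamp')/month('timestamp')/day('timestamp') a `timestamp` field to read. `LogEvent` and `toEvent` are made-up names; packaging this as a Spring XD processor module, and how hdfs-dataset then serializes the POJO, are not shown.

```java
import java.util.Objects;

class StringToEvent {

    // Hypothetical JavaBean-style POJO: a "timestamp" field to partition on plus the
    // original String payload.
    public static class LogEvent {
        private long timestamp; // epoch milliseconds
        private String message;

        public LogEvent() {
            // no-arg constructor, commonly expected by reflection-based serializers
        }

        public LogEvent(long timestamp, String message) {
            this.timestamp = timestamp;
            this.message = message;
        }

        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
    }

    // The mapping a transforming processor could perform: wrap each incoming String
    // together with the time it was received.
    static LogEvent toEvent(String payload, long receivedAtMillis) {
        Objects.requireNonNull(payload, "payload must not be null");
        return new LogEvent(receivedAtMillis, payload);
    }

    public static void main(String[] args) {
        LogEvent event = toEvent("some http payload", System.currentTimeMillis());
        System.out.println(event.getTimestamp() + " " + event.getMessage());
    }
}
```

Using the arrival time is only one choice. If the payload itself carries an event time, parsing that into `timestamp` instead would partition by when events happened rather than when they reached the stream.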