I am working with a CSV dataset as input, read via readStream
as below:
inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)
Below is the schema:
inDF schema =
root
|-- timeStamp: timestamp (nullable = true)
|-- machine: string (nullable = true)
|-- module: string (nullable = true)
|-- component: string (nullable = true)
|-- plateID: integer (nullable = true)
|-- measureProgr: integer (nullable = true)
|-- measure: string (nullable = true)
|-- value: double (nullable = true)
I need to perform some aggregations, as below:
byMeasureDF = inDF \
    .withWatermark('timeStamp', '600 seconds') \
    .groupBy(window(inDF.timeStamp, windowSize, windowStart),
             inDF.machine, inDF.module,
             inDF.component, inDF.measure) \
    .agg(min(inDF.value).alias('minValue'),
         max(inDF.value).alias('maxValue'),
         avg(inDF.value).alias('avgValue'),
         stddev(inDF.value).alias('stdevValue'))
This works; indeed, the output schema is correct:
byMeasureDF schema =
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- machine: string (nullable = true)
|-- module: string (nullable = true)
|-- component: string (nullable = true)
|-- measure: string (nullable = true)
|-- minValue: double (nullable = true)
|-- maxValue: double (nullable = true)
|-- avgValue: double (nullable = true)
|-- stdevValue: double (nullable = true)
But when I run the query below:
q_byMeasure = byMeasureDF \
    .writeStream \
    .format('csv') \
    .option('delimiter', ',') \
    .option('header', 'true') \
    .outputMode('append') \
    .queryName('byMeasure') \
    .start(path = confStreamMiningByMeasureDir,
           checkpointLocation = chkStreamMiningByMeasureDir)
I get the following error:
Traceback (most recent call last):
File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
, checkpointLocation = chkStreamMiningByMeasureDir)
File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
return self._sq(self._jwrite.start(path))
File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Note that if I write the same dataframe to the console, it works fine.
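For reference, the console query that works is roughly the following sketch (the query name is assumed; it reuses the spark session and the byMeasureDF aggregation defined above, so it is not runnable on its own):

```python
# Sketch of the working console sink (query name assumed);
# reuses the byMeasureDF aggregation defined above.
q_console = byMeasureDF \
    .writeStream \
    .format('console') \
    .outputMode('append') \
    .queryName('byMeasureConsole') \
    .start()
```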
It looks as if there were a bug in this Spark release. Does anyone know a possible solution? Thanks a lot in advance.