writeStream aggregate windowed watermarked DataFrame doesn't work:


I am working with a CSV dataset as input, read by readStream as below:

inDF = spark \
    .readStream \
    .option("sep", ",") \
    .option("maxFilesPerTrigger", 1) \
    .schema(rawStreamSchema) \
    .csv(rawEventsDir)

Below is the schema:

inDF schema = 
root
 |-- timeStamp: timestamp (nullable = true)
 |-- machine: string (nullable = true)
 |-- module: string (nullable = true)
 |-- component: string (nullable = true)
 |-- plateID: integer (nullable = true)
 |-- measureProgr: integer (nullable = true)
 |-- measure: string (nullable = true)
 |-- value: double (nullable = true)

I need to perform some aggregations, as below:

byMeasureDF = inDF \
        .withWatermark('timeStamp', '600 seconds') \
        .groupBy(window(inDF.timeStamp, windowSize, windowStart)
                 , inDF.machine, inDF.module
                 , inDF.component, inDF.measure) \
        .agg(min(inDF.value).alias('minValue')
             , max(inDF.value).alias('maxValue')
             , avg(inDF.value).alias('avgValue')
             , stddev(inDF.value).alias('stdevValue'))

That works; indeed, the output schema is correct:

byMeasureDF schema = 
root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- machine: string (nullable = true)
 |-- module: string (nullable = true)
 |-- component: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- minValue: double (nullable = true)
 |-- maxValue: double (nullable = true)
 |-- avgValue: double (nullable = true)
 |-- stdevValue: double (nullable = true)

But when I run the query below:

q_byMeasure = byMeasureDF \
          .writeStream \
          .format('csv') \
          .option('delimiter', ',') \
          .option('header', 'true') \
          .outputMode('append') \
          .queryName('byMeasure') \
          .start(path = confStreamMiningByMeasureDir
                 , checkpointLocation = chkStreamMiningByMeasureDir)

I get the following error:

Traceback (most recent call last):
  File "/home/roberto/BOTT-G80216/Programs/Version_00.01.00/Python/2_fromRawToConformed.py", line 87, in <module>
    , checkpointLocation = chkStreamMiningByMeasureDir)
  File "/home/roberto/spark/python/pyspark/sql/streaming.py", line 844, in start
    return self._sq(self._jwrite.start(path))
  File "/home/roberto/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/roberto/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

Notice that if I write the same DataFrame to the console, it works well.

It looks like there is a bug in this Spark release. Does anyone know a possible solution? Thanks a lot in advance.
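One thing I suspect (and plan to try) is that the watermark gets lost because the groupBy references the columns through the original `inDF` object, which predates the `withWatermark` call, so Spark may not associate the watermark with the aggregation's event-time column. A possible fix, assuming that is the cause, is to reference the columns by name (or via `col()`) so they resolve against the watermarked DataFrame; `windowSize` and `windowStart` are the same variables as in my code above:

```python
from pyspark.sql.functions import window, col, min, max, avg, stddev

# Sketch of a possible fix: reference columns by name so they resolve
# against the watermarked DataFrame rather than the original inDF.
byMeasureDF = inDF \
    .withWatermark('timeStamp', '600 seconds') \
    .groupBy(window(col('timeStamp'), windowSize, windowStart),
             'machine', 'module', 'component', 'measure') \
    .agg(min('value').alias('minValue'),
         max('value').alias('maxValue'),
         avg('value').alias('avgValue'),
         stddev('value').alias('stdevValue'))
```

With the watermark correctly attached to the `timeStamp` column used in the window, the `append` output mode should be accepted, since Spark can then decide when a window is final and safe to emit.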
