SETUP:
Spark: 3.2.3
Delta Sharing test server running locally
I am writing data to and reading data from a Delta Lake table with Spark.
Now I would like to enable Change Data Feed (CDF) so that I can continuously read only the changes via Delta Sharing. Delta Sharing without CDF works.
The relevant Spark configuration:
conf.set('spark.sql.extensions','io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog','org.apache.spark.sql.delta.catalog.DeltaCatalog')
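For completeness, a minimal sketch of how these settings fit into the session setup. The package coordinates and versions are assumptions on my side; pick the delta-core and delta-sharing-spark releases that match Spark 3.2.3 (Scala 2.12):
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
# Pull in the Delta Lake and Delta Sharing client jars (versions are assumptions).
conf.set('spark.jars.packages', 'io.delta:delta-core_2.12:2.0.2,io.delta:delta-sharing-spark_2.12:0.6.2')
# Same settings as above: Delta SQL extension and catalog.
conf.set('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
conf.set('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
spark = SparkSession.builder.config(conf=conf).getOrCreate()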
Then I register the S3 path as a table and enable CDF on it:
spark.sql(f"CREATE TABLE if not exists retail USING DELTA LOCATION '{table_path}'")
spark.sql(f"ALTER TABLE retail SET TBLPROPERTIES(delta.enableChangeDataFeed = true)")
Then, after creating new data in a DataFrame, I append it to the Delta table:
df.write.option("header", True).mode("append").format("delta").save(table_path)
When I read the change feed with a separate job directly from the Delta table, it works:
table_path = f"s3a://{'BUCKET']}{s3folder}/retail"
df = (spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "29")
.load(table_path))
When using Delta Sharing, it fails:
table_path = DS_PROFILE_FILE + '#' + 'PM.catalog.transactions'
df = (spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("startingVersion", "29")
.load(table_path))
with:
: io.delta.sharing.spark.util.UnexpectedHttpStatus: HTTP request failed with status: HTTP/1.1 400 Bad Request {"errorCode":"INVALID_PARAMETER_VALUE","message":"cdf is not enabled on table PM.catalog.transactions"}.
I have run out of ideas. Any suggestions?
I finally found the solution in a delta.io blog post: you also need to tell the delta-sharing server configuration that this table is CDF enabled.
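For reference, a sketch of the corresponding table entry in the reference server's YAML config. The share/schema/table names match my profile path PM.catalog.transactions, the location is a placeholder, and the field name cdfEnabled is what I recall from the server docs, so check the documentation for your server version. After changing the config, restart the server:
version: 1
shares:
- name: "PM"
  schemas:
  - name: "catalog"
    tables:
    - name: "transactions"
      location: "s3a://<bucket>/<prefix>/retail"   # placeholder for the actual table path
      cdfEnabled: true   # tells the sharing server that CDF queries are allowed on this table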