Why does Synapse spark.read return garbled MongoDB _id values?


I am using a CosmosDbMongoDb linked service to connect Azure Cosmos DB for MongoDB to a Synapse Analytics workspace.

EDIT: I can confirm that the ObjectId values in the Cosmos DB / MongoDB database are valid, since they are used in an application.

Document snippet, as seen in CosmosDB Data Explorer

{
    "_id" : ObjectId("623a3902764504df1bc51620"),
    "name": "Dummy data"
}

The Analytical Storage Time to Live option is On for the Cosmos DB collections, so they show up in the Linked section of Synapse Studio.

In a Synapse notebook running

spark.sql("select unhex('623a3902764504df1bc51620') as bytes").show(truncate=False)

returns

+-------------------------------------+
|bytes                                |
+-------------------------------------+
|[62 3A 39 02 76 45 04 DF 1B C5 16 20]|
+-------------------------------------+

indicating a working PySpark environment. However

df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
    .option("spark.cosmos.container", "TABLENAME")\
    .load()

display(df.limit(10))

returns

"{"objectId":"b:9\u0002vE\u0004�\u001b�\u0016 "}"
objectId: ""b:9\u0002vE\u0004�\u001b�\u0016 ""

for all values in the _id column.
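The garbled value is not random. As a small plain-Python check (no Spark involved, and not a claim about what Synapse does internally), decoding the 12 raw bytes of the ObjectId above as UTF-8 text reproduces exactly the string shown, including the two replacement characters:

```python
# The ObjectId from the question, as the 12 raw bytes it encodes.
raw = bytes.fromhex("623a3902764504df1bc51620")

# Interpreting those bytes as UTF-8 text reproduces the garbled value:
# 0xDF and 0xC5 are not valid on their own, so each becomes U+FFFD.
as_text = raw.decode("utf-8", errors="replace")
print(ascii(as_text))

# The replacement is lossy: re-encoding the text does not restore the bytes.
print(as_text.encode("utf-8") == raw)  # False

# Converting the raw bytes to hex gives the familiar ObjectId string.
print(raw.hex())  # 623a3902764504df1bc51620
```

So the column appears to hold the ObjectId bytes themselves rather than their hex representation, which means the conversion needed is bytes to hex, not hex to bytes.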

EDIT: df.printSchema() returns

root
 |-- _rid: string (nullable = true)
 |-- _ts: long (nullable = true)
 |-- id: string (nullable = true)
 |-- _etag: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- objectId: string (nullable = true)
 |-- name: struct (nullable = true)
 |    |-- string: string (nullable = true)
..... snip ...

Running df3 = df.select(unhex("_id.objectId")) returns

+-------------------+
|unhex(_id.objectId)|
+-------------------+
|               null|
|               null|
+-------------------+

Running df.select(unhex('_id.objectId')) returns

DataFrame[unhex(_id.objectId): binary]
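The nulls are consistent with unhex being applied in the wrong direction: it parses a hex string into bytes, and the column values are not hex strings. A plain-Python analogy (the helper name `try_unhex` is made up for illustration and is not Spark's implementation):

```python
from typing import Optional


def try_unhex(text: str) -> Optional[bytes]:
    """Parse a hex string into bytes; return None when the input is not valid hex."""
    try:
        return bytes.fromhex(text)
    except ValueError:
        return None


# The value as displayed in the _id.objectId column.
garbled = "b:9\x02vE\x04\ufffd\x1b\ufffd\x16 "

print(try_unhex("623a3902764504df1bc51620"))  # 12 bytes, as in the first test
print(try_unhex(garbled))  # None, because ':' and the control characters are not hex digits
```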

Running

SELECT TOP (100) JSON_VALUE([_id], '$.objectId') AS _id
 FROM [DB].[dbo].[TABLENAME]

in the built-in SQL pool returns the same garbled b:9vE�� values.

Trying

%%sql
    create table TABLENAME using cosmos.olap options (
        spark.synapse.linkedService 'CosmosDbMongoDb1',
        spark.cosmos.container 'TABLENAME'
)

SELECT name, _id
FROM TABLENAME;

returns for the _id column

"{"schema":[{"name":"objectId","dataType":{},"nulla..."
schema: "[{"name":"objectId","dataType":{},"nullable":true,..."
0: "{"name":"objectId","dataType":{},"nullable":true,"..."
name: ""objectId""
dataType: "{}"
nullable: "true"
metadata: "{"map":{}}"
map: "{}"
values: "["b:9\u0002vE\u0004�\u001b�\u0016 "]"
0: ""b:9\u0002vE\u0004�\u001

Sorry for the extra garble from the copy-paste; the column values contain expandable fields.

There are 2 answers below.

Best answer

I found a Scala code example in the Azure documentation that converts the ObjectId field to a string:

import org.apache.spark.sql.functions.{col, udf}

val df = spark.read.format("cosmos.olap")
    .option("spark.synapse.linkedService", "xxxx")
    .option("spark.cosmos.container", "xxxx")
    .load()

// Render each byte of the ObjectId as two lowercase hex digits
val convertObjectId = udf((bytes: Array[Byte]) => {
    val builder = new StringBuilder

    for (b <- bytes) {
        builder.append(String.format("%02x", Byte.box(b)))
    }
    builder.toString
})

// Keep the raw value next to the converted one for comparison
val dfConverted = df
    .withColumn("objectId", col("_id.objectId"))
    .withColumn("convertedObjectId", convertObjectId(col("_id.objectId")))
    .select("id", "objectId", "convertedObjectId")

display(dfConverted)
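Since the question uses PySpark, here is a rough Python counterpart of the conversion function. It is only a sketch: the name `convert_object_id` is made up, and it assumes the value reaches Python as bytes or a bytearray, as the `Array[Byte]` parameter of the Scala UDF suggests. The UDF wiring is left in comments because it was not run against a Synapse workspace.

```python
def convert_object_id(raw) -> str:
    """Format each byte as two lowercase hex digits, like "%02x" in the Scala UDF."""
    return "".join(f"{b:02x}" for b in bytes(raw))


# In a notebook this could be wrapped as a UDF (assumption, not tested here):
#   from pyspark.sql.functions import col, udf
#   from pyspark.sql.types import StringType
#   convert_udf = udf(convert_object_id, StringType())
#   df.withColumn("convertedObjectId", convert_udf(col("_id.objectId")))

# Local check with the ObjectId bytes from the question.
print(convert_object_id(bytes.fromhex("623a3902764504df1bc51620")))  # 623a3902764504df1bc51620
```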

Another answer

As per this documentation,

Cosmos DB automatically transforms the BSON data (Binary JSON) into a columnar format.

However, you are still getting garbled output because of the ObjectId type.

Spark has no ObjectId type, so it does not recognize the value and shows it in the garbled form above instead of the hex string. These are the supported data types in Spark.

To avoid this, create or ingest the data with an explicit _id. If you don't provide one, it is added automatically as an ObjectId, for example ObjectId("623a3902764504df1bc51620").

Below are two records created: one with an autogenerated _id and another with a manually created _id.

Autogenerated

[image: record with an autogenerated _id]

Manually added

[image: record with a manually added _id]

Output: [image: query output]

Below is the code used to add data to the HTAP2 container in Cosmos DB for MongoDB.


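from random import randint
import time
import uuid

from pymongo import MongoClient

# connection_string holds the Cosmos DB for MongoDB connection string
client = MongoClient(connection_string)
db = client.samp  # get database

orders = db["HTAP2"]  # get container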

items = ['Pizza','Sandwich','Soup', 'Salad', 'Tacos']
prices = [2.99, 3.49, 5.49, 12.99, 54.49]

for x in range(1, 501):
    random_uuid = uuid.uuid4()
    
    order = {
        '_id' : str(random_uuid),
        'item' : items[randint(0, (len(items)-1))],
        'price' : prices[randint(0, (len(prices)-1))],
        'rating' : randint(1, 5),
        'timestamp' : time.time()
    }
    
    result=orders.insert_one(order)

print('finished creating 500 orders')


So, make sure to add an _id when adding data to Cosmos DB.

Refer to this documentation for more information.