How to use the MongoDB $addFields aggregation pipeline stage in spark.read?


I am trying to use the MongoDB aggregation pipeline in a Synapse Spark notebook. The use case is to convert ObjectID type fields, like the _id field, to strings with $addFields.

However, my attempts are met with:

IllegalArgumentException: Unrecognized configuration specified: (pipeline,[{ $addFields: { _id: { '$toString': '$_id' } } } ])

I have been brute-forcing quote-mark combinations, drawing on examples from Copilot and the documentation. Here is one attempt:

pipeline = "[{ '$addFields': { '_id': { '$toString': $_id } } } ]"

df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
    .option("spark.cosmos.container", "<COLLECTION NAME>")\
    .option("pipeline", pipeline)\
    .load()

display(df.limit(10))

Did I make some noob mistake in the literal format, or is this a case of missing support in the Spark connector?

EDIT: This paragraph in the connector docs could very well point to an answer for someone more experienced with MongoDB.

Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition.
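EDIT 2: The pipeline option appears to belong to the MongoDB Spark Connector rather than to the Synapse cosmos.olap format, which would explain the "Unrecognized configuration" error. For comparison, here is a minimal sketch of how a pipeline is passed with the MongoDB Spark Connector v10.x, assuming the connector is attached to the pool and the placeholder connection details are filled in; note this reads from the MongoDB endpoint itself, not from the analytical store:

pipeline = '[{ "$addFields": { "_id": { "$toString": "$_id" } } }]'

# Sketch only: requires the MongoDB Spark Connector v10.x on the classpath.
# <CONNECTION STRING>, <DATABASE NAME>, and <COLLECTION NAME> are placeholders.
df = (
    spark.read.format("mongodb")
    .option("connection.uri", "<CONNECTION STRING>")
    .option("database", "<DATABASE NAME>")
    .option("collection", "<COLLECTION NAME>")
    .option("aggregation.pipeline", pipeline)  # runs server-side, before Spark reads
    .load()
)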


There are 2 best solutions below


I found a Scala code example in the Azure documentation that works for converting the ObjectId field to a string:

import org.apache.spark.sql.functions.{col, udf}

val df = spark.read.format("cosmos.olap")
    .option("spark.synapse.linkedService", "xxxx")
    .option("spark.cosmos.container", "xxxx")
    .load()

// Convert the raw ObjectId bytes into their hexadecimal string form
val convertObjectId = udf((bytes: Array[Byte]) => {
    val builder = new StringBuilder
    for (b <- bytes) {
        builder.append(String.format("%02x", Byte.box(b)))
    }
    builder.toString
})

val dfConverted = df
    .withColumn("objectId", col("_id.objectId"))
    .withColumn("convertedObjectId", convertObjectId(col("_id.objectId")))
    .select("id", "objectId", "convertedObjectId")

display(dfConverted)

This solution also answers the question here


To convert ObjectID type fields, like the _id field, to a string, follow the approach below:

  1. Define the schema for the data that will be read from Cosmos OLAP.

  2. Use BinaryType in the schema for the ObjectID field. (The nested string/int32/array wrappers reflect the analytical store's full-fidelity schema, which groups each property by its BSON type.)

schema = StructType([
    StructField("_rid", StringType(), True),
    StructField("_ts", LongType(), True),
    StructField("id", StringType(), True),
    StructField("_etag", StringType(), True),
    StructField("_id", StructType([
        StructField("objectId", BinaryType(), True)  # Use BinaryType for objectId
    ]), True),
    StructField("name", StructType([
        StructField("string", StringType(), True)
    ]), True),
    StructField("age", StructType([
        StructField("int32", IntegerType(), True)
    ]), True),
    StructField("marks", StructType([
        StructField("array", ArrayType(StructType([
            StructField("int32", IntegerType(), True)
        ])), True)
    ]), True),
    StructField("newAge", StructType([
        StructField("string", StringType(), True)
    ]), True),
    StructField("_partitionKey", StructType([
        StructField("string", StringType(), True)
    ]), True)
])
  3. convert_object_id() takes a byte array as input and converts it into a hexadecimal string (a quick sanity check of this helper appears after these steps).
def convert_object_id(bytes_array):
    builder = []
    for b in bytes_array:
        builder.append(format(b, '02x'))
    return ''.join(builder)
  4. The line below registers convert_object_id as a Spark UDF so it can be applied to DataFrame columns; the default UDF return type is StringType, which matches the hex output.
convert_object_id_udf = udf(convert_object_id)
  5. The data is read from Cosmos OLAP into a DataFrame using the schema.
df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
    .option("spark.cosmos.container", "secondColl")\
    .schema(schema)\
    .load()
  6. dfConverted creates a new DataFrame with the added objectId and convertedObjectId columns.
dfConverted = (
     df.withColumn("objectId", col("_id.objectId"))
    .withColumn("convertedObjectId", convert_object_id_udf(col("_id.objectId")))
    .select("id", "objectId", "convertedObjectId")
)
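
As a quick sanity check of step 3, the helper produces the same output as Python's built-in bytes.hex(). A hypothetical 12-byte ObjectId value is used here for illustration:

# Hypothetical 12-byte ObjectId value, for illustration only
sample = bytes.fromhex("65f1c0ffee00112233445566")

assert convert_object_id(sample) == sample.hex()
print(sample.hex())  # 65f1c0ffee00112233445566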

Code I tried with:

from pyspark.sql.functions import udf, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, ArrayType, BinaryType

schema = StructType([
    StructField("_rid", StringType(), True),
    StructField("_ts", LongType(), True),
    StructField("id", StringType(), True),
    StructField("_etag", StringType(), True),
    StructField("_id", StructType([
        StructField("objectId", BinaryType(), True)  # Use BinaryType for objectId
    ]), True),
    StructField("name", StructType([
        StructField("string", StringType(), True)
    ]), True),
    StructField("age", StructType([
        StructField("int32", IntegerType(), True)
    ]), True),
    StructField("marks", StructType([
        StructField("array", ArrayType(StructType([
            StructField("int32", IntegerType(), True)
        ])), True)
    ]), True),
    StructField("newAge", StructType([
        StructField("string", StringType(), True)
    ]), True),
    StructField("_partitionKey", StructType([
        StructField("string", StringType(), True)
    ]), True)
])

def convert_object_id(bytes_array):
    builder = []
    for b in bytes_array:
        builder.append(format(b, '02x'))
    return ''.join(builder)
 
convert_object_id_udf = udf(convert_object_id)
 
df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
    .option("spark.cosmos.container", "secondColl")\
    .schema(schema)\
    .load()

dfConverted = (
     df.withColumn("objectId", col("_id.objectId"))
    .withColumn("convertedObjectId", convert_object_id_udf(col("_id.objectId")))
    .select("id", "objectId", "convertedObjectId")
)
 
display(dfConverted)

Output:

(screenshot of the resulting DataFrame with the id, objectId, and convertedObjectId columns)
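
As an aside, the UDF can be avoided entirely with Spark's built-in hex function. A sketch, assuming df was read with the schema above so that _id.objectId is a binary column (hex returns uppercase, so lower restores the usual 24-character ObjectId form):

from pyspark.sql.functions import col, hex, lower

# hex() renders the 12 ObjectId bytes as an uppercase hex string;
# lower() yields the canonical lowercase ObjectId representation.
dfConverted = df.withColumn("convertedObjectId", lower(hex(col("_id.objectId"))))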