Impala built-in function not available when migrating from Impala to SparkSQL


I am using a built-in function in Impala like:

select id, parse_url(my_table.url, "QUERY", "extensionId") from my_table

Now I am migrating to SparkSQL (using pyspark in Jupyter Notebook):

my_table.select(my_table.id.cast('string'), parse_url(my_table.url.cast('string'), "QUERY", "extensionId")).show()

However, I got the following error:

NameError: name 'parse_url' is not defined

I also tried the following:

my_table.registerTempTable("my_table")

sqlContext.sql("select id, url, parse_url(url, 'QUERY', 'extensionId') as new_url from my_table").show(100)

But all the new_url values come back null.

Any idea what I missed here? Also, how would people handle such a problem? Thanks!

1 Answer

Accepted answer:

Some missing parts:

  • You cannot execute Impala built-in functions with Spark.
  • There is a Hive UDF with the same name and syntax that can be used with Spark, but it has no native implementation and no function wrapper in pyspark.sql.functions, which is why the plain name fails with NameError. It can, however, be invoked through SQL using a HiveContext (Spark 1.x) or a SparkSession with Hive support (Spark 2.x); see the session sketch below.

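For reference, here is a minimal sketch of creating such a Hive-enabled session (assuming Spark 2.x; on 1.x you would use HiveContext(sc) instead, and the app name here is made up):

from pyspark.sql import SparkSession

# enableHiveSupport() makes Hive UDFs such as parse_url available in SQL.
spark = (SparkSession.builder
    .appName("parse_url_demo")  # hypothetical app name
    .enableHiveSupport()
    .getOrCreate())
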
In general it should work just fine:

spark.sql("""SELECT parse_url(
    'http://example.com?extensionId=foo', 'QUERY', 'extensionId'
)""").show()
+-----------------------------------------------------------------+
|parse_url(http://example.com?extensionId=foo, QUERY, extensionId)|
+-----------------------------------------------------------------+
|                                                              foo|
+-----------------------------------------------------------------+
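
The same function can also be reached from the DataFrame API via expr, which is closer to the select attempt in the question (a sketch assuming a DataFrame my_table with string columns id and url):

from pyspark.sql.functions import expr

# parse_url is resolved by the SQL parser inside expr,
# so no Python-side wrapper is needed.
my_table.select(
    "id",
    expr("parse_url(url, 'QUERY', 'extensionId')").alias("extension_id")
).show()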

A NULL output means that the given part cannot be matched:

spark.sql("""SELECT parse_url(
    'http://example.com?bar=foo', 'QUERY', 'extensionId'
)""").show()
+---------------------------------------------------------+
|parse_url(http://example.com?bar=foo, QUERY, extensionId)|
+---------------------------------------------------------+
|                                                     null|
+---------------------------------------------------------+

You could achieve a similar result with a Python UDF, but it will be significantly slower, because every row has to be shipped between the JVM and a Python worker, while the built-in parse_url runs entirely on the JVM side.

from typing import Dict, Optional
from urllib.parse import parse_qsl, urlsplit
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, MapType

def parse_args(col: str) -> Optional[Dict[str, str]]:
    """Parse a URL's query string into a dict.

    Based on http://stackoverflow.com/a/21584580/6910411
    """
    try:
        return dict(parse_qsl(urlsplit(col).query))
    except (TypeError, ValueError):
        # A malformed or missing URL becomes null in the result column.
        return None

parse_args_ = udf(parse_args, MapType(StringType(), StringType()))

With data defined as:

df = sc.parallelize([
    ("http://example.com?bar=foo", ),
    ("http://example.com?extensionId=foo", ),
]).toDF(["url"])

it could be used as follows:

df.select(parse_args_("url")["extensionId"]).show()

with the result being:

+----------------------------+
|parse_args(url)[extensionId]|
+----------------------------+
|                        null|
|                         foo|
+----------------------------+
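
If you prefer the SQL route from the question, the same Python function can be registered for SQL as well (a sketch assuming a Spark 2.x session named spark; the view name urls is made up, and on 1.x sqlContext.registerFunction plays the same role):

df.createOrReplaceTempView("urls")  # hypothetical view name
spark.udf.register("parse_args", parse_args, MapType(StringType(), StringType()))

spark.sql(
    "SELECT parse_args(url)['extensionId'] AS extension_id FROM urls"
).show()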