Pandas UDF output into an array type column

My assignment is to store the result of the following function in an array type column:

def sample_udf(df:SparkDataFrame):
  device_issues = []
  if (df['altitude'] == 0):      
    return "alt" 
  elif (df['latitude'] <= -90
      or df['latitude'] >=90):      
    return "gps_lat"
  elif (df['longitude'] <= -180
      or df['longitude'] >= 180):      
    return "gps_long"
  elif (df['direction'] < 0
      or df['direction'] > 359):      
    return "gps_direction"
  else:
    return device_issues

df_new = df.withColumn("deviceIssues", sample_udf(f.col("altitude"), f.col("latitude")))

When I run that command, I get this error:

TypeError: sample_udf() takes 1 positional argument but 2 were given

Any help would be appreciated.

I'm expecting the "deviceIssues" column to be an array type column.

Answer: your sample_udf is defined with a single parameter (df) but is called with two columns, which is exactly what the TypeError says. Beyond that, a plain Python function can't be applied to Spark columns directly: register it with udf and declare the return type so Spark knows the output is an array of strings. Also use independent if blocks that append to a list, instead of elif branches that return a single string, so one row can report several issues. Try:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def sample_udf(altitude, longitude, latitude, direction):
    # collect every issue for the row instead of returning at the first match
    device_issues = []
    if altitude == 0:
        device_issues.append('alt')
    if (latitude <= -90) or (latitude >= 90):
        device_issues.append('gps_lat')
    if (longitude <= -180) or (longitude >= 180):
        device_issues.append('gps_lon')
    if (direction < 0) or (direction > 359):
        device_issues.append('gps_direction')
    return device_issues

# register the function as a UDF; the output is an ArrayType of StringType
f_sample_udf = udf(sample_udf, ArrayType(StringType()))

df_new = df.withColumn('deviceIssues', f_sample_udf('altitude', 'longitude', 'latitude', 'direction'))

Output:

>>> df_new.show()
+--------+---------+--------+---------+--------------------+
|altitude|longitude|latitude|direction|        deviceIssues|
+--------+---------+--------+---------+--------------------+
|       0|       10|      10|      380|[alt, gps_direction]|
|      20|      187|      20|       20|           [gps_lon]|
+--------+---------+--------+---------+--------------------+