Pyspark: how to improve spatial intersection?


I am working with PySpark on Databricks, where I have a table of data points that looks like the following:

pingsGeo.show(5)
+--------------------+--------------------+----------+--------------------+
|                  ID|               point|      date|            distance|
+--------------------+--------------------+----------+--------------------+
|00007436cf7f96cb1...|POINT (-82.640937...|2020-03-19|0.022844737780675896|
|00007436cf7f96cb1...|POINT (-82.641281...|2020-03-19|3.946137920280456...|
|00007436cf7f96cb1...|POINT (-82.650238...|2020-03-19| 0.00951798692682881|
|00007436cf7f96cb1...|POINT (-82.650947...|2020-03-19|7.503617154519347E-4|
|00007436cf7f96cb1...|POINT (-82.655853...|2020-03-19|0.007148426134394903|
+--------------------+--------------------+----------+--------------------+

root
 |-- ID: string (nullable = true)
 |-- point: geometry (nullable = false)
 |-- date: date (nullable = true)
 |-- distance: double (nullable = false)

And another table of polygons (from shapefile)

zoneShapes.show(5)
+--------+--------------------+
|COUNTYNS|            geometry|
+--------+--------------------+
|01026336|POLYGON ((-78.901...|
|01025844|POLYGON ((-80.497...|
|01074088|POLYGON ((-81.686...|
|01213687|POLYGON ((-76.813...|
|01384015|POLYGON ((-95.152...|

I would like to assign a COUNTYNS to each point.

I am doing this with GeoSpark functions, as follows:

queryOverlap = """
        SELECT p.ID, z.COUNTYNS as zone,  p.date, p.point, p.distance
        FROM pingsGeo as p, zoneShapes as z
        WHERE ST_Intersects(p.point, z.geometry)
    """

spark.sql(queryOverlap).show(5)

This query works on a small dataset but fails on a larger one:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 51.0 failed 4 times, most recent failure: Lost task 117.3 in stage 51.0 (TID 4879, 10.17.21.12, executor 13): org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0

I am wondering if there is a way to optimize the process.

1 Answer

Your question is a little vague, but here is what I would look at first.

There are a few things to consider:

1. The physical resources available to your Spark cluster.
2. The partitioning of your tables. If they are not partitioned correctly, the join can shuffle far more data than the defaults are tuned for; spatially partitioning both sides on the same grid keeps geometries that can intersect in the same partition (see the sketch below).
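
For point 2, one option is GeoSpark's RDD API, which lets you spatially partition both datasets on the same grid and run the join partition by partition. The sketch below is only illustrative: it assumes the geospark Python package and the pingsGeo / zoneShapes DataFrames from the question, so check the API names against the GeoSpark version on your cluster.

# Illustrative sketch -- assumes the geospark Python package and the
# pingsGeo / zoneShapes DataFrames shown above.
from geospark.core.enums import GridType, IndexType
from geospark.core.spatialOperator import JoinQuery
from geospark.utils.adapter import Adapter

# Convert both DataFrames to SpatialRDDs (geometry column passed by name).
pointRDD = Adapter.toSpatialRdd(pingsGeo, "point")
polygonRDD = Adapter.toSpatialRdd(zoneShapes, "geometry")

pointRDD.analyze()
polygonRDD.analyze()

# Partition the points on a spatial grid and reuse the same partitioner for
# the polygons, so geometries that can intersect land in the same partition.
pointRDD.spatialPartitioning(GridType.KDBTREE)
polygonRDD.spatialPartitioning(pointRDD.getPartitioner())

# Optional: build a local index inside each partition before probing.
pointRDD.buildIndex(IndexType.QUADTREE, True)

# For each polygon, find the points it intersects; returns (polygon, point) pairs.
result = JoinQuery.SpatialJoinQueryFlat(pointRDD, polygonRDD, True, True)

Depending on your geospark version, Adapter.toDf can convert the result back into a DataFrame.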

Also, consider using a spatial index on your biggest table.
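
If you prefer to keep the SQL query from the question, GeoSpark also exposes runtime parameters that control whether the join builds an index on the fly and which grid it uses for partitioning. The property names below are an assumption based on GeoSpark's documented defaults, so verify them against the version running on your cluster.

# Assumed GeoSpark runtime parameters -- verify the names for your version.
spark.conf.set("geospark.global.index", "true")          # build an index during the join
spark.conf.set("geospark.global.indextype", "quadtree")  # quadtree or rtree
spark.conf.set("geospark.join.gridtype", "kdbtree")      # grid used to partition the join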