Writing data from Hadoop (Hive) into a DBF file using PySpark is time-consuming


We have a requirement to query data in our Hadoop (Hive) cluster and save it into a DBF file. To achieve that we use Spark as our processing engine, specifically PySpark (Python 3.4), and the dbf package (https://pypi.org/project/dbf/) as the DBF writer. After several tests we noticed that the process takes a long time, sometimes around 20 minutes. It is not nearly as fast as writing to other file formats such as CSV, ORC, etc.

Base syntax (around 20 minutes)

import dbf
from datetime import datetime

# collect() pulls the whole query result to the driver as a list of Rows
collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()

filename2 = "/home/sak202208_" + str(datetime.now()) + "_tes.dbf"
header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"

new_table2 = dbf.Table(filename2, header2)
new_table2.open(dbf.READ_WRITE)

# append records one at a time
for row in collections:
    new_table2.append(row)

new_table2.close()

Enabling multithreading (similar result)

import os
import concurrent.futures

import dbf
from datetime import datetime

collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()

filename2 = "/home/sak202208_" + str(datetime.now()) + "_tes.dbf"
header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"

new_table2 = dbf.Table(filename2, header2)
new_table2.open(dbf.READ_WRITE)

def append_row(table, record):
    table.append(record)

with concurrent.futures.ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) + 4)) as executor:
    for row in collections:
        # submit the callable and its arguments so the append runs in a worker thread
        executor.submit(append_row, new_table2, row)

new_table2.close()

The Spark driver memory has been set to 7 GB, but when we check with the top command the process only uses around 1 GB while writing the DBF file.
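For reference, the driver memory is configured roughly as follows. This is a minimal sketch of an assumed setup (the app name is illustrative); spark.driver.memory only takes effect if it is set before the driver JVM starts, e.g. at session creation or via spark-submit --driver-memory 7g:

# Minimal sketch of the assumed session setup (illustrative, not the exact production config)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hive-to-dbf")                 # illustrative app name
    .config("spark.driver.memory", "7g")    # must be set before the driver JVM starts
    .enableHiveSupport()
    .getOrCreate()
)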

How can we write the data into the DBF file efficiently? Is there any tuning that we missed, or any alternative?

1 Answer

Ethan Furman answered:

There are two main causes for the slowness:

  1. Every record must be converted between Python data types and the dbf storage data types; and
  2. The dbf file on disk must be adjusted for every record (the new row is written and the DBF metadata, such as the record count, is updated).

One speedup is to create all the rows at once (if you know how many rows there will be), and then replace each row with actual data:

new_table2.open(dbf.READ_WRITE)

# create all the (empty) records in a single call
new_table2.append(multiple=<number_of_rows>)

# then overwrite each pre-created record with the actual data
for rec, row in zip(new_table2, collections):
    dbf.write(rec, **row)

Note that each row must be a mapping for this to work.
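Since collect() in the question returns pyspark Row objects rather than plain dicts, a minimal end-to-end sketch of this approach could look like the following. The asDict() conversion and the shortened field list are assumptions for illustration, and the mapping keys may need their case adjusted to match the table's field names:

import dbf

# Assumes the `spark` session from the question; the field list is shortened for illustration
collections = spark.sql(
    "SELECT JENISKEGIA, JUMLAHUM_A, URUTAN, WEIGHT FROM silastik.sakernas_2022_8"
).collect()

new_table2 = dbf.Table(
    "/home/sak202208_tes.dbf",
    "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); URUTAN N(7,0); WEIGHT N(8,0)",
)
new_table2.open(dbf.READ_WRITE)

# pre-create all empty records in one call instead of appending one by one
new_table2.append(multiple=len(collections))

# Row.asDict() turns each pyspark Row into the mapping that dbf.write() expects;
# adjust key case here if it does not match the table's field names
for rec, row in zip(new_table2, collections):
    dbf.write(rec, **row.asDict())

new_table2.close()

This keeps the per-record work down to filling in already-allocated rows, which is where the time was going in the original append loop.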