PySpark RDD transpose

I have the following emp table in the Hive database testing:

1       ram     2000.0  101     market
2       shyam   3000.0  102     IT
3       sam     4000.0  103     finance
4       remo    1000.0  103     finance

I want to transpose this table in PySpark so that the first two columns stay the same and the last three columns are stacked into rows.

I ran the following in the pyspark shell:

test = sqlContext.sql("select * from testing.emp")

data = test.flatMap (lambda row: [Row (id=row ['id'],name=row['name'],column_name=col,column_val=row [col]) for col in ('sal','dno','dname')])

emp = sqlContext.createDataFrame(data)

emp.registerTempTable('mytempTable')

sqlContext.sql('create table testing.test(id int,name string,column_name string,column_val int) row format delimited fields terminated by ","')

sqlContext.sql('INSERT INTO TABlE testing.test select * from mytempTable')

The expected output is:

1   ram sal 2000
1   ram dno 101
1   ram dname   market
2   shyam   sal 3000
2   shyam   dno 102
2   shyam   dname   IT
3   sam sal 4000
3   sam dno 103
3   sam dname   finance
4   remo    sal 1000
4   remo    dno 103
4   remo    dname   finance

But the output I get is

NULL    2000.0  1       NULL
NULL    NULL    1       NULL
NULL    NULL    1       NULL
NULL    3000.0  2       NULL
NULL    NULL    2       NULL
NULL    NULL    2       NULL
NULL    4000.0  3       NULL
NULL    NULL    3       NULL
NULL    NULL    3       NULL
NULL    1000.0  4       NULL
NULL    NULL    4       NULL
NULL    NULL    4       NULL

Also, please let me know how I can loop over the columns if the table has many of them.

Best answer

Sorry, I just noticed that this is a Hive table.

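from pyspark import SparkConf
from pyspark.sql import Row, SparkSession

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).enableHiveSupport().getOrCreate()

df = spark.table("default.test").cache()
# Columns to stack; widen the slice if the table has more columns
cols = df.columns[2:5]
# Pack the stacked columns into one map column; map values must share a type
# (they are all strings in this table)
rdd = df.rdd.map(lambda x: Row(id=x[0], name=x[1], val=dict(zip(cols, x[2:5]))))
df = spark.createDataFrame(rdd)
df.createOrReplaceTempView('mytempTable')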
sql = """
select
id,
name,
explode(val) AS (k,v)
from mytempTable
"""
df = spark.sql(sql)
df.show()
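To make the map-then-explode step easier to follow, here is a minimal plain-Python sketch of the same reshaping, with no Spark session involved. The helper name unpivot and its id_count parameter are hypothetical, and the sample rows are copied from the question. It loops over however many columns follow the first two, which is one way to handle wide tables.

```python
# Plain-Python model of the map-then-explode step (illustration only, not Spark).
def unpivot(rows, columns, id_count=2):
    """Yield one (id..., column_name, value) tuple per stacked column."""
    id_cols = columns[:id_count]
    value_cols = columns[id_count:]  # every remaining column gets stacked
    for row in rows:
        record = dict(zip(columns, row))
        ids = tuple(record[c] for c in id_cols)
        for col in value_cols:
            # Cast to str so all stacked values share one type.
            yield ids + (col, str(record[col]))


columns = ["id", "name", "sal", "dno", "dname"]
rows = [
    (1, "ram", 2000.0, 101, "market"),
    (2, "shyam", 3000.0, 102, "IT"),
]
stacked = list(unpivot(rows, columns))
for line in stacked:
    print(line)
```

The explicit output order (ids, column name, value) is deliberate. In Spark versions before 3.0, a Row built from keyword arguments sorts its fields by name, so the question's select * probably fed column_name, column_val, id, name into a table declared as id, name, column_name, column_val. If that is the cause of the NULLs, selecting the columns by name in the INSERT should line them up, and column_val would need to be a string column to hold values like market.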

The Hive table used above is defined as:

    > desc test;
OK
id                      string                                      
somebody                string                                      
sal                     string                                      
dno                     string                                      
dname                   string                                      
dt                      string                                      

# Partition Information      
# col_name              data_type               comment             

dt                      string  

P.S. You can also do this with SQL alone, without Spark:

select
  a.id,
  a.somebody,
  b.k,
  b.v
from (
  select
    id,
    somebody,
    map('sal',sal,
    'dno',dno,
    'dname',dname) as val
  from default.test
) a
lateral VIEW explode(val) b as k,v
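For a table with many columns, writing the map(...) arguments by hand gets tedious. The following is a rough sketch that generates the query above from a list of column names. The function build_unpivot_sql is hypothetical, and the added cast(... as string) is an assumption that the stacked columns may have different types, since a map needs a single value type. The names are pasted into the SQL as-is, so they should come from a trusted source such as the table's own schema.

```python
def build_unpivot_sql(table, id_cols, value_cols):
    """Return a Hive query that stacks value_cols into (k, v) rows."""
    # One "'name', cast(name as string)" pair per stacked column.
    pairs = ",\n    ".join(
        "'{0}', cast({0} as string)".format(c) for c in value_cols
    )
    ids = ", ".join(id_cols)
    outer_ids = ", ".join("a." + c for c in id_cols)
    return (
        "select {outer_ids}, b.k, b.v\n"
        "from (\n"
        "  select {ids},\n"
        "    map({pairs}) as val\n"
        "  from {table}\n"
        ") a\n"
        "lateral view explode(val) b as k, v"
    ).format(outer_ids=outer_ids, ids=ids, pairs=pairs, table=table)


sql = build_unpivot_sql(
    "default.test", ["id", "somebody"], ["sal", "dno", "dname"]
)
print(sql)
```

The resulting string can be run in Hive directly or passed to spark.sql in the PySpark version.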

For your question about small parquet files:

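from pyspark import SparkConf
from pyspark.sql import SparkSession

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.enableHiveSupport().config(conf=cfg).getOrCreate()

# Build a small 26-row example: (index, letter, partition date)
rdd = spark.sparkContext.parallelize(range(26))
rdd = rdd.map(lambda x: (x, chr(x + 97), '2017-01-12'))
# coalesce(1) leaves a single DataFrame partition before writing
df = spark.createDataFrame(rdd, schema=['idx', 'val', 'dt']).coalesce(1)

df.write.saveAsTable('default.testing', mode='overwrite', partitionBy='dt', format='parquet')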

The number of small Parquet files equals the number of DataFrame partitions.

You can use df.coalesce or df.repartition to reduce the number of DataFrame partitions.

However, I am not sure whether reducing the DataFrame to a single partition has hidden problems (e.g. OOM?).
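The relationship between partitions and files can be pictured with a toy model in plain Python. This is not Spark's writer, only a sketch under the assumption that each DataFrame partition writes one file for every distinct dt value it contains. The function count_output_files and the round-robin split are made up for illustration.

```python
from collections import defaultdict


def count_output_files(partitions, key_index):
    """Count files per directory: one per (DataFrame partition, key value)."""
    files = defaultdict(int)
    for part in partitions:
        # Each partition contributes one file for every distinct key it holds.
        for key in {row[key_index] for row in part}:
            files[key] += 1
    return dict(files)


rows = [(i, chr(i + 97), "2017-01-12") for i in range(26)]

# Four partitions (round-robin split) versus a single one, as after coalesce(1).
four_parts = [rows[i::4] for i in range(4)]
one_part = [rows]

print(count_output_files(four_parts, 2))
print(count_output_files(one_part, 2))
```

With four partitions the model reports four files under dt=2017-01-12, and with one partition a single file, which is the effect coalesce(1) has in the example above.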

There is also another way to combine small files without Spark; just use Hive SQL:

set mapred.reduce.tasks=1;
insert overwrite table default.yourtable partition (dt='2017-01-13')
select col from tmp.yourtable where dt='2017-01-13';