I have a script in Pyspark to read a folder with .json archives and put in the registers data collected using a TMDB's API, but an error is going on and I don't know how to resolve it.
My script is in Portuguese because I'm Brazilian.
The error:
Traceback (most recent call last):
File "/home/gwillye/Documentos/Lab Ex/dadosProdutora.py", line 41, in \<module\>
dados_json = dados_json.withColumn("produtora", col("produtora").cast(StringType()))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/gwillye/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 5170, in withColumn
File "/home/gwillye/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/gwillye/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 185, in deco
pyspark.errors.exceptions.captured.AnalysisException: \[UNRESOLVED_COLUMN.WITH_SUGGESTION\] A column or function parameter with name `produtora` cannot be resolved. Did you mean one of the following? \[`genero`, `id`, `profissao`, `notaMedia`, `numeroVotos`\].;
'Project \[\_corrupt_record#8, anoFalecimento#9, anoLancamento#10L, anoNascimento#11, anoTermino#12, genero#13, generoArtista#14, id#15, nomeArtista#16, notaMedia#17, numeroVotos#18L, personagem#19, profissao#20, tempoMinutos#21, tituloOriginal#22, tituloPincipal#23, titulosMaisConhecidos#24, cast('produtora as string) AS produtora#42\]
\+- Relation \[\_corrupt_record#8,anoFalecimento#9,anoLancamento#10L,anoNascimento#11,anoTermino#12,genero#13,generoArtista#14,id#15,nomeArtista#16,notaMedia#17,numeroVotos#18L,personagem#19,profissao#20,tempoMinutos#21,tituloOriginal#22,tituloPincipal#23,titulosMaisConhecidos#24\] json'''
My example about the .json files:
\[
{
"id": "tt0061797",
"tituloPincipal": "The Madcap Island",
"tituloOriginal": "Hyokkori hy\\u00f4tan-jima",
"anoLancamento": 1967.0,
"tempoMinutos": 61.0,
"genero": "Animation",
"notaMedia": 5.6,
"numeroVotos": 10,
"generoArtista": "actress",
"personagem": null,
"nomeArtista": "Chinatsu Nakayama",
"anoNascimento": 1948.0,
"anoFalecimento": null,
"profissao": "actress,soundtrack,music_department",
"titulosMaisConhecidos": "tt1734449,tt0204339,tt0202407,tt0081881",
"produtora": null
},
{
"id": "tt0061797",
"tituloPincipal": "The Madcap Island",
"tituloOriginal": "Hyokkori hy\\u00f4tan-jima",
"anoLancamento": 1967.0,
"tempoMinutos": 61.0,
"genero": "Animation",
"notaMedia": 5.6,
"numeroVotos": 10,
"generoArtista": "actor",
"personagem": null,
"nomeArtista": "Arihiro Fujimura",
"anoNascimento": 1934.0,
"anoFalecimento": 1982.0,
"profissao": "actor,soundtrack",
"titulosMaisConhecidos": "tt0060926,tt0102587,tt0298290,tt0062224",
"produtora": null
},
... \]
My script is:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import requests
def get_tmdb_data(movie_id):
api_key = 'f905807b2900febaccfb008c16388168'
url = f'https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}&language=pt-BR'
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
return None
def get_produtora_udf(movie_id):
tmdb_data = get_tmdb_data(movie_id)
if tmdb_data and 'production_companies' in tmdb_data:
produtoras = tmdb_data['production_companies']
if produtoras:
for produtora in produtoras:
if 'name' in produtora:
return produtora['name']
return None
def processar_registro(row):
movie_id = row['id']
produtora_atual = row['produtora']
if produtora_atual is None:
nova_produtora = get_produtora_udf(movie_id)
return row.asDict() | {'produtora': nova_produtora}
else:
return row
if __name__ == "__main__":
spark = SparkSession.builder.appName("ExemploPyspark").getOrCreate()
# Leitura dos arquivos JSON na pasta 'JSON/'
dados_json = spark.read.json("JSON/")
# Adiciona a coluna 'produtora' com valor null
dados_json = dados_json.withColumn("produtora", col("produtora").cast(StringType()))
# Aplica a função processar_registro para substituir 'null' nas colunas 'produtora'
dados_processados = dados_json.rdd.map(processar_registro).toDF()
# Exibe o DataFrame resultante
dados_processados.show(truncate=False)
# Encerra a sessão Spark
spark.stop()