I have created a UDTF, and inside it I am running the Java Hive JDBC code below to execute a Hive query and fetch its results.
The connection to HiveServer2 is established successfully, but the code then hangs indefinitely at statement.executeQuery() without throwing any exception. What could be the reason? The same code runs fine as a standalone class in Eclipse, but hangs when deployed as a UDTF on a Hadoop cluster.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.security.UserGroupInformation;

public class DynamicWhereUDTF extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;
    private ArrayList<Object[]> results = new ArrayList<Object[]>();

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        stringOI = (PrimitiveObjectInspector) args[0];
        if (stringOI != null) {
            String name = stringOI.toString();
            System.out.println("param <-------> " + name);
        }

        List<String> fieldNames = new ArrayList<String>();
        try {
            fieldNames = getColumnNames("d_drug");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        System.out.println("fieldNames size ---> " + fieldNames.size());

        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for (int i = 0; i < fieldNames.size(); i++) {
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        System.out.println("---------- ObjectInspectors created ----------");

        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement statement = null;
        try {
            System.out.println("Processing records 1");
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            System.out.println("Processing records 2");

            // Kerberos login before opening the JDBC connection
            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "Kerberos");
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", LocalFileSystem.class.getName());
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab("[email protected]", "/tmp/abc.keytab");
            System.out.println("Processing records 3");

            String hiveJdbcUrl = "jdbc:hive2://<host>:10000/demo_db;principal=hive/<host>@CS.MSD";
            conn = DriverManager.getConnection(hiveJdbcUrl, "abc", "");
            System.out.println("conn1 <-------> " + conn);

            statement = conn.prepareStatement("select * from xyz limit 5");
            System.out.println("statement ----------> " + statement);

            // hangs here when running as a UDTF on the cluster
            rs = statement.executeQuery();
            System.out.println("resultset ----------> " + rs);

            ResultSetMetaData rsMetaData = rs.getMetaData();
            int columnCount = rsMetaData.getColumnCount();
            System.out.println("columnCount ---> " + columnCount);

            // ArrayList<Object[]> results = new ArrayList<Object[]>();
            StringBuilder values;
            while (rs.next()) {
                values = new StringBuilder();
                for (int i = 0; i < columnCount; i++) {
                    values.append(rs.getString(i + 1)).append(",");
                }
                // strip the trailing comma
                String output = values.substring(0, values.lastIndexOf(","));
                System.out.println("output -----> " + output);
                results.add(new Object[] { "122556", "52905" }); // hard-coded test row
            }
            System.out.println("------- results collected -------");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            // close the result set and statement as well, not only the connection
            try {
                if (rs != null) rs.close();
                if (statement != null) statement.close();
                if (conn != null) conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void close() throws HiveException {
        // nothing to clean up here
    }

    @Override
    public void process(Object[] record) throws HiveException {
        try {
            Iterator<Object[]> it = results.iterator();
            while (it.hasNext()) {
                Object[] r = it.next();
                forward(r);
            }
            System.out.println("------- results forwarded -------");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public List<String> getColumnNames(String tableName) throws SQLException {
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("drug_id");
        fieldNames.add("drug_cd");
        return fieldNames;
    }
}
The problem might be that you create the connection in the initialize method. Try creating the connection in the configure method instead; you can look at the HBase connector as an example.
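For illustration, here is a minimal sketch of that approach, assuming a Hive version (0.12 or later) in which GenericUDTF exposes configure(MapredContext). The JDBC URL, credentials, table, and column names are the placeholders from the question, not a tested implementation:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class DynamicWhereUDTF extends GenericUDTF {

    private Connection conn;

    // configure() runs inside the map/reduce task at execution time, whereas
    // initialize() also runs on the client during query compilation, so
    // task-side resources such as a JDBC connection belong here.
    @Override
    public void configure(MapredContext context) {
        try {
            // the Kerberos keytab login from the question would go here as well
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            conn = DriverManager.getConnection(
                    "jdbc:hive2://<host>:10000/demo_db;principal=hive/<host>@CS.MSD",
                    "abc", "");
        } catch (Exception e) {
            throw new RuntimeException("Could not open Hive connection", e);
        }
    }

    // initialize() now only declares the output schema and does no JDBC work
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        List<String> fieldNames = Arrays.asList("drug_id", "drug_cd");
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] record) throws HiveException {
        // the query itself has moved here, so rows are forwarded as they are
        // read instead of being buffered in initialize()
        try (Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("select drug_id, drug_cd from xyz limit 5")) {
            while (rs.next()) {
                forward(new Object[] { rs.getString(1), rs.getString(2) });
            }
        } catch (SQLException e) {
            throw new HiveException(e);
        }
    }

    @Override
    public void close() throws HiveException {
        try {
            if (conn != null) {
                conn.close();
            }
        } catch (SQLException e) {
            throw new HiveException(e);
        }
    }
}

Note that initialize is now purely declarative, which matters because Hive also calls it during query compilation, while configure is only invoked on the task that actually executes the UDTF.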