package org.apache.zeppelin.tajo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tajo interpreter for Zeppelin.
*/
public class TajoInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(TajoInterpreter.class);
private Connection connection;
private Statement statement;
private Exception exceptionOnConnect;
public static final String TAJO_JDBC_URI = "tajo.jdbc.uri";
public static final String TAJO_DRIVER_NAME = "org.apache.tajo.jdbc.TajoDriver";
static {
Interpreter.register(
"tql",
"tajo",
TajoInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(TAJO_JDBC_URI, "jdbc:tajo://localhost:26002/default", "The URL for TajoServer.")
.build());
}
public TajoInterpreter(Properties property) {
super(property);
}
public Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(getProperty(TAJO_JDBC_URI));
}
@Override
public void open() {
logger.info("Jdbc open connection called!");
try {
Class.forName(TAJO_DRIVER_NAME);
} catch (ClassNotFoundException e) {
logger.error("Can not open connection", e);
exceptionOnConnect = e;
return;
}
try {
connection = getJdbcConnection();
exceptionOnConnect = null;
logger.info("Successfully created connection");
}
catch (SQLException e) {
logger.error("Cannot open connection", e);
exceptionOnConnect = e;
}
}
@Override
public void close() {
try {
if (connection != null) {
connection.close();
}
}
catch (SQLException e) {
logger.error("Cannot close connection", e);
}
finally {
connection = null;
exceptionOnConnect = null;
}
}
private InterpreterResult executeSql(String sql) {
try {
if (exceptionOnConnect != null) {
return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage());
}
statement = connection.createStatement();
StringBuilder msg = null;
if (StringUtils.containsIgnoreCase(sql, "EXPLAIN ")) {
//return the explain as text, make this visual explain later
msg = new StringBuilder();
}
else {
msg = new StringBuilder("%table ");
}
ResultSet res = statement.executeQuery(sql);
try {
ResultSetMetaData md = res.getMetaData();
for (int i = 1; i < md.getColumnCount() + 1; i++) {
if (i == 1) {
msg.append(md.getColumnName(i));
} else {
msg.append("\t" + md.getColumnName(i));
}
}
msg.append("\n");
while (res.next()) {
for (int i = 1; i < md.getColumnCount() + 1; i++) {
msg.append(res.getString(i) + "\t");
}
msg.append("\n");
}
}
finally {
try {
res.close();
statement.close();
}
finally {
statement = null;
}
}
InterpreterResult interpreterResult = new InterpreterResult(Code.SUCCESS, msg.toString());
return interpreterResult;
}
catch (SQLException ex) {
logger.error("Can not run " + sql, ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
}
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '" + cmd + "'");
return executeSql(cmd);
}
@Override
public void cancel(InterpreterContext context) {
if (statement != null) {
try {
statement.cancel();
}
catch (SQLException ex) {
}
finally {
statement = null;
}
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
TajoInterpreter.class.getName() + this.hashCode());
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
}