Stop Thinking, Just Do!

Sungsoo Kim's Blog

Zeppelin Tajo Interpreter

tagsTags

17 February 2016


Zeppelin Tajo Interpreter

package org.apache.zeppelin.tajo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tajo interpreter for Zeppelin.
 */
public class TajoInterpreter extends Interpreter {
  private Logger logger = LoggerFactory.getLogger(TajoInterpreter.class);

  private Connection connection;
  private Statement statement;
  private Exception exceptionOnConnect;

  public static final String TAJO_JDBC_URI = "tajo.jdbc.uri";
  public static final String TAJO_DRIVER_NAME = "org.apache.tajo.jdbc.TajoDriver";

  static {
    Interpreter.register(
      "tql",
      "tajo",
      TajoInterpreter.class.getName(),
      new InterpreterPropertyBuilder()
        .add(TAJO_JDBC_URI, "jdbc:tajo://localhost:26002/default", "The URL for TajoServer.")
        .build());
  }

  public TajoInterpreter(Properties property) {
    super(property);
  }

  public Connection getJdbcConnection() throws SQLException {
    return DriverManager.getConnection(getProperty(TAJO_JDBC_URI));
  }

  @Override
  public void open() {
    logger.info("Jdbc open connection called!");
    try {
      Class.forName(TAJO_DRIVER_NAME);
    } catch (ClassNotFoundException e) {
      logger.error("Can not open connection", e);
      exceptionOnConnect = e;
      return;
    }
    try {
      connection = getJdbcConnection();
      exceptionOnConnect = null;
      logger.info("Successfully created connection");
    }
    catch (SQLException e) {
      logger.error("Cannot open connection", e);
      exceptionOnConnect = e;
    }
  }

  @Override
  public void close() {
    try {
      if (connection != null) {
        connection.close();
      }
    }
    catch (SQLException e) {
      logger.error("Cannot close connection", e);
    }
    finally {
      connection = null;
      exceptionOnConnect = null;
    }
  }

  private InterpreterResult executeSql(String sql) {
    try {
      if (exceptionOnConnect != null) {
        return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage());
      }
      statement = connection.createStatement();
      StringBuilder msg = null;
      if (StringUtils.containsIgnoreCase(sql, "EXPLAIN ")) {
        //return the explain as text, make this visual explain later
        msg = new StringBuilder();
      }
      else {
        msg = new StringBuilder("%table ");
      }

      ResultSet res = statement.executeQuery(sql);
      try {
        ResultSetMetaData md = res.getMetaData();
        for (int i = 1; i < md.getColumnCount() + 1; i++) {
          if (i == 1) {
            msg.append(md.getColumnName(i));
          } else {
            msg.append("\t" + md.getColumnName(i));
          }
        }
        msg.append("\n");
        while (res.next()) {
          for (int i = 1; i < md.getColumnCount() + 1; i++) {
            msg.append(res.getString(i) + "\t");
          }
          msg.append("\n");
        }
      }
      finally {
        try {
          res.close();
          statement.close();
        }
        finally {
          statement = null;
        }
      }

      InterpreterResult interpreterResult = new InterpreterResult(Code.SUCCESS, msg.toString());
      return interpreterResult;
    }
    catch (SQLException ex) {
      logger.error("Can not run " + sql, ex);
      return new InterpreterResult(Code.ERROR, ex.getMessage());
    }
  }

  @Override
  public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
    logger.info("Run SQL command '" + cmd + "'");
    return executeSql(cmd);
  }

  @Override
  public void cancel(InterpreterContext context) {
    if (statement != null) {
      try {
        statement.cancel();
      }
      catch (SQLException ex) {
      }
      finally {
        statement = null;
      }
    }
  }

  @Override
  public FormType getFormType() {
    return FormType.SIMPLE;
  }

  @Override
  public int getProgress(InterpreterContext context) {
    return 0;
  }

  @Override
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
      TajoInterpreter.class.getName() + this.hashCode());
  }

  @Override
  public List<String> completion(String buf, int cursor) {
    return null;
  }
}

comments powered by Disqus