Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


hadoop - A way to read table data from MySQL into Pig

Everyone knows that Pig supports DBStorage, but it only supports storing results from Pig into MySQL, like this:

STORE data INTO 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');

Please show me a way to read a table from MySQL in the same style, for example:

data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');

Here is my code:

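import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Pig LoadFunc that reads rows from a JDBC query instead of from HDFS files.
public class DBLoader extends LoadFunc {
    private final Log log = LogFactory.getLog(getClass());
    // Reused buffer for the fields of the current row; it must be initialized
    // here, otherwise getNext() fails with a NullPointerException.
    private ArrayList<Object> mProtoTuple = new ArrayList<Object>();
    private Connection con;
    private String jdbcURL;
    private String user;
    private String pass;
    private int batchSize;
    private int count = 0;
    private String query;
    private ResultSet result;
    protected TupleFactory mTupleFactory = TupleFactory.getInstance();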

    public DBLoader() {
    }

    public DBLoader(String driver, String jdbcURL, String user, String pass,
            String query) {

        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            log.error("can't load DB driver:" + driver, e);
            throw new RuntimeException("Can't load DB Driver", e);
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pass = pass;
        this.query = query;

    }

    @Override
    public InputFormat getInputFormat() throws IOException {
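        // Rows come from JDBC, not from files, so TextInputFormat (which
        // requires input paths) is replaced by the loader's own InputFormat.
        return new MyDBInputFormat();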
    }

    @Override
    public Tuple getNext() throws IOException {
        int numColumns;
        try {
            // Advance the cursor; a null return tells Pig the input is exhausted.
            if (!result.next())
                return null;
            // The column count comes from the result set metadata.
            numColumns = result.getMetaData().getColumnCount();
        } catch (SQLException e) {
            // Surface JDBC failures instead of swallowing them.
            throw new IOException("JDBC Error", e);
        }

        // JDBC column indexes are 1-based, so iterate from 1 to numColumns.
        for (int i = 1; i <= numColumns; i++) {

            try {
                Object field = result.getObject(i);

                switch (DataType.findType(field)) {
                case DataType.NULL:

                    mProtoTuple.add(null);

                    break;

                case DataType.BOOLEAN:
                    mProtoTuple.add((Boolean) field);

                    break;

                case DataType.INTEGER:
                    mProtoTuple.add((Integer) field);

                    break;

                case DataType.LONG:
                    mProtoTuple.add((Long) field);

                    break;

                case DataType.FLOAT:
                    mProtoTuple.add((Float) field);

                    break;

                case DataType.DOUBLE:
                    mProtoTuple.add((Double) field);

                    break;

                case DataType.BYTEARRAY:
                    // Pig tuples hold binary data as DataByteArray, not raw byte[].
                    mProtoTuple.add((DataByteArray) field);

                    break;
                case DataType.CHARARRAY:
                    mProtoTuple.add((String) field);

                    break;
                case DataType.BYTE:
                    mProtoTuple.add((Byte) field);

                    break;

                case DataType.MAP:
                case DataType.TUPLE:
                case DataType.BAG:
                    throw new RuntimeException("Cannot load a non-flat value "
                            + "using DBLoader");

                default:
                    throw new RuntimeException("Unknown datatype "
                            + DataType.findType(field));

                }

            } catch (Exception ee) {
                throw new RuntimeException(ee);
            }
        }

        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;

    }

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1)
            throws IOException {

        con = null;
        if (query == null) {
            throw new IOException("SQL query not specified");
        }
        try {
            if (user == null || pass == null) {
                con = DriverManager.getConnection(jdbcURL);
            } else {
                con = DriverManager.getConnection(jdbcURL, user, pass);
            }
            con.setAutoCommit(false);
            result = con.createStatement().executeQuery(query);
        } catch (SQLException e) {
            log.error("Unable to connect to JDBC @" + jdbcURL);
            throw new IOException("JDBC Error", e);
        }
        count = 0;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // Nothing to configure: rows come from the JDBC query, so the
        // location string from the LOAD statement is ignored.

    }

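    // Minimal InputFormat: a single dummy split, so one map task runs the query.
    public static class MyDBInputFormat extends InputFormat<NullWritable, NullWritable> {

        @Override
        public RecordReader<NullWritable, NullWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException,
                InterruptedException {
            // Placeholder reader: getNext() pulls rows from the JDBC ResultSet,
            // so this reader never yields records itself.
            return new RecordReader<NullWritable, NullWritable>() {
                @Override
                public void initialize(InputSplit s, TaskAttemptContext c) { }
                @Override
                public boolean nextKeyValue() { return false; }
                @Override
                public NullWritable getCurrentKey() { return NullWritable.get(); }
                @Override
                public NullWritable getCurrentValue() { return NullWritable.get(); }
                @Override
                public float getProgress() { return 0f; }
                @Override
                public void close() { }
            };
        }

        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException,
                InterruptedException {
            List<InputSplit> splits = new ArrayList<InputSplit>();
            splits.add(new MyDBInputSplit());
            return splits;
        }

    }

    // Dummy split; it is Writable so that it can be serialized and shipped to the task.
    public static class MyDBInputSplit extends InputSplit implements Writable {
        // Nominal non-zero length; the real row count is unknown up front.
        @Override
        public long getLength() { return 1; }
        @Override
        public String[] getLocations() { return new String[0]; }
        @Override
        public void write(DataOutput out) { }
        @Override
        public void readFields(DataInput in) { }
    }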

}

I have tried many times to write this UDF, but without success.


He who fights a dragon for too long becomes a dragon himself; gaze too long into the abyss, and the abyss gazes back…

1 Answer


As you say, DBStorage only supports saving results to a database.

To load data from MySQL you could look into a project called Sqoop (which copies data from a database to HDFS), or you could perform a MySQL dump and then copy the file into HDFS. Both approaches require a separate step outside Pig and cannot be invoked directly from inside a Pig script.
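The dump route can also be done with plain JDBC. The following is only a sketch of that idea, not part of Sqoop or Pig: the class name TsvDumpSketch and its methods are made up for illustration, and it assumes that replacing tabs and newlines inside values with spaces is acceptable for your data. It writes each row as one tab-separated line, which is the default field delimiter of PigStorage.

```java
import java.io.IOException;
import java.io.Writer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: turns JDBC rows into tab-separated lines.
final class TsvDumpSketch {

    // Formats one row as a tab-separated line. Nulls become empty fields;
    // tabs and line breaks inside values are replaced by spaces (an assumption
    // made here so that one row always stays on one line).
    public static String toTsvLine(List<?> row) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.size(); i++) {
            if (i > 0) {
                sb.append('\t');
            }
            Object value = row.get(i);
            if (value != null) {
                sb.append(value.toString()
                        .replace('\t', ' ')
                        .replace('\n', ' ')
                        .replace('\r', ' '));
            }
        }
        return sb.toString();
    }

    // Writes every row of the result set to the writer and returns the row count.
    public static int dump(ResultSet rs, Writer out) throws SQLException, IOException {
        int columns = rs.getMetaData().getColumnCount();
        int rows = 0;
        while (rs.next()) {
            List<Object> row = new ArrayList<>(columns);
            // JDBC column indexes start at 1.
            for (int i = 1; i <= columns; i++) {
                row.add(rs.getObject(i));
            }
            out.write(toTsvLine(row));
            out.write('\n');
            rows++;
        }
        return rows;
    }

    public static void main(String[] args) {
        // Small demonstration without a database connection.
        System.out.println(toTsvLine(Arrays.asList(1, "a\tb", null, 2.5)));
    }
}
```

Once the resulting file has been copied into HDFS (for example with hadoop fs -put), it can be read from Pig with a normal LOAD ... USING PigStorage() statement.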

A third option would be to write a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too difficult: you'll need to pass much the same options as DBStorage (driver, connection credentials and a SQL query to execute), and you can probably inspect the result set metadata to generate a schema automatically too.
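The metadata inspection mentioned above could look roughly like the sketch below. The class PigSchemaSketch and the mapping from java.sql.Types codes to Pig type names are assumptions made for illustration, not something Pig provides; unrecognised SQL types simply fall back to bytearray. A real LoadFunc would still have to hand the resulting schema to Pig itself.

```java
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;

// Hypothetical helper: derives a Pig-style schema string from JDBC metadata.
final class PigSchemaSketch {

    // Maps a java.sql.Types code to a Pig type name (assumed mapping).
    public static String pigType(int sqlType) {
        switch (sqlType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return "boolean";
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
                return "int";
            case Types.BIGINT:
                return "long";
            case Types.REAL:
                return "float";
            case Types.FLOAT:
            case Types.DOUBLE:
                return "double";
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return "chararray";
            default:
                // Anything unrecognised is treated as untyped bytes.
                return "bytearray";
        }
    }

    // Builds "name:type, name:type, ..." from parallel arrays.
    public static String toPigSchema(String[] names, int[] sqlTypes) {
        if (names.length != sqlTypes.length) {
            throw new IllegalArgumentException("names and sqlTypes differ in length");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < names.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(names[i]).append(':').append(pigType(sqlTypes[i]));
        }
        return sb.toString();
    }

    // Same as above, but reads names and types from the result set metadata.
    public static String fromMetaData(ResultSetMetaData md) throws SQLException {
        int n = md.getColumnCount();
        String[] names = new String[n];
        int[] types = new int[n];
        // JDBC column indexes start at 1.
        for (int i = 1; i <= n; i++) {
            names[i - 1] = md.getColumnLabel(i);
            types[i - 1] = md.getColumnType(i);
        }
        return toPigSchema(names, types);
    }

    public static void main(String[] args) {
        System.out.println(toPigSchema(
                new String[] {"id", "name", "price"},
                new int[] {Types.INTEGER, Types.VARCHAR, Types.DOUBLE}));
    }
}
```

For a table with an INTEGER id, a VARCHAR name and a DOUBLE price, this yields the string id:int, name:chararray, price:double, which has the same shape as the schema in a Pig AS clause.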

