Importing Relational Data into Hadoop with Sqoop and Sqoop2

By Matthew Rathbone on June 29, 2016

mysql hadoop Sqoop Sqoop2 redshift

This article was co-authored by Elena Akhmatova


I’ve written a lot about Hadoop-related technologies, and about MapReduce frameworks in particular, implementing the same data-querying solution across many of them. Until now, though, I haven’t said much about how to get data into Hadoop in the first place.

A common problem is moving data from a relational datastore (like MySQL or Postgres) into Hadoop HDFS in order to process it with Hive, MapReduce, or Spark.

There’s a great open source tool called Sqoop that provides these capabilities, letting you move data from MySQL, Postgres, Oracle, and Teradata. Just to make things confusing, there is also Sqoop2, which has slightly different database version support but does include SQL Server.

The goal of this article is to walk through a practical data import example with a couple of datasets that need moving from a relational database to HDFS.

The Data

Let’s pretend that I have an e-commerce site that stores:

  1. User information (id, email, language, location)
  2. Transaction information (transaction-id, product-id, user-id, purchase-amount, item-description)

This information is stored in a relational database (MySQL).

I need to export that data to a Hadoop cluster to combine it with other datasets, use it in MapReduce jobs, and serve as a base for reporting. In fact, if you want to see how someone might use this information, take a look at my personal blog, where I use these exact datasets in my many MapReduce guides.

Why Should I Import Relational Data into Hadoop?

If you’ve started using Hadoop for other data storage and analysis tasks, you’ll find that much of this analysis could be improved with access to reliable user or product data. In our example, user and transaction records could be used in coordination with website log data to understand purchasing patterns, come up with recommendations, or drive an inventory supply algorithm.

By building an automated, reliable way to populate Hadoop with relational data, you’ll be one step closer to a rich and fully populated data lake and a world of possibilities.

An Introduction to Sqoop

While there are ways to stream data directly into Hadoop (log data, for example), data that resides in a relational database is usually imported with a batch read/write tool like Sqoop.

Sqoop is a tool designed for efficiently transferring bulk data between Hadoop and structured datastores such as relational databases. Sqoop can be used to import data from a relational database such as MySQL or Oracle into the Hadoop Distributed File System (HDFS). It can also be used to transform data stored in Hadoop as part of a MapReduce job and then export the data back into a relational database.

Just to make things confusing, at the time of writing (2016) there are two Sqoops: Sqoop and the newer Sqoop2. Be warned, some folks also refer to Sqoop2 as simply Sqoop.

Sqoop

Sqoop is a Java utility that can be run from the command line. You give it configuration, such as the connection string for your database, and it sets up a series of MapReduce jobs to extract and process the information.

Sqoop2

Sqoop2 has a client-server architecture and transfers data between more types of sources: structured, semi-structured, and unstructured data sources (e.g. Cassandra, MongoDB).

Citing Cloudera on Sqoop: “Sqoop 2 is essentially the future of the Apache Sqoop project. However, since Sqoop 2 currently lacks some of the features of Sqoop 1, Cloudera recommends you use Sqoop 2 only if it contains all the features required for your use case; otherwise, continue to use Sqoop 1.”

Also of note to readers: the client/server architecture makes Sqoop2 more complex to get started with.

In this tutorial I’ll show examples from both versions of Sqoop, but won’t go into too much detail on the varied ways you can interact with Sqoop2. Instead I’ll mostly focus on command-line interaction with both pieces of software.

Setup

Amazon’s Elastic MapReduce (EMR) has Sqoop preinstalled. Outside of EMR you have to install it yourself, which you can do by downloading a version of Sqoop (or Sqoop2) compatible with your Hadoop version from the Sqoop website. Some Hadoop distributions (like CDH) also include it by default; check your distribution docs to find out.

To start, you need to add a JDBC driver for your database to Sqoop’s lib directory, e.g. the MySQL JDBC Connector (it is not bundled with Sqoop, usually due to licensing restrictions).
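As a minimal sketch (the $SQOOP_HOME location, the download path, and the connector version are assumptions, so adjust them for your installation):

# Copy the MySQL JDBC connector jar (downloaded separately from MySQL's site)
# into Sqoop's lib directory so Sqoop can load the driver at runtime.
cp ~/Downloads/mysql-connector-java-5.1.38-bin.jar $SQOOP_HOME/lib/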

As it has a client-server architecture, Sqoop2 requires installing both client and server following the installation guide.

MySQL Database Schema

mysql> describe m_users;
+----------+--------------+------+-----+---------+-------+
| Field    | Type         | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| id       | bigint(20)   | YES  |     | NULL    |       |
| email    | varchar(255) | YES  |     | NULL    |       |
| language | varchar(2)   | YES  |     | NULL    |       |
| location | varchar(2)   | YES  |     | NULL    |       |
+----------+--------------+------+-----+---------+-------+
4 rows in set (0.73 sec)
mysql> describe m_transactions;
+-----------------+------------+------+-----+---------+-------+
| Field           | Type       | Null | Key | Default | Extra |
+-----------------+------------+------+-----+---------+-------+
| id              | bigint(20) | YES  |     | NULL    |       |
| productId       | bigint(20) | YES  |     | NULL    |       |
| userId          | bigint(20) | YES  |     | NULL    |       |
| purchaseAmount  | double     | YES  |     | NULL    |       |
| itemDescription | text       | YES  |     | NULL    |       |
+-----------------+------------+------+-----+---------+-------+
5 rows in set (0.03 sec)

Sqoop Usage

Do not forget to allow your user access to the database from your Hadoop machines: create a user with the required permissions to connect directly from the Hadoop cluster, as sketched below.
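For example, here is a hedged sketch of creating such a user in MySQL (the user name, password, and subnet pattern are placeholders; substitute the hosts your cluster actually connects from):

# Create a dedicated MySQL user the Hadoop nodes can connect as, and grant it
# read access to the 'test' database used in the examples below.
mysql -u root -p -e "
  CREATE USER 'sqoop_import'@'10.0.0.%' IDENTIFIED BY 'changeme';
  GRANT SELECT ON test.* TO 'sqoop_import'@'10.0.0.%';
  FLUSH PRIVILEGES;"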

There are two main options for importing data:

  1. Import the entire table
  2. Import the results of a query

And there are a number of ways to run Sqoop:

  1. From the command line
  2. Via a Java API
  3. Via an interactive shell (Sqoop2 only)

Sqoop : Command Line

To import the full contents of each table:

sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --table m_users \
  --split-by id \
  -m 1 \
  --target-dir /user/elena/users \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'
sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --table m_transactions \
  --split-by id \
  -m 1 \
  --target-dir /user/elena/transactions \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'

To preselect what to import with a SQL query:

sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --query 'select id, location from m_users WHERE $CONDITIONS' \
  -m 1 \
  --split-by id \
  --target-dir /user/elena/users \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'
sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --query 'select id, productId, userId from m_transactions WHERE $CONDITIONS' \
  -m 1 \
  --split-by id \
  --target-dir /user/elena/transactions \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'

Note that you do not substitute $CONDITIONS yourself: it is a required placeholder in any free-form query, and Sqoop replaces it at runtime with the split condition for each map task. Add any filtering of your own alongside it in the WHERE clause.

Most of the flags are self-explanatory, but a few deserve a closer look:

  • -m 1 sets the number of map tasks the job uses. This controls how quickly the data is copied, as well as the number of resulting output chunks (see the sketch after this list).
  • --split-by defines the table column used to divide the work (and the resulting files) between map tasks. By default this is the primary key, which is usually what you want, but you can specify a different column directly.
  • --fields-terminated-by and --lines-terminated-by define the field and line separators for the resulting files. If you’re going to use Hive, tab (\t) and newline (\n) are the best options here.
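As a quick sketch of how these flags fit together (the target directory here is hypothetical and the mapper count is arbitrary), bumping -m spreads the import across several map tasks, each of which writes its own part file:

sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --table m_transactions \
  --split-by id \
  -m 4 \
  --target-dir /user/elena/transactions_parallel \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'

# With -m 4 the target directory will typically contain four output files,
# one per map task:
hadoop fs -ls /user/elena/transactions_parallel
# part-m-00000  part-m-00001  part-m-00002  part-m-00003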

Sqoop2 : Shell

While Sqoop2 performs the same operations as Sqoop, the way you interact with it is different. Instead of running a command-line task with flags, you communicate with Sqoop2 through an interactive shell.

# start Sqoop server on some machine
sqoop.sh server start

# start Sqoop client
sqoop.sh client

Sqoop Shell: Type 'help' or '\h' for help.
sqoop:000> 

Establishing a database connection:

sqoop:000> create connection --cid 1
Creating connection for connector with id 1
Please fill following values to create new connection object
Name: firstcon 

Connection configuration

JDBC Driver Class: com.mysql.jdbc.Driver
JDBC Connection String: jdbc:mysql://localhost/test
Username: root
Password: 
JDBC Connection Properties: 
There are currently 0 values in the map:
entry# 

Security related configuration options

Max connections: 0
New connection was successfully created with validation status FINE and persistent id 4

Register job to import data:

sqoop:000> create job --xid 1 --type import
Creating job for connection with id 1
Please fill following values to create new job object
Name: firstjob

Database configuration

Schema name: 
Table name: m_users
Table SQL statement: 
Table column names: 
Partition column name: id
Nulls in partition column: 
Boundary query: 

Output configuration

Storage type: 
  0 : HDFS
Choose: 0
Output format: 
  0 : TEXT_FILE
  1 : SEQUENCE_FILE
Choose: 0
Compression format: 
  0 : NONE
  1 : DEFAULT
  2 : DEFLATE
  3 : GZIP
  4 : BZIP2
  5 : LZO
  6 : LZ4
  7 : SNAPPY
Choose: 0
Output directory: /user/elena/firstjob

Throttling resources

Extractors: 
Loaders: 
New job was successfully created with validation status FINE  and persistent id 4

Run the job:

sqoop:000> start job --jid 4
Submission details
Job ID: 4
Server URL: http://localhost:12000/sqoop/
Created by: elena
Creation date: 2016-04-08 14:23:11 MSK
Lastly updated by: elena
External ID: job_local1204996321_0001
  http://localhost:8080/
2016-04-08 14:23:11 MSK: BOOTING  - Progress is not available

So the interactive shell makes Sqoop2 a little easier to get started with from the client side, as you don’t need to remember a set of esoteric command-line flags, but it’s certainly more work to fill in all the details as prompted.

The nice thing about the command line client is that you can specify your configuration in a .sqoop file and have the client run that directly. This file is literally just a list of Sqoop client commands, so you’ll have to learn those first.
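For example, a sketch of a batch run (the file path is hypothetical, the job id matches the one created above, and the exact invocation may vary between Sqoop2 releases):

# contents of /home/elena/start-firstjob.sqoop
set server --host localhost --port 12000 --webapp sqoop
start job --jid 4

# run the script non-interactively through the Sqoop2 client
sqoop.sh client /home/elena/start-firstjob.sqoop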

That said, I think it’s still a whole lot more complex than the original Sqoop, especially for newcomers: if you’re new to Hadoop, it’s not obvious what the best way to get started with these tools is.

Sqoop : Java API

This is the API for the original Sqoop (not Sqoop2). The options map to the command-line options almost one-to-one.

// These classes ship with Sqoop 1; newer 1.4.x releases also expose the same
// classes under the org.apache.sqoop.* packages, so adjust the imports to
// match the Sqoop version on your classpath.
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class DataImporter {
  
  public static void main(String[] args){
    System.out.println(export());
  }
  
  public static int export(){
    SqoopOptions options = new SqoopOptions();
    options.setConnectString("jdbc:mysql://localhost/test");
    
    // options.setTableName("TABLE_NAME");
    // options.setWhereClause("id>10");
   
    options.setUsername("root");
    options.setPassword("");
    options.setNumMappers(2); 
    options.setSqlQuery("select id, location from m_users WHERE $CONDITIONS");
    options.setSplitByCol("id");
    
    Configuration config = new Configuration(); 
    config.addResource(new Path("/Users/elena/apache/hadoop-0.20.2/conf/core-site.xml"));
    config.addResource(new Path("/Users/elena/apache/hadoop-0.20.2/conf/hdfs-site.xml"));
    
    options.setConf(config);
    options.setTargetDir("users_java");
    options.setHadoopHome("/Users/elena/apache/hadoop-0.20.2");
    options.setExplicitDelims(true);
    options.setFieldsTerminatedBy('\t');
    options.setLinesTerminatedBy('\n');
    int ret = new ImportTool().run(options);
    return ret;
  }
}

The code is very self-explanatory.

Sqoop2 : Java API

This code is a little more involved and abstracts many of the common database concepts behind ‘forms’.

// Client API classes from Sqoop2 1.99.x; the model package layout changed in
// later releases, so adjust the imports to match your Sqoop2 version.
import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.validation.Status;

public class DataImporter {
  public static void main(String[] args) {
    String connectionString = "jdbc:mysql://localhost/test";
    String username = "root";
    String password = "";
    String schemaName = "";
    String tableName = "m_users";
    String columns = "id,email,language,location";
    String partitionColumn = "id";
    String outputDirectory = "/user/elena/javasqoop2";
    String url = "http://localhost:12000/sqoop/";
    
    SqoopClient client = new SqoopClient(url);
    MConnection newCon = client.newConnection(1);

    // Get connection and framework forms. Set name for connection
    MConnectionForms conForms = newCon.getConnectorPart();
    MConnectionForms frameworkForms = newCon.getFrameworkPart();
    newCon.setName("MyConnection");

    // Set connection forms values
    conForms.getStringInput("connection.connectionString").setValue(connectionString);
    conForms.getStringInput("connection.jdbcDriver").setValue("com.mysql.jdbc.Driver");
    conForms.getStringInput("connection.username").setValue(username);
    conForms.getStringInput("connection.password").setValue(password);

    frameworkForms.getIntegerInput("security.maxConnections").setValue(Integer.valueOf(0));

    Status status  = client.createConnection(newCon);
    if(status.canProceed()) {
     System.out.println("Created. New Connection ID : " +newCon.getPersistenceId());
    } else {
     System.out.println("Check for status and forms error ");
    }

    //Creating job object
    MJob newjob = client.newJob(newCon.getPersistenceId(), org.apache.sqoop.model.MJob.Type.IMPORT);
    MJobForms connectorForm = newjob.getConnectorPart();
    MJobForms frameworkForm = newjob.getFrameworkPart();

    newjob.setName("ImportJob");

    //Database configuration
    connectorForm.getStringInput("table.schemaName").setValue(schemaName);

    //Input either table name or sql
    connectorForm.getStringInput("table.tableName").setValue(tableName);

    // Other options
    //connectorForm.getStringInput("table.sql").setValue("select id,name from table where ${CONDITIONS}");
    //connectorForm.getStringInput("table.columns").setValue(columns);
   
    connectorForm.getStringInput("table.partitionColumn").setValue(partitionColumn);
    
    //Output configurations
    frameworkForm.getEnumInput("output.storageType").setValue("HDFS");
    frameworkForm.getEnumInput("output.outputFormat").setValue("TEXT_FILE");
    frameworkForm.getStringInput("output.outputDirectory").setValue(outputDirectory);

    //Creating the job
    status = client.createJob(newjob);
    if(status.canProceed()) {
     System.out.println("New Job ID: "+ newjob.getPersistenceId());
    } else {
     System.out.println("Check for status and forms error ");
    }
    //Now Submit the Job
    MSubmission submission = client.startSubmission(newjob.getPersistenceId());
    System.out.println("Status : " + submission.getStatus());

  }
}

The Sqoop2 API is a little more involved, but still looks similar to the original Sqoop API. It does still require you to have the server running.

Further Processing

The output from either Sqoop tool is the data, written to a directory on HDFS. This can then serve as input for downstream processing with MapReduce or Spark.
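Sqoop can also push processed results back into the relational database with sqoop export. As a hedged sketch (the m_user_reports table and the HDFS directory are hypothetical, and the target table must already exist in MySQL):

sqoop export \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --table m_user_reports \
  --export-dir /user/elena/user_reports \
  --input-fields-terminated-by '\t' \
  --input-lines-terminated-by '\n'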

If further processing will be done in Hive, I recommend working with the data as an external table in Hive. That way the data does not have to be controlled by Hive, and is instead effectively read-only.

Sqoop actually comes with some built-in flags for creating Hive tables (--hive-import, --create-hive-table and more), but for some reason does not support creating external tables.
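For reference, here is roughly what those built-in flags look like in use; note that this creates a managed (internal) Hive table rather than an external one, and the Hive table name here simply mirrors the source table:

sqoop import \
  --connect jdbc:mysql://localhost/test \
  --username root \
  --table m_users \
  --split-by id \
  -m 1 \
  --hive-import \
  --create-hive-table \
  --hive-table m_users \
  --fields-terminated-by '\t' \
  --lines-terminated-by '\n'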

Personally I prefer using external tables for all data that was originally ‘owned’ by a different system: this prevents accidental deletion of data that is slow to import or hard to recover, and it makes explicit to users that the data is not owned by Hive, which it is not.

Actually creating an external table semi-automatically with Sqoop isn’t fun; here’s the best guide I could find, which isn’t great.

You can of course manually create external tables with Hive:

CREATE EXTERNAL TABLE users(
  id BIGINT, 
  email STRING, 
  language STRING, 
  loc STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
LOCATION '/user/elena/users';

Wrap-Up

Both Sqoop and Sqoop2 provide DB <> Hadoop data import and export. While Sqoop2 is the ‘future’ of Sqoop, it is more complicated and harder to get started with. Frankly I prefer the simpler semantics of Sqoop, but Sqoop2 does come with advantages, such as the centralized coordination of connections and ongoing jobs. Either way, I would recommend using Sqoop simply as the means of data transfer, and managing workflows and connections with a robust workflow management framework like Luigi, which you can also use for downstream jobs.

The biggest problem I found with both tools is that it’s hard to figure out what documentation to look through, and even after you find the right thing it can be hard to get through. This is a problem generally with Hadoop projects, but the introduction of a second, very different, version of Sqoop makes this doubly onerous, especially as in some places there is no naming distinction made between Sqoop and Sqoop2.

