Sqoop 1.4.4 Installation, Configuration, and Basic Usage

Sqoop is an open-source project under Apache that transfers data (imports and exports) between Hadoop and structured data stores, most notably the widely used relational databases. Business data often lives in a relational database, and once it grows to a certain scale, analyzing it there can hit a bottleneck, for example complex statistical analysis over hundreds of millions of records. By synchronizing the relational data onto the Hadoop platform, you can take advantage of Hadoop's scalability to run such complex analyses with a noticeable gain in processing efficiency.
Below, we install and configure Sqoop and try out its ability to synchronize data between Hadoop and MySQL.

Preparation and Configuration

The hosts we use, and the processes deployed on each, are as follows:

  • Node m1 (10.95.3.56): Sqoop-1.4.4, Hive-0.12.0, NameNode, JobTracker
  • MySQL node (10.95.3.49): MySQL database

We first verify that Sqoop can connect to the MySQL database, and then verify that data in a MySQL table can be synchronized into Hive.
First, install MySQL on CentOS 6.4 (server IP: 10.95.3.49):

# Check for and remove any preinstalled MySQL packages
rpm -qa | grep mysql
sudo rpm -e --nodeps mysql
# Install the server, client, and development packages
yum list | grep mysql
sudo yum install -y mysql-server mysql mysql-devel
# Confirm the installation and start the server
rpm -qi mysql-server
sudo service mysqld start
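
To make sure the server is actually up and listening on the default port 3306, a quick check (assuming net-tools is installed, as is typical on CentOS 6):

sudo service mysqld status
sudo netstat -tlnp | grep 3306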

Then, install the latest Sqoop release, version 1.4.4, on host m1; unpacking the tarball is all it takes:

tar xvzf sqoop-1.4.4.bin__hadoop-1.0.0.tar.gz
cd sqoop-1.4.4.bin__hadoop-1.0.0
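
Sqoop locates Hadoop (and, for Hive imports, Hive) through environment variables. A minimal sketch, using the install paths that appear in the logs later in this article; adjust them for your environment:

export HADOOP_HOME=/home/shirdrn/cloud/programs/hadoop-1.2.1
export HIVE_HOME=/home/shirdrn/cloud/programs/hive-0.12.0-bin
export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin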

Verifying the Sqoop Connection to MySQL

To quickly verify that Sqoop can connect to the MySQL database, prepare as follows.
Create the database and table:

CREATE DATABASE workflow;
CREATE TABLE `workflow`.`project` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(100) NOT NULL,
      `type` tinyint(4) NOT NULL DEFAULT '0',
      `description` varchar(500) DEFAULT NULL,
      `create_at` date DEFAULT NULL,
      `update_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      `status` tinyint(4) NOT NULL DEFAULT '0',
      PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Grant access to the database:

grant all on workflow.* to shirdrn@'m1' identified by 'shiyanjun';
flush privileges;
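
Before involving Sqoop at all, it may be worth confirming the grant from m1 with the plain mysql client (assuming it is installed on m1):

mysql -h 10.95.3.49 -u shirdrn -p -e 'SHOW TABLES FROM workflow;'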

Add the MySQL JDBC driver, mysql-connector-java-5.1.29.jar, to the sqoop-1.4.4.bin__hadoop-1.0.0/lib directory.
Now, to connect Sqoop to the MySQL database named workflow, run the following command:

bin/sqoop list-tables --connect jdbc:mysql://10.95.3.49:3306/workflow --username shirdrn --password shiyanjun

You should see all the tables in the workflow database listed.
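
Besides list-tables, the sqoop eval tool can push an arbitrary SQL statement through the same connection, which is handy for connectivity checks; a sketch:

bin/sqoop eval --connect jdbc:mysql://10.95.3.49:3306/workflow --username shirdrn --password shiyanjun --query "SELECT COUNT(*) FROM project"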

Importing from MySQL into Hive

First, make sure the Hadoop cluster is installed and configured correctly, and that Hive is installed and configured on top of it; see other documents for those details.
Insert a few test records into the project table of the MySQL workflow database:

INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('Avatar-I', 1, 'Avatar-I project', '2014-02-11', 0);
INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('Avatar-II', 1, 'Avatar-II project', '2014-02-12', 0);
INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('Avatar-III', 2, 'Avatar-III project', '2014-02-13', 0);

Query MySQL's workflow.project table; the records are shown below:
[Figure: SELECT result of the workflow.project table in MySQL]
There are 3 records in total, with the primary key id auto-incremented.
Hive is installed on m1, and access to MySQL from m1 was granted above. Next, create the project table in Hive's default database:

CREATE TABLE project (
      id INT,
      name STRING,
      type TINYINT,
      description STRING,
      create_at STRING,
      update_at STRING,
      status TINYINT
);
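
To double-check the schema before importing, you can describe the table from the shell (assuming hive is on the PATH):

hive -e 'DESCRIBE project;'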

Now, on m1, use Sqoop to import MySQL's workflow.project table into Hive by running:

bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import -- --default-character-set=utf-8

Enter the password for the MySQL workflow database when prompted, and the synchronization runs.
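
For an unattended script you can replace the interactive -P prompt with --password; note that the password then becomes visible in the process list and shell history (newer Sqoop builds also offer a --password-file option). A sketch:

bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn --password shiyanjun --hive-import -- --default-character-set=utf-8
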
If nothing fails, the output log looks roughly like this:

14/02/26 18:39:41 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
14/02/26 18:39:41 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
14/02/26 18:39:41 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
14/02/26 18:39:41 INFO tool.CodeGenTool: Beginning code generation
14/02/26 18:39:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:39:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:39:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/shirdrn/cloud/programs/hadoop-1.2.1
Note: /tmp/sqoop-shirdrn/compile/77b6825a31fc6b84d0dbac391d855db6/project.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
14/02/26 18:39:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-shirdrn/compile/77b6825a31fc6b84d0dbac391d855db6/project.jar
14/02/26 18:39:43 WARN manager.MySQLManager: It looks like you are importing from mysql.
14/02/26 18:39:43 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
14/02/26 18:39:43 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
14/02/26 18:39:43 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
14/02/26 18:39:43 INFO mapreduce.ImportJobBase: Beginning import of project
14/02/26 18:39:44 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `project`
14/02/26 18:39:44 INFO mapred.JobClient: Running job: job_201402260008_0030
14/02/26 18:39:45 INFO mapred.JobClient:  map 0% reduce 0%
14/02/26 18:39:57 INFO mapred.JobClient:  map 33% reduce 0%
14/02/26 18:40:00 INFO mapred.JobClient:  map 66% reduce 0%
14/02/26 18:40:03 INFO mapred.JobClient:  map 100% reduce 0%
14/02/26 18:40:05 INFO mapred.JobClient: Job complete: job_201402260008_0030
14/02/26 18:40:05 INFO mapred.JobClient: Counters: 18
14/02/26 18:40:05 INFO mapred.JobClient:   Job Counters
14/02/26 18:40:05 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=25816
14/02/26 18:40:05 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/26 18:40:05 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/26 18:40:05 INFO mapred.JobClient:     Launched map tasks=3
14/02/26 18:40:05 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
14/02/26 18:40:05 INFO mapred.JobClient:   File Output Format Counters
14/02/26 18:40:05 INFO mapred.JobClient:     Bytes Written=201
14/02/26 18:40:05 INFO mapred.JobClient:   FileSystemCounters
14/02/26 18:40:05 INFO mapred.JobClient:     HDFS_BYTES_READ=295
14/02/26 18:40:05 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=221850
14/02/26 18:40:05 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=201
14/02/26 18:40:05 INFO mapred.JobClient:   File Input Format Counters
14/02/26 18:40:05 INFO mapred.JobClient:     Bytes Read=0
14/02/26 18:40:05 INFO mapred.JobClient:   Map-Reduce Framework
14/02/26 18:40:05 INFO mapred.JobClient:     Map input records=3
14/02/26 18:40:05 INFO mapred.JobClient:     Physical memory (bytes) snapshot=268840960
14/02/26 18:40:05 INFO mapred.JobClient:     Spilled Records=0
14/02/26 18:40:05 INFO mapred.JobClient:     CPU time spent (ms)=1900
14/02/26 18:40:05 INFO mapred.JobClient:     Total committed heap usage (bytes)=89063424
14/02/26 18:40:05 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2139516928
14/02/26 18:40:05 INFO mapred.JobClient:     Map output records=3
14/02/26 18:40:05 INFO mapred.JobClient:     SPLIT_RAW_BYTES=295
14/02/26 18:40:05 INFO mapreduce.ImportJobBase: Transferred 201 bytes in 21.406 seconds (9.3899 bytes/sec)
14/02/26 18:40:05 INFO mapreduce.ImportJobBase: Retrieved 3 records.
14/02/26 18:40:05 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:40:05 WARN hive.TableDefWriter: Column create_at had to be cast to a less precise type in Hive
14/02/26 18:40:05 WARN hive.TableDefWriter: Column update_at had to be cast to a less precise type in Hive
14/02/26 18:40:05 INFO hive.HiveImport: Removing temporary files from import process: hdfs://m1:9000/user/shirdrn/project/_logs
14/02/26 18:40:05 INFO hive.HiveImport: Loading uploaded data into Hive
14/02/26 18:40:06 INFO hive.HiveImport:
14/02/26 18:40:06 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/home/shirdrn/cloud/programs/hive-0.12.0-bin/lib/hive-common-0.12.0.jar!/hive-log4j.properties
14/02/26 18:40:12 INFO hive.HiveImport: OK
14/02/26 18:40:12 INFO hive.HiveImport: Time taken: 5.336 seconds
14/02/26 18:40:12 INFO hive.HiveImport: Loading data to table default.project
14/02/26 18:40:13 INFO hive.HiveImport: Table default.project stats: [num_partitions: 0, num_files: 4, num_rows: 0, total_size: 201, raw_data_size: 0]
14/02/26 18:40:13 INFO hive.HiveImport: OK
14/02/26 18:40:13 INFO hive.HiveImport: Time taken: 0.698 seconds
14/02/26 18:40:13 INFO hive.HiveImport: Hive import complete.
14/02/26 18:40:13 INFO hive.HiveImport: Export directory is empty, removing it.

You can then query it in Hive:

SELECT * FROM project;

An example of the result is shown below:
[Figure: SELECT result of the project table in Hive]
As you can see, the MySQL project table has been synchronized into Hive, and you can query and analyze the data there from now on.
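
A quick row-count sanity check against the imported table (assuming hive is on the PATH; it should report the 3 records imported above):

hive -e 'SELECT COUNT(*) FROM project;'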

Incremental Import from MySQL into Hive

You may want a scheduled script that, each time it runs, synchronizes only the incrementally modified data from MySQL into Hive; Sqoop's incremental import options support exactly this. We already imported part of the data above, so now we INSERT a few more records and then use Sqoop to synchronize the increment into Hive. Insert the following records into MySQL's workflow.project table:

INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('BestPop', 1, 'BestPop project', '2014-02-22', 0);
INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('BuddyGoGoGo', 3, 'BuddyGoGoGo project', '2014-02-23', 0);
INSERT INTO workflow.project(name, type, description, create_at, status) VALUES('Zebbbbbo', 1, 'Zebbbbbo project', '2014-02-25', 0);

Query the MySQL records again, as shown below:
[Figure: SELECT result of workflow.project after the new INSERTs]
Three more records have been added on top of the original ones, with ids auto-incremented.
Then run the following command to perform the incremental synchronization:

bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import --incremental append --check-column id --last-value 3 -- --default-character-set=utf-8 

An example of the resulting log:

14/02/26 18:41:36 INFO tool.BaseSqoopTool: Using Hive-specific delimiters for output. You can override
14/02/26 18:41:36 INFO tool.BaseSqoopTool: delimiters with --fields-terminated-by, etc.
14/02/26 18:41:36 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
14/02/26 18:41:36 INFO tool.CodeGenTool: Beginning code generation
14/02/26 18:41:37 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:41:37 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:41:37 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/shirdrn/cloud/programs/hadoop-1.2.1
Note: /tmp/sqoop-shirdrn/compile/9923852cc1f496e87c82d113f32a7258/project.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
14/02/26 18:41:38 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-shirdrn/compile/9923852cc1f496e87c82d113f32a7258/project.jar
14/02/26 18:41:38 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM project
14/02/26 18:41:38 INFO tool.ImportTool: Incremental import based on column `id`
14/02/26 18:41:38 INFO tool.ImportTool: Lower bound value: 3
14/02/26 18:41:38 INFO tool.ImportTool: Upper bound value: 6
14/02/26 18:41:38 WARN manager.MySQLManager: It looks like you are importing from mysql.
14/02/26 18:41:38 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
14/02/26 18:41:38 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
14/02/26 18:41:38 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
14/02/26 18:41:38 INFO mapreduce.ImportJobBase: Beginning import of project
14/02/26 18:41:40 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `project` WHERE ( `id` > 3 AND `id` <= 6 )
14/02/26 18:41:40 INFO mapred.JobClient: Running job: job_201402260008_0031
14/02/26 18:41:41 INFO mapred.JobClient:  map 0% reduce 0%
14/02/26 18:41:53 INFO mapred.JobClient:  map 33% reduce 0%
14/02/26 18:41:54 INFO mapred.JobClient:  map 66% reduce 0%
14/02/26 18:41:59 INFO mapred.JobClient:  map 100% reduce 0%
14/02/26 18:42:00 INFO mapred.JobClient: Job complete: job_201402260008_0031
14/02/26 18:42:00 INFO mapred.JobClient: Counters: 18
14/02/26 18:42:00 INFO mapred.JobClient:   Job Counters
14/02/26 18:42:00 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=26302
14/02/26 18:42:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
14/02/26 18:42:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
14/02/26 18:42:00 INFO mapred.JobClient:     Launched map tasks=3
14/02/26 18:42:00 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
14/02/26 18:42:00 INFO mapred.JobClient:   File Output Format Counters
14/02/26 18:42:00 INFO mapred.JobClient:     Bytes Written=199
14/02/26 18:42:00 INFO mapred.JobClient:   FileSystemCounters
14/02/26 18:42:00 INFO mapred.JobClient:     HDFS_BYTES_READ=295
14/02/26 18:42:00 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=222624
14/02/26 18:42:00 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=199
14/02/26 18:42:00 INFO mapred.JobClient:   File Input Format Counters
14/02/26 18:42:00 INFO mapred.JobClient:     Bytes Read=0
14/02/26 18:42:00 INFO mapred.JobClient:   Map-Reduce Framework
14/02/26 18:42:00 INFO mapred.JobClient:     Map input records=3
14/02/26 18:42:00 INFO mapred.JobClient:     Physical memory (bytes) snapshot=265646080
14/02/26 18:42:00 INFO mapred.JobClient:     Spilled Records=0
14/02/26 18:42:00 INFO mapred.JobClient:     CPU time spent (ms)=1810
14/02/26 18:42:00 INFO mapred.JobClient:     Total committed heap usage (bytes)=89063424
14/02/26 18:42:00 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=2139369472
14/02/26 18:42:00 INFO mapred.JobClient:     Map output records=3
14/02/26 18:42:00 INFO mapred.JobClient:     SPLIT_RAW_BYTES=295
14/02/26 18:42:00 INFO mapreduce.ImportJobBase: Transferred 199 bytes in 21.5979 seconds (9.2139 bytes/sec)
14/02/26 18:42:00 INFO mapreduce.ImportJobBase: Retrieved 3 records.
14/02/26 18:42:00 INFO util.AppendUtils: Creating missing output directory - project
14/02/26 18:42:00 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `project` AS t LIMIT 1
14/02/26 18:42:00 WARN hive.TableDefWriter: Column create_at had to be cast to a less precise type in Hive
14/02/26 18:42:00 WARN hive.TableDefWriter: Column update_at had to be cast to a less precise type in Hive
14/02/26 18:42:00 INFO hive.HiveImport: Removing temporary files from import process: hdfs://m1:9000/user/shirdrn/project/_logs
14/02/26 18:42:00 INFO hive.HiveImport: Loading uploaded data into Hive
14/02/26 18:42:02 INFO hive.HiveImport:
14/02/26 18:42:02 INFO hive.HiveImport: Logging initialized using configuration in jar:file:/home/shirdrn/cloud/programs/hive-0.12.0-bin/lib/hive-common-0.12.0.jar!/hive-log4j.properties
14/02/26 18:42:08 INFO hive.HiveImport: OK
14/02/26 18:42:08 INFO hive.HiveImport: Time taken: 5.342 seconds
14/02/26 18:42:08 INFO hive.HiveImport: Loading data to table default.project
14/02/26 18:42:08 INFO hive.HiveImport: Table default.project stats: [num_partitions: 0, num_files: 7, num_rows: 0, total_size: 400, raw_data_size: 0]
14/02/26 18:42:08 INFO hive.HiveImport: OK
14/02/26 18:42:08 INFO hive.HiveImport: Time taken: 0.666 seconds
14/02/26 18:42:08 INFO hive.HiveImport: Hive import complete.
14/02/26 18:42:08 INFO hive.HiveImport: Export directory is empty, removing it.
14/02/26 18:42:08 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
14/02/26 18:42:08 INFO tool.ImportTool:  --incremental append
14/02/26 18:42:08 INFO tool.ImportTool:   --check-column id
14/02/26 18:42:08 INFO tool.ImportTool:   --last-value 6
14/02/26 18:42:08 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')

Sqoop's --incremental option has two modes. One is the append mode used above, which synchronizes rows based on the growth of a designated column (such as a relational database's primary key). The other is lastmodified mode, which synchronizes rows based on a designated update-timestamp column; this mode sees quite a lot of use, for example to capture each day's changes across an entire relational table for a daily global report. It is worth trying out.
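
A minimal sketch of lastmodified mode against the same table, assuming the update_at column tracks modifications; the cutoff timestamp and the target directory name are chosen purely for illustration, and the sketch imports into plain HDFS files since combining lastmodified with --hive-import has caveats:

bin/sqoop import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --incremental lastmodified --check-column update_at --last-value '2014-02-26 00:00:00' --target-dir project_by_mtime --append
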
Now look at the project table in Hive after the synchronization, as shown below:
[Figure: SELECT result of the project table in Hive after the incremental import]
Thanks to the option we specified (--last-value), no records were duplicated: the MySQL data was incrementally synchronized into the Hive table.
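
The log above suggests persisting these incremental parameters with 'sqoop job --create'. A sketch of such a saved job (the job name project-sync is an assumption); Sqoop's metastore then updates the stored last-value after every execution, and -P still prompts for the password at exec time:

bin/sqoop job --create project-sync -- import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import --incremental append --check-column id --last-value 0
bin/sqoop job --exec project-sync
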
For more Sqoop options and usage, the User Guide on the official website is very detailed.

Related Issues

  • Resolving the exception org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory project already exists
  • In non-incremental mode, if the first import succeeds and you then try to import the data from MySQL into Hive a second time, you may see an error like the following:

    14/02/26 17:14:34 ERROR security.UserGroupInformation: PriviledgedActionException as:shirdrn cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory project already exists
    14/02/26 17:14:34 ERROR tool.ImportTool: Encountered IOException running import job: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory project already exists
         at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:137)
         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:973)
         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:936)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
         at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:936)
         at org.apache.hadoop.mapreduce.Job.submit(Job.java:550)
         at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:580)
         at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:186)
         at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:159)
         at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:239)
         at org.apache.sqoop.manager.DirectMySQLManager.importTable(DirectMySQLManager.java:92)
         at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:413)
         at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:502)
         at org.apache.sqoop.Sqoop.run(Sqoop.java:145)
         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
         at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:181)
         at org.apache.sqoop.Sqoop.runTool(Sqoop.java:220)
         at org.apache.sqoop.Sqoop.runTool(Sqoop.java:229)
         at org.apache.sqoop.Sqoop.main(Sqoop.java:238)
    

    This means the corresponding directory already exists on HDFS, even though we have already dropped the table in Hive:

    DROP TABLE project;
    

    However, that only removes the metadata from Hive's MetaStore; the directory on HDFS still has to be deleted as well, for example:

    bin/hadoop fs -rmr /hive/project
    

    Then simply rerun the import.
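
    Note that the directory the exception names is Sqoop's staging directory under your HDFS home (hdfs://m1:9000/user/shirdrn/project in the import log above), so a pre-flight cleanup before a repeated full import might look like the following sketch; the || true tolerates the path not existing yet:

    bin/hadoop fs -rmr /user/shirdrn/project || true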


This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, provided you keep the author attribution to Yanjun Shi (时延军), including the link http://shiyanjun.cn, do not use it for commercial purposes, and release any work based on this article under the same license. If you have any questions, please contact me.
