我们使用的是Sqoop-1.4.4,在进行关系型数据库与Hadoop/Hive数据同步的时候,如果使用--incremental
选项,如使用append模式,我们需要记录一个--last-value
的值,如果每次执行同步脚本的时候,都需要从日志中解析出来这个--last-value
的值,然后重新设置脚本参数,才能正确同步,保证从关系型数据库同步到Hadoop/Hive的数据不发生重复的问题。
而且,我们我们需要管理我们使用的这些脚本,每次执行之前可能要获取指定参数值,或者修改参数。Sqoop也提供了一种比较方面的方式,那就是直接创建一个Sqoop job,通过job来管理特定的同步任务。就像我们前面提到的增量同步问题,通过创建sqoop job可以保存上一次同步时记录的--last-value
的值,也就不用再费劲去解析获取了,每次想要同步,这个job会自动从job保存的数据中获取到。
sqoop job命令使用
Sqoop job相关的命令有两个:
- bin/sqoop job
- bin/sqoop-job
使用这两个都可以。我们先看看sqoop job命令的基本用法:
- 创建job:
--create
- 删除job:
--delete
- 执行job:
--exec
- 显示job:
--show
- 列出job:
--list
下面,我们基于增量同步数据这个应用场景,创建一个sqoop job,命令如下所示:
bin/sqoop job --create your-sync-job -- import --connect jdbc:mysql://10.95.3.49:3306/workflow --table project --username shirdrn -P --hive-import --incremental append --check-column id --last-value 1 -- --default-character-set=utf-8
创建了job,id为“your-sync-job”,它是将MySQL数据库workflow中的project表同步到Hive表中,而且--incremental append
选项使用append模式,--last-value
为1,从MySQL表中自增主键id=1开始同步。然后我们根据这个job的id去查询job详细配置情况:
bin/sqoop job --show your-sync-job
结果示例,如下所示:
Job: your-sync-job Tool: import Options: ---------------------------- verbose = false incremental.last.value = 1 db.connect.string = jdbc:mysql://10.95.3.49:3306/workflow codegen.output.delimiters.escape = 0 codegen.output.delimiters.enclose.required = false codegen.input.delimiters.field = 0 hbase.create.table = false db.require.password = true hdfs.append.dir = true db.table = project import.fetch.size = null codegen.input.delimiters.escape = 0 codegen.input.delimiters.enclose.required = false db.username = shirdrn codegen.output.delimiters.record = 10 import.max.inline.lob.size = 16777216 hcatalog.create.table = false db.clear.staging.table = false incremental.col = id codegen.input.delimiters.record = 0 enable.compression = false hive.overwrite.table = false hive.import = true codegen.input.delimiters.enclose = 0 hive.drop.delims = false codegen.output.delimiters.enclose = 0 hdfs.delete-target.dir = false codegen.output.dir = . codegen.auto.compile.dir = true mapreduce.num.mappers = 4 import.direct.split.size = 0 export.new.update = UpdateOnly codegen.output.delimiters.field = 1 incremental.mode = AppendRows hdfs.file.format = TextFile codegen.compile.dir = /tmp/sqoop-shirdrn/compile/a1ed2c6097c4534d20f2ea981662556e direct.import = false hive.fail.table.exists = false tool.arguments.0 = --default-character-set=utf-8 db.batch = false
通过incremental.last.value = 1可以看到,通过该选项来控制增量同步开始记录。
接着,可以使用创建的这个job id来运行它,执行如下命令:
bin/sqoop job --exec your-sync-job
可以查询,MySQL数据库workflow中的project表中的数据被同步到Hive表中。
这时,可以通过bin/sqoop job --show your-sync-job
命令,查看当前的sqoop job配置情况,可以看到如下变化:
incremental.last.value = 7
从MySQL表中增量同步的起始id变为7,下次同步就会把id大于7的记录同步到Hive表中。可以在MySQL表中再INSERT一条记录,再次执行your-sync-job,能够正确地进行增量同步。
Sqoop job安全配置
默认情况下,创建的每个job在运行的时候都不会进行安全的认证。如果我们希望限制指定的sqoop job的执行,只有经过认证以后才能执行,这时候可以使用sqoop job的安全选项。Sqoop安装目录下,通过修改配置文件conf/sqoop-site.xml可以对job进行更高级的配置。实际上,我们使用了Sqoop的metastore工具,它能够对Sqoop进行细粒度的配置。
我们要将MySQL数据库中的数据同步到Hive表,每次执行sqoop job都需要输入访问MySQL数据库的连接账号信息,可以设置sqoop.metastore.client.record.password的值为true。如果在conf/sqoop-site.xml中增加如下配置,会将连接账号信息存储到Sqoop的metastore中:
<property> <name>sqoop.metastore.client.record.password</name> <value>true</value> <description>If true, allow saved passwords in the metastore. </description> </property>
如果想要限制从外部调用执行Sqoop job,如将Sqoop job提交给Oozie调度程序,也会通过上面Sqoop的metastore配置的内容来进行验证。
另外,Sqoop的metastore工具,可以允许我们指定为外部,例如使用外部主机上的MySQL数据库来存储元数据,可以在conf/sqoop-site.xml配置如下:
<property> <name>sqoop.metastore.client.autoconnect.url</name> <value>jdbc:mysql://10.95.3.49:3306/sqoop_metastore</value> <description>The connect string to use when connecting to a job-management metastore. If unspecified, uses ~/.sqoop/. You can specify a different path here. </description> </property> <property> <name>sqoop.metastore.client.autoconnect.username</name> <value>shirdrn</value> <description>The username to bind to the metastore. </description> </property> <property> <name>sqoop.metastore.client.autoconnect.password</name> <value>108loIOL</value> <description>The password to bind to the metastore. </description> </property>
还有一个可与选择的配置项是,可以设置是否自动连接到外部metastore数据库,通过如下配置指定:
<property> <name>sqoop.metastore.client.enable.autoconnect</name> <value>false</value> <description>If true, Sqoop will connect to a local metastore for job management when no other metastore arguments are provided. </description> </property>
这样,你可以通过MySQL的授权机制,来限制指定的用户和主机(或IP地址)访问Sqoop的metadata,也能起到一定的安全访问限制。
参考链接
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
你好,请问你的Sqoop的metastore使用mysql没有其他额外配置吗,从sqoop的文档上看似乎只支持HSQL Database呀。
不好意思,这里我忘记更新了,目前Sqoop metastore是不支持MySQL的。
您好,请问有试过使用Oozie来定时增量从MySQL导入到Hbase吗?使用Oozie来调度Sqoop job的时候metastore具体要如何设置呢?我一直都没有配置成功,请赐教。
这个仔细参考Sqoop官方文档就可以配置成功,如果报错了,你可以把错误信息贴出来看看。
使用sqoop export 从hdfs往mysql中导入数据,数据有丢失,
要确定HDFS中数据各个字段与MySQL表一致,如果出现不一致的情况,如HDFS文件中字段长度过长,插入MySQL时可能会自动跳过该异常记录。
如果mysql时钟比hadoop时钟落后,那从mysql import到hive的定时sqoop job,是不是会一直取不到数据?
如果你设置不合理,可能会取不到数据,或者漏掉数据,一般你可以在一个合理的时间差之后运行sqoop job,保证不漏掉数据。如果实时性要求很高,就不能靠这个了。
大神,我看了好多网页都感觉在说sqoop的数据迁移功能,这个Sqoop支持事务同步不呢,比如oracle到hdoop,在t1时刻从oracle同步了一条数据到hadoop,在t2时刻oracle对这条数据进行了更改,这个sqoop能吧这个update同步到hadoop不。
食shiqu