简介

数据集成模块是在各个存储单元之间执行数据交换的通道,具备分布式底层架构,稳定高效、弹性伸缩的特点,致力于提供复杂网络环境下、丰富的异构数据源之间数据高速稳定的数据移动及同步能力。为了在DTinsightBatch进行大规模数据集的挖掘与计算,通常的做法是在任务执行前将数据传输至DTinsightBatch,并在任务执行结束后将计算结果传输至外部存储单元(例如MySQL等应用数据库)。数据集成的作用如下图所示:
数栈输入与输出图

数据源类型

数据集成提供丰富的数据源支持,如下所示:

  1. 关系型数据库(MySQL / Oracle / SQLServer / PostgreSQL等)
  2. NoSQL(Redis / MongoDB / HBase / ElasticSearch等)
  3. 大数据存储(MaxCompute / HDFS / Hive等)
  4. 文本存储(FTP)
    注1:Redis不支持抽取
    注2:Redis、MongoDB、ElasticSearch不支持向导

数据源管理

数据源管理是对外部存储单元访问参数的管理,数据集成模块需要与数据开发配合起来才能发挥作用,实际是由定时任务来执行数据传输的。

在项目上方的数据集成菜单,进入数据数据源管理页面,可看到目前已经集成的数据源列表,包括数据源名称、类型、连接信息、描述、最近修改人、最近修改时间、状态等信息,同时可执行编辑、删除等操作。

在数据源列表右上角的新增数据源,选择不同的数据源类型,需要填写不同的配置信息。
具体配置信息:https://insight.dtstack.com/public/helpSite/dtinsight-batch/v3.0/DataIntegration/DataSourceManage.html#Hive

同步任务配置

数据同步任务的配置共分为5个步骤:

  1. 选择数据来源:选择已配置的数据源,系统会读取其中的数据;
  2. 选择数据目标:选择已配置的数据源,系统会向其写入数据;
  3. 字段映射:配置数据来源与数据目标之间的字段映射关系,不同的数据类型在这步有不同的配置方法;
  4. 通道控制:控制数据同步的执行速度、错误数据的处理方式等;
  5. 预览保存:再次确认已配置的规则并保存;

同步任务的创建

创建同步任务时,在数据开发页面点击新建任务,并选择数据同步的任务类型,并进行向导/脚本模式的选择,模式一旦选择之后即不可修改。
如果选择了脚本模式,可以在编辑区左上角点击导入模板,并选择数据源类型、数据库等信息,确定后即可导入模板,只需编辑其中一部分信息。
注:导入模板后,之前填写的信息会被清空并覆盖

脚本模式的介绍

数据同步任务包括一个job元素,而这个元素包括setting和content两部分。

1
2
3
4
5
6
{
"job": {
"setting": {...},
"content": [...]
}
}

content

content:用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)
reader: 用于读取数据的插件的信息
writer: 用于写入数据的插件的信息
reader和writer包括name和parameter,分别表示插件名称和插件参数
插件配置样例:https://insight.dtstack.com/public/helpSite/dtinsight-batch/v3.0/DataIntegration/JobConfig.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
"content": [
{
"reader": {
"name": "...",
"parameter": {
...
}
},
"writer": {
"name": "...",
"parameter": {
...
}
}
}
]

setting

setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息

1
2
3
4
5
"setting": {
"speed": {...},
"errorLimit": {...},
"dirty": {...}
}

speed

channel: 任务并发数
bytes: 每秒字节数,默认为0(不限速)

1
2
3
4
"speed": {
"channel": 3,
"bytes": 0
}

errorLimit

record: 出错记录数超过record设置的条数时,任务标记为失败
percentage: 当出错记录数超过percentage百分数时,任务标记为失败

1
2
3
4
"errorLimit": {
"record": 10000,
"percentage": 100
}

dirty

path: 脏数据存放路径
hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
"dirty": {
"path": "/tmp",
"hadoopConfig": {
"fs.default.name": "hdfs://ns1",
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
"dfs.ha.automatic-failover.enabled": "true",
"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"fs.hdfs.impl.disable.cache": "true"
}
}

整库迁移

整库迁移是为了提升用户效率、降低用户使用成本的一种快捷工具,它可以快速完成把MySQL数据库内所有表一并上传到DTinsightBatch(Hive)的工作。
操作步骤:

  1. 登录到DTinsightBatch>数据集成,进入数据源管理页面。
  2. 单击右上角的新增数据源,添加一个面向整库迁移的MySQL数据源(这里假设为MySQL_Migrate),单击测试连通性验证数据源访问正确无误后,确认并保存此数据源。
  3. 新增数据源成功后,即可在数据源列表中看到新增的MySQL数据源MySQL_Migrate。单击整库迁移,即可进入对应数据源的整库迁移功能界面,整库迁移界面主要分为3块功能区域:
    a.待迁移表筛选区,此处将MySQL数据源MySQLMigrate下的所有数据库表以表格的形式展现出来,您可以根据实际需要批量选择待迁移的数据库表。
    b.高级设置,此处提供了MySQL数据表和DTinsightBatch数据表的表名称、列名称、列类型的映射转换规则。
    c.迁移模式、并发控制区,此处可以控制整库迁移的模式(全量、增量)、并发度配置(分批上次、整批上传)、提交迁移任务进度状态信息等。
    d.单击高级设置按钮,您可以根据您具体需求选择转换规则。比如DTinsightBatch端建表时统一增加了ods
    这一前缀。
    e.在迁移模式、并发控制区中,选择同步方式为每日增量,并配置增量字段为gmt_modified,数据集成默认会根据您选择的增量字段生成具体每个任务的增量抽取where条件,并配合DTinsightBatch调度参数(例如:${bdp.system.bizdate})形成针对每天的数据抽取条件。
    f.数据集成抽取MySQL库表的数据是通过JDBC连接远程MySQL数据库,并执行相应的SQL语句,将数据从MySQL库中select出来。由于是标准的SQL抽取语句,可以配置where子句控制数据范围。为了对源头MySQL数据源进行保护,避免同一时间点启动大量数据同步作业带来数据库压力过大,可选择分批上传模式,并配置从每日0点开始,每1小时启动3个数据库表同步。
  4. 最后,单击提交任务按钮,这里可以看到迁移进度信息,以及每一个表的迁移任务状态。
  5. 单击对应的迁移任务,会跳转到数据集成的任务开发界面,在左侧目录树clonedatabase目录下,会有对应的所有整库迁移任务,任务命名规则是:mysql2hive源表名。

注:这些任务会根据配置的调度周期(默认天调度)被调度执行,也可以使用DTinsight调度补数据功能完成历史数据的传输。通过整库迁移功能可以极大减少您初始化的配置、迁移成本。

版本支持

DTinsightBatch数据集成模块对不同数据库的版本支持情况:
MySQL 5.0及以上
Oracle 11g、12c
SQL Server 2008及以上
Hive
HBase
HDFS
ElasticSearch

最后更新: 2018年12月19日 10:12

原始链接: https://www.lousenjay.top/2018/12/18/数栈-数据集成学习笔记/