我是靠谱客的博主 感性板栗,这篇文章主要介绍kettle——好用的ETL工具kettle——好用的ETL工具,现在分享给大家,希望可以做个参考。

kettle——好用的ETL工具

什么是kettle ?

kettle 百度百科:

Kettle是一款国外开源的ETL工具,纯java编写,可以在Windows、Linux、Unix上运行,数据抽取高效稳定。

ETL 百度百科:

ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业的决策提供分析依据, ETL是BI(商业智能)项目重要的一个环节。

商业智能 百度百科:

商业智能(Business Intelligence,简称:BI),又称商业智慧或商务智能,指用现代数据仓库技术、线上分析处理技术、数据挖掘数据展现技术进行数据分析以实现商业价值。

背景介绍之商业智能(Business Intelligence, BI)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cPTsSh1s-1601390379372)(en-resource://database/1047:1)]

  1. DB/Database/数据库——DB 是现有的数据来源,可以为mysql、SQLserver、文件日志等,为数据仓库提供数据来源的一般存在于现有的业务系统之中,用来支持生产的,比如超市的买卖系统。DB保留的是数据信息的最新状态,只有一个状态!比如,每天早上起床洗脸照镜子,看到的就是当时的状态,至于之前的每天的状态,不会出现的你的眼前,这个眼前就是db。

  2. 操作性数据(Operational Data Store) /ODS,作为数据库到数据仓库的一种过渡形式,与数据仓库在物理结构上不同。ODS存储的是当前的数据情况,给使用者提供当前的状态,提供即时性的、操作性的、集成的全体信息的需求。ODS作为数据库到数据仓库的一种过渡形式,能提供高性能的响应时间,ODS设计采用混合设计方式。ODS中的数据是"实时值",而数据仓库的数据却是"历史值",一般ODS中储存的数据不超过一个月,而数据仓库为10年或更多。

特征:

  • 复制代码
    1
    2
    ODS直接存放从业务抽取过来的数据,这些数据从结构和数据上与业务系统保持一致,降低了数据抽取的复杂性。
  • 复制代码
    1
    2
    转移一部分业务系统的细节查询功能,因为ODS存放的数据与业务系统相同,原来有业务系统产生的报表,现在可以从ODS中产生。
  • 复制代码
    1
    2
    完成数据仓库中不能完成的功能,ODS存放的是明细数据,数据仓库DW或数据集市DM都存放的是汇聚数据,ODS提供查询明细的功能。
  • 复制代码
    1
    2
    ODS数据只能增加不能修改,而且数据都是业务系统原样拷贝,所以可能存在数据冲突的可能,解决办法是为每一条数据增加一个时间版本来区分相同的数据。
  1. 数据仓库(Data Warehouse) /DW,顾名思义,数据仓库是一个很大的数据存储集合,出于企业的分析性报告和决策支持目的而创建,对多样的业务数据进行筛选与整合。它为企业提供一定的BI(商业智能)能力,指导业务流程改进、监视时间、成本、质量以及控制。
    数据仓库存储是一个面向主题(移动的用户分析也可做为一个主题)的,反映历史变化数据,用于支撑管理决策。

特征:

  • 效率足够高,要对进入的数据快速处理。
  • 数据质量高,数据仓库是提供很多决策需要的数据支撑,DW的数据应该是唯一的具有权威性的数据,企业的所有系统只能从DW取数据,所以需要定期对DW里面的数据进行质量审,保证DW里边数据的唯一、权威、准确性。
  • 扩展性,企业业务扩展和降低企业建设数据仓库的成本考虑。
  • 面向主题,数据仓库中的数据是按照一定的主题域进行组织的,每一个主题对应一个宏观的分析领域,数据仓库排除对决策无用的数据,提供特定主题的简明视图。
  • 数据仓库主要提供查询服务,并且需要查询能够及时响应。
  • DW的数据也是只允许增加不允许删除和修改,数据仓库主要是提供查询服务,删除和修改在分布式系统。
  1. ETL/Extraction-Transformation-Loading——用于完成DB到DW的数据转存,它将DB中的某一个时间点的状态,“抽取”出来,根据DW的存储模型要求,“转换”一下数据格式,然后再“加载”到DW的一个过程,这里需要强调的是,DB的模型是ER模型,遵从范式化设计原则,而DW的数据模型是雪花型结构或者星型结构,用的是面向主题,面向问题的设计思路,所以DB和DW的模型结构不同,需要进行转换。

  2. 数据集市(Data Mart)/DM,是为了特定的应用目的或应用范围,而从数据仓库中独立出来的一部分数据,也可称为部门数据或主题数据(subjectarea)。在数据仓库的实施过程中往往可以从一个部门的数据集市着手,以后再用几个数据集市组成一个完整的数据仓库。需要注意的就是在实施不同的数据集市时,同一含义的字段定义一定要相容,这样再以后实施数据仓库时才不会造成大麻烦。
    数据集市,以某个业务应用为出发点而建设的局部DW,DW只关心自己需要的数据,不会全盘考虑企业整体的数据架构和应用,每个应用有自己的DM特征:DM结构清洗,针对性强,扩展性好,因为DM仅仅是单对一个领域而建立,容易维护修改DM建设任务繁重,公司有众多业务,每个业务单独建立表。DM的建立更多的消耗存储空间,单独一个DM可能数据量不大,但是企业所有领域都建立DM这个数据量就会增加多倍。

  3. OLAP——在线分析系统,简单说就是报表系统,销售报表,统计报表,等等,这个大家都熟悉,当然,OLAP的统计要更复杂更丰富一些,比如切面,钻取等等。

  4. DM/Data Mining/数据挖掘——这个挖掘,不是简单的统计了,他是根据概率论的或者其他的统计学原理,将DW中的大数据量进行分析,找出我们不能直观发现的规律,比如,如果我们每天早上照相,量身材的时候,还记录下头一天吃的东西,黄瓜,猪腿,烤鸭,以及心情,如果记录上10年,形成了3650天的相貌和饮食心情的数据,我们每个人都记录,有20万人记录了,那么,我们也许通过这些记录,可以分析出,身材相貌和饮食的客观规律;再说一个典型的实例,就是英国的超市,在积累了大量数据之后,对数据分析挖掘之后,得到了一个规律:将小孩的尿布和啤酒放在一起,销量会更好——业务专家在得到该结论之后,仔细分析,知道了原因,因为英国男人喜欢看足球的多,老婆把小孩介绍男人看管,小孩尿尿需要尿布,而男人看足球喜欢喝酒,所以两样商品有密切的关系,放在一起销售会更好!

  5. 决策支持系统(Decision Support Systems)/DSS,是协助进行商业级或组织级决策活动的信息系统。DSSs一般面向中高层面管理,服务于组织机构内部管理、操作和规划级的决策,帮助决策者对快速变化并且很难提前确定的问题进行决策,通常是非结构化(Non-structured)和半结构化(Semi-structured)的决策问题。决策支持系统既可以是完全自动化决策,也可以是完全人工决策,或者两者兼有。

为什么要用kettle ?

ETL工具的好处

  • 当数据来自不同的物理主机,这时候如使用SQL语句去处理的话,就显得比较吃力且开销也更大。
  • 数据来源可以是各种不同的数据库或者文件,这时候需要先把他们整理成统一的格式后才可以进行数据的处理,这一过程用代码实现显然有些麻烦。
  • 在数据库中我们当然可以使用存储过程去处理数据,但是处理海量数据的时候存储过程显然比较吃力,而且会占用较多数据库的资源,这可能会导致数据资源不足,进而影响数据库的性能。

上面所说的问题,我们用ETL工具就可以解决。
它的优点有:

  • 支持多种异构数据源的连接。
  • 图形化的界面操作十分方便。
  • 处理海量数据速度快、流程更清晰等.

常用的ETL工具对比

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VplyyVkj-1601390379379)(en-resource://database/1049:1)]

ETL工具与编程语言对比

使用java导入一个表格文件到数据库 demo

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// MysqlUtil.java package com.example; import java.sql.*; public class MysqlUtil { private Connection conn = null; private Statement stmt = null; // vm_mysql_2 private static final String DB_URL = "jdbc:mysql://192.168.136.129:3306/mydatabase?characterEncoding=utf-8"; private static final String DB_DRIVER = "com.mysql.cj.jdbc.Driver"; private static final String DB_USERNAME = "root"; private static final String DB_PASSWORD = "root"; public Connection getConnection() throws ClassNotFoundException { try{ Class.forName(DB_DRIVER); conn = DriverManager.getConnection(DB_URL,DB_USERNAME,DB_PASSWORD); stmt = conn.createStatement(); } catch(Exception e){ e.printStackTrace(); } finally { return conn; } } public ResultSet executeQuery(String sql){ ResultSet rs = null; if(sql != null && sql.length() != 0){ try{ this.getConnection(); //this.stmt.execute(sql); rs = stmt.executeQuery(sql); return rs; } catch(SQLException e){ e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally{ if(this.stmt != null){ //this.stmt.close(); } } return rs; }else{ return rs; } } public boolean executeUpdate(String sql) throws SQLException { if(sql != null && sql.length() != 0){ try{ this.getConnection(); this.stmt.executeUpdate(sql); return true; } catch(SQLException e){ e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally{ if(this.stmt != null){ this.stmt.close(); } } return false; }else{ return false; } } public static void main(String[] args) throws SQLException { MysqlUtil mysqlUtil = new MysqlUtil(); String sql = "select * from testTable"; ResultSet rs = mysqlUtil.executeQuery(sql); while(rs.next()){ System.out.println(rs.getString("sname")+ rs.getString("sno")+ rs.getString("classno")); } rs.close(); mysqlUtil.stmt.close(); mysqlUtil.conn.close(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//ExcelLoad.java package com.example; import jxl.Cell; import jxl.Sheet; import jxl.Workbook; import jxl.read.biff.BiffException; import java.io.File; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; public class ExcelLoad { public static final String tableName = "testTable"; static final int sheetNum = 0; static final int startColumn = 1; static final int endColumn = 3; static final int startRow = 2; private int endRow ; public void HandleSingleFile(File excelFile){ System.out.println("正在处理文件:" + excelFile.getName()); Workbook wb = null; try{ wb = Workbook.getWorkbook(excelFile); } catch (IOException e) { e.printStackTrace(); } catch (BiffException e) { e.printStackTrace(); } //读入页 Sheet sheet = wb.getSheets()[sheetNum]; this.endRow = sheet.getRows(); for(int i = startRow - 1; i < this.endRow; i++){ List<String> rowData = new ArrayList<String>(); for(int j = startColumn -1; j < endColumn; j++){ Cell cell = sheet.getCell(j,i); String cellContent = cell.getContents(); if(cellContent == ""){ cellContent = null; } rowData.add(cellContent); } System.out.println("正在处理第"+(i+1)+"行"+rowData.get(0)); try{ insertRow(rowData, tableName); } catch (SQLException throwables) { throwables.printStackTrace(); } } } public void insertRow(List<String> rowData, String dataTable) throws SQLException { String sql = "INSERT INTO testTable (sname,sno,classno) VALUES ("; sql += "'"+rowData.get(0).toString()+"'"; sql += ","; sql += "'" + rowData.get(1).toString() + "'"; sql += ","; sql += "'" + rowData.get(2).toString() + "'"; sql += ") ON DUPLICATE KEY UPDATE sname=values(sname),classno=values(classno)"; MysqlUtil mysqlUtil = new MysqlUtil(); mysqlUtil.executeUpdate(sql); } public static void main(String[] args){ String path = "D:\document\blog\kettle\introduce\student.xls"; File file = new File(path); ExcelLoad excelLoad = new ExcelLoad(); excelLoad.HandleSingleFile(file); } }

使用python导入一个表格文件到数据库 demo

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import pandas as pd import pymysql as pm import numpy as np path = 'D:/document/blog/kettle/introduce/student.xls' # vm_mysql_3 dbhost = '192.168.136.133' dbport = 3306 dbname = 'mydatabase' username = 'root' pwd = 'root' sql = '''INSERT INTO testTable (sname,sno,classno) VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE sname=values(sname),classno=values(classno) ''' def loadExcel(): global path excelFile = pd.read_excel(path) data_list = np.array(excelFile) data_target_list=[] for row_src in data_list: row_target = (row_src[0],row_src[1],row_src[2]) data_target_list.append(row_target) # print(data_list) print(data_target_list) global sql sqlExcute(sql,data_target_list) def sqlExcute(sqlStr,datatuple): global dbhost,dbport,dbname,username,password conn = pm.connect(host=dbhost,user=username,password=pwd,db=dbname,port=dbport) cur = conn.cursor() try: cur.executemany(sqlStr,datatuple) conn.commit() except Exception as e: conn.rollback() finally: conn.close() if __name__ == '__main__': loadExcel()

使用kettle导入一个表格文件到数据库 demo

整体流程:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LQX6YC4p-1601390379381)(en-resource://database/1051:1)]
设置导入参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZYKNPKml-1601390379383)(en-resource://database/1053:1)]
设置输出数据库表参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b8glskut-1601390379384)(en-resource://database/1055:1)]
设置出错处理:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iiuSKLbL-1601390379386)(en-resource://database/1057:1)]
设置运行日志:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6BLokb0I-1601390379386)(en-resource://database/1059:1)]

怎么用kettle ?

转换

导入文本demo

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p8v5CM5A-1601390379387)(en-resource://database/1061:1)]

导入表格demo

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mF3eJ5eM-1601390379388)(en-resource://database/1063:1)]

导入数据库(mysql)demo

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QmJz4kPb-1601390379389)(en-resource://database/1065:1)]

多个数据源处理 demo

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RHJjPJ5Q-1601390379390)(en-resource://database/1069:1)]

作业

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WXh3JLBu-1601390379391)(en-resource://database/1071:1)]

在程序中调用

在bat文件中调用 demo

复制代码
1
2
3
4
time /t D:pSoftkettledata-integrationpan.bat /file D:documentblogkettleintroduceexcelLoad.ktr /level Basic /logfile D:documentblogkettleintroducetest1.log time /t

在sh文件中调用 demo

复制代码
1
2
/usr/local/kettle/pan.sh -file /tmp/kettleFile/mysql_2to1_linux.ktr /level Basic /logfile /tmp/kettleFile/test1.log

在java程序中调用 demo

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package demo; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; public class Demo { public static void main(String[] args) throws KettleException { //初始化ketlle KettleEnvironment.init(); //创建转换元数据对象 TransMeta meta = new TransMeta("D:\document\blog\kettle\introduce\mysql_2to1.ktr"); Trans trans = new Trans(meta); trans.prepareExecution(null); trans.startThreads(); trans.waitUntilFinished(); if(trans.getErrors()==0){ System.out.println("执行成功!"); } } }

最后

以上就是感性板栗最近收集整理的关于kettle——好用的ETL工具kettle——好用的ETL工具的全部内容,更多相关kettle——好用内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(75)

评论列表共有 0 条评论

立即
投稿
返回
顶部