现在网上java 调用kettle api操作的很少(本人环境MYSQL kettle5.2),我来做下好人。
创建表语法:
CREATE TABLE `t_lzfx_base_syonline` ( `ID` varchar(36) NOT NULL, `CREATEDATETIME` datetime DEFAULT NULL, `IP` varchar(100) DEFAULT NULL, `LOGINNAME` varchar(100) DEFAULT NULL, `TYPE` varchar(1) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `syonline` ( `ID` varchar(36) NOT NULL, `CREATEDATETIME` datetime DEFAULT NULL, `IP` varchar(100) DEFAULT NULL, `LOGINNAME` varchar(100) DEFAULT NULL, `TYPE` varchar(1) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
JAVA代码:
/** * 两个库中的表名 */ public static String bjdt_tablename = "t_lzfx_base_syonline"; public static String kettle_tablename = "syonline"; public static String kettle_log = "t_lzfx_data_log"; /** * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml) */ public static final String[] databasesXML = { "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>bjdt</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>zjdata</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>", "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<connection>" + "<name>kettle</name>" + "<server>127.0.0.1</server>" + "<type>MYSQL</type>" + "<access>Native</access>" + "<database>kettledb</database>" + "<port>3306</port>" + "<username>root</username>" + "<password>root</password>" + "</connection>" }; /** * @param args */ public static void main(String[] args) { try { KettleEnvironment.init(); TransMeta transMeta = new TransMeta(); //设置转化的名称 transMeta.setName("转换名称"); //添加转换的数据库连接 for (int i=0;i<databasesXML.length;i++){ DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); transMeta.addDatabase(databaseMeta); } VariableSpace space = new Variables(); //将step日志数据库配置名加入到变量集中 space.setVariable("kettle_log","bjdt"); space.initializeVariablesFrom(null); StepLogTable stepLogTable = StepLogTable.getDefault(space,transMeta); //StepLogTable使用的数据库连接名(上面配置的变量名)。 stepLogTable.setConnectionName("bjdt"); //设置Step日志的表名 stepLogTable.setTableName(kettle_log); //设置TransMeta的StepLogTable transMeta.setStepLogTable(stepLogTable); //registry是给每个步骤生成一个标识Id用 PluginRegistry registry = PluginRegistry.getInstance(); //****************************************************************** //第一个表输入步骤(TableInputMeta) TableInputMeta tableInput = new TableInputMeta(); String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput); //给表输入添加一个DatabaseMeta连接数据库 DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt"); tableInput.setDatabaseMeta(database_bjdt); String select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename; tableInput.setSQL(select_sql); //添加TableInputMeta到转换中 StepMeta tableInputMetaStep = new StepMeta("INPUTTABLE_"+bjdt_tablename,tableInput); //给步骤添加在spoon工具中的显示位置 tableInputMetaStep.setDraw(true); tableInputMetaStep.setLocation(100, 100); transMeta.addStep(tableInputMetaStep); //****************************************************************** //****************************************************************** //第二个步骤插入与更新 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); insertUpdateMeta.setCommitSize(10000); //性能设置 String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class,insertUpdateMeta); //添加数据库连接 DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); insertUpdateMeta.setDatabaseMeta(database_kettle); //设置操作的表 insertUpdateMeta.setTableName(kettle_tablename); //设置用来查询的关键字 insertUpdateMeta.setKeyLookup(new String[]{"ID"}); insertUpdateMeta.setKeyStream(new String[]{"ID"}); insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上 insertUpdateMeta.setKeyCondition(new String[]{"="}); //String select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename; //设置要更新的字段 String[] updatelookup = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"} ; String [] updateStream = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"}; Boolean[] updateOrNot = {false,true,true,true,true}; insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); String[] lookup = insertUpdateMeta.getUpdateLookup(); //System.out.println("******:"+lookup[1]); //System.out.println("insertUpdateMetaXMl:"+insertUpdateMeta.getXML()); //添加步骤到转换中 StepMeta insertUpdateStep = new StepMeta("INSERTUPDATE_"+kettle_tablename,insertUpdateMeta); insertUpdateStep.setDraw(true); insertUpdateStep.setLocation(250,100); transMeta.addStep(insertUpdateStep); //****************************************************************** //****************************************************************** //添加hop把两个步骤关联起来 transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep)); Trans trans = new Trans(transMeta); trans.execute(null); // You can pass arguments instead of null. trans.waitUntilFinished(); if ( trans.getErrors() > 0 ) { throw new RuntimeException( "There were errors during transformation execution." ); } System.out.println("***********the end************"); } catch (Exception e) { e.printStackTrace(); return; } }
运行结果:
2015/04/18 16:36:51 - 转换名称 - 为了转换解除补丁开始 [转换名称] 2015/04/18 16:37:00 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 50000 2015/04/18 16:37:02 - INSERTUPDATE_syonline.0 - linenr 50000 2015/04/18 16:37:08 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 100000 2015/04/18 16:37:09 - INSERTUPDATE_syonline.0 - linenr 100000 2015/04/18 16:37:15 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 150000 2015/04/18 16:37:16 - INSERTUPDATE_syonline.0 - linenr 150000 2015/04/18 16:37:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 200000 2015/04/18 16:37:23 - INSERTUPDATE_syonline.0 - linenr 200000 2015/04/18 16:37:29 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 250000 2015/04/18 16:37:31 - INSERTUPDATE_syonline.0 - linenr 250000 2015/04/18 16:37:36 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 300000 2015/04/18 16:37:38 - INSERTUPDATE_syonline.0 - linenr 300000 2015/04/18 16:37:43 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 350000 2015/04/18 16:37:45 - INSERTUPDATE_syonline.0 - linenr 350000 2015/04/18 16:37:51 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 400000 2015/04/18 16:37:52 - INSERTUPDATE_syonline.0 - linenr 400000 2015/04/18 16:37:58 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 450000 2015/04/18 16:37:59 - INSERTUPDATE_syonline.0 - linenr 450000 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 500000 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - Finished reading query, closing connection. 2015/04/18 16:38:05 - INPUTTABLE_t_lzfx_base_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0 2015/04/18 16:38:06 - INSERTUPDATE_syonline.0 - linenr 500000 2015/04/18 16:38:06 - INSERTUPDATE_syonline.0 - 完成处理 (I=500184, O=0, R=500184, W=500184, U=0, E=0
相关推荐
通过调用kettle的API接口,实现将一个库的数据转移到另一个数据库中。附件中同时提供了抽取需要的jar包
java 调用 kettle api 所需jar包依赖。
java调用kettle的依赖包。调试代码的过程中一个个找出来的,分享给大家 kettle java 依赖包
对向前兼容性的推荐 :如果想要动态地创造Transformation (例如:从元数据),使用XML文件方法(KTR)而不是使用API。 XML文件兼容Kettle所有版本,同样对job有效的。 下面的例子进行以下操作: 1创建Transformation ...
java中调用kettle中的job与转换源码,其中kettle用的是5.2.0.0的版本。已经测试过可以调用访问,并且可以传入参数调用。有需要的可直接拿去使用。
简单的Java调用kettle转换
java调用kettle使用示例,包含数据库到数据库,csv到数据库及数据库到excel数据同步调用代码。
java调用kettle中的Job或是Trans要用到的jar包
使用java调用kettle的jar包,实现同步任务的调度,内含简单的demo
用java调用kettle设计并包含主要源代码
java调用kettle时需要的jar包合计,其中包括了可能会用到的Oracle和MySQL的驱动文件
Kettle java API Kettle java API
本资源主要内容包括(资源有保证,都是干货,一看就会): ...2.JAVA传参调用KETTLE远程服务执行任务实现过程及注意事项说明。 3.附件包括:配套说明文档、代码包、KETLLE任务测试文件及数据库脚本。
kettle是一个纯java编写的etl工具,同时提供了相关api供java进行调用。速度是很快的10000笔/S速度。本资料提供的是java调用kettle需要的jar包
java调用Kettle引用jar包
用于java调用kettlektr文件的jar,在kettle官网下载图形化界面里面也包含
java调用Kettle引用jar包.zip
Java调用Kettle代码 ,用Java代码调用KTR文件,和JOb文件
java远程调用kettle说明与代码.pdf
综合网上的方案 把转换和作业合成一个方法,并包含完整的测试方法 jar包