`
zhangzhen881024
  • 浏览: 21100 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

java 调用kettle api实现数据同步

 
阅读更多

数据库:   

kettle 日志表

CREATE TABLE `t_lzfx_data_log` (
  `ID` bigint(20) NOT NULL AUTO_INCREMENT,
  `ID_BATCH` int(11) DEFAULT '0',
  `CHANNEL_ID` varchar(255) DEFAULT NULL,
  `TRANSNAME` varchar(255) DEFAULT NULL,
  `STEPNAME` varchar(200) DEFAULT NULL,
  `STEP_COPY` int(11) DEFAULT NULL,
  `LINES_READ` int(11) DEFAULT NULL,
  `LINES_WRITTEN` int(11) DEFAULT NULL,
  `LINES_UPDATED` int(11) DEFAULT NULL,
  `LINES_INPUT` int(11) DEFAULT NULL,
  `LINES_OUTPUT` int(11) DEFAULT NULL,
  `LINES_REJECTED` int(11) DEFAULT NULL,
  `ERRORS` int(11) DEFAULT NULL,
  `LOG_FIELD` blob,
  `LOG_DATE` datetime DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 数据表:

CREATE TABLE `syonline` (
  `ID` varchar(36) NOT NULL,
  `CREATEDATETIME` datetime DEFAULT NULL,
  `IP` varchar(100) DEFAULT NULL,
  `LOGINNAME` varchar(100) DEFAULT NULL,
  `TYPE` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `t_lzfx_base_syonline` (
  `ID` varchar(36) NOT NULL,
  `CREATEDATETIME` datetime DEFAULT NULL,
  `IP` varchar(100) DEFAULT NULL,
  `LOGINNAME` varchar(100) DEFAULT NULL,
  `TYPE` varchar(1) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

JAVA 代码

 /**
	  * 两个库中的表名
	  */
	 public static String bjdt_tablename = "t_lzfx_base_syonline";
	 public static String kettle_tablename = "syonline";
	 public static String kettle_log = "t_lzfx_data_log";
	 
	/**
	 * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
	 */
	 public static final String[] databasesXML = {
	        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
	          "<connection>" +
	            "<name>bjdt</name>" +
	            "<server>127.0.0.1</server>" +
	            "<type>MYSQL</type>" +
	            "<access>Native</access>" + 
	            "<database>zjdata</database>" +
	            "<port>3306</port>" +
	            "<username>root</username>" +
	            "<password>root</password>" +
	          "</connection>",
	          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
	          "<connection>" +
	            "<name>kettle</name>" +
	            "<server>127.0.0.1</server>" +
	            "<type>MYSQL</type>" +
	            "<access>Native</access>" + 
	            "<database>kettledb</database>" +
	            "<port>3306</port>" +
	            "<username>root</username>" +
	            "<password>root</password>" +
	          "</connection>"
	    };	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			KettleEnvironment.init();
			transDemo = new KettleDeleteTest();
			System.out.println("************start to generate my own transformation***********");
			
			TransMeta transMeta = new TransMeta();
			//设置转化的名称 
			transMeta.setName("转换名称");
			
			//添加转换的数据库连接
	        for (int i=0;i<databasesXML.length;i++){
	            DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
	            transMeta.addDatabase(databaseMeta);
	        }
	        VariableSpace space = new Variables();  
	        //将step日志数据库配置名加入到变量集中  
	        space.setVariable("kettle_log","bjdt");
	        space.initializeVariablesFrom(null);  
	        StepLogTable stepLogTable = StepLogTable.getDefault(space,transMeta);
	        //StepLogTable使用的数据库连接名(上面配置的变量名)。  
	        stepLogTable.setConnectionName("bjdt");
	        //设置Step日志的表名  
	        stepLogTable.setTableName(kettle_log); 
	        //设置TransMeta的StepLogTable  
	        transMeta.setStepLogTable(stepLogTable); 
			
			//******************************************************************
			//第一个表输入步骤(原表数据输入)
			TableInputMeta oldTableInput = new TableInputMeta();
			DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");
			oldTableInput.setDatabaseMeta(database_bjdt);
			String old_select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+bjdt_tablename;
			oldTableInput.setSQL(old_select_sql);
			
			//添加TableInputMeta到转换中
			StepMeta oldTableInputMetaStep = new StepMeta("INPUTTABLE_"+bjdt_tablename,oldTableInput);
			//给步骤添加在spoon工具中的显示位置
			transMeta.addStep(oldTableInputMetaStep);
			//*****************************************************************
			//第二个表输入步骤(原表数据输入)
			TableInputMeta newTableInput = new TableInputMeta();
			//给表输入添加一个DatabaseMeta连接数据库
			DatabaseMeta database_kettle = transMeta.findDatabase("kettle");
			newTableInput.setDatabaseMeta(database_kettle);
			String new_select_sql = "SELECT ID,IP,CREATEDATETIME,LOGINNAME,TYPE FROM "+kettle_tablename;
			newTableInput.setSQL(new_select_sql);
			
			//添加TableInputMeta到转换中
			StepMeta newTableInputMetaStep = new StepMeta("INPUTTABLE_"+kettle_tablename,newTableInput);
			//给步骤添加在spoon工具中的显示位置
			transMeta.addStep(newTableInputMetaStep);
			//******************************************************************
			
			//******************************************************************
			//第三个步骤合并
			MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
///设置合并步骤的新旧数据源
			StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
			stepIOMeta.getInfoStreams().get(0).setStepMeta(newTableInputMetaStep);
			stepIOMeta.getInfoStreams().get(1).setStepMeta(oldTableInputMetaStep);
			mergeRowsMeta.setFlagField("bz"); //设置标志字段
			mergeRowsMeta.setKeyFields(new String[]{"ID"});
			mergeRowsMeta.setValueFields(new String[]{"IP","CREATEDATETIME","LOGINNAME","TYPE"});
			StepMeta mergeStepMeta = new StepMeta("合并记录", mergeRowsMeta);
			transMeta.addStep(mergeStepMeta);
			//******************************************************************
			
			//******************************************************************
			//添加HOP把两个输入和合并的步骤关联
			transMeta.addTransHop(new TransHopMeta(oldTableInputMetaStep, mergeStepMeta));
			transMeta.addTransHop(new TransHopMeta(newTableInputMetaStep, mergeStepMeta));
			//******************************************************************
			
			//******************************************************************
			//第四个步骤同步数据
			SynchronizeAfterMergeMeta synchronizeAfterMergeMeta = new SynchronizeAfterMergeMeta();
			synchronizeAfterMergeMeta.setCommitSize(10000); //设置事务提交数量
			synchronizeAfterMergeMeta.setDatabaseMeta(database_kettle); //目标数据源
			synchronizeAfterMergeMeta.setSchemaName("");//数据表schema
			synchronizeAfterMergeMeta.setTableName(kettle_tablename); //数据表名称
			synchronizeAfterMergeMeta.setUseBatchUpdate(true); //设置批量更新
			//设置用来查询的关键字
			synchronizeAfterMergeMeta.setKeyLookup(new String[]{"ID"}); //设置用来查询的关键字
			synchronizeAfterMergeMeta.setKeyStream(new String[]{"ID"}); //设置流输入的字段
			synchronizeAfterMergeMeta.setKeyStream2(new String[]{""});//一定要加上
			synchronizeAfterMergeMeta.setKeyCondition(new String[]{"="}); //设置操作符
			//设置要更新的字段
			String[] updatelookup = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"} ;
	 		String [] updateStream = {"ID","IP","CREATEDATETIME","LOGINNAME","TYPE"};
	 		Boolean[] updateOrNot = {false,true,true,true,true};
	 		synchronizeAfterMergeMeta.setUpdateLookup(updatelookup);
	 		synchronizeAfterMergeMeta.setUpdateStream(updateStream);
	 		synchronizeAfterMergeMeta.setUpdate(updateOrNot);
	 		
	 		//设置高级属性(操作)
	 		synchronizeAfterMergeMeta.setOperationOrderField("bz"); //设置操作标志字段名
	 		synchronizeAfterMergeMeta.setOrderInsert("new");
	 		synchronizeAfterMergeMeta.setOrderUpdate("changed");
	 		synchronizeAfterMergeMeta.setOrderDelete("deleted");
	 		StepMeta synStepMeta = new StepMeta("数据同步", synchronizeAfterMergeMeta);
	 		transMeta.addStep(synStepMeta);
			//******************************************************************
			
	 		//******************************************************************
			//添加HOP把合并和数据同步的步骤关联
			transMeta.addTransHop(new TransHopMeta(mergeStepMeta,synStepMeta));
			//******************************************************************
			
			String transXml = transMeta.getXML();
			System.out.println("transXml:"+transXml);
			
		    Trans trans = new Trans(transMeta);

		    trans.execute(null); // You can pass arguments instead of null.
		    trans.waitUntilFinished();
		    if ( trans.getErrors() > 0 )
		    {
		      throw new RuntimeException( "There were errors during transformation execution." );
		    }
		    System.out.println("***********the end************");
		} catch (Exception e) {
			e.printStackTrace();
			return;
		}
		
	}

 执行结果

2015/04/18 17:04:18 - 转换名称 - 为了转换解除补丁开始  [转换名称]
2015/04/18 17:04:20 - 合并记录.0 - 行号50000
2015/04/18 17:04:21 - INPUTTABLE_syonline.0 - linenr 50000
2015/04/18 17:04:21 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 50000
2015/04/18 17:04:21 - 合并记录.0 - 行号100000
2015/04/18 17:04:21 - 合并记录.0 - 行号150000
2015/04/18 17:04:21 - INPUTTABLE_syonline.0 - linenr 100000
2015/04/18 17:04:21 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 100000
2015/04/18 17:04:21 - 合并记录.0 - 行号200000
2015/04/18 17:04:21 - 合并记录.0 - 行号250000
2015/04/18 17:04:22 - INPUTTABLE_syonline.0 - linenr 150000
2015/04/18 17:04:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 150000
2015/04/18 17:04:22 - 合并记录.0 - 行号300000
2015/04/18 17:04:22 - 合并记录.0 - 行号350000
2015/04/18 17:04:22 - INPUTTABLE_syonline.0 - linenr 200000
2015/04/18 17:04:22 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 200000
2015/04/18 17:04:22 - 合并记录.0 - 行号400000
2015/04/18 17:04:23 - 合并记录.0 - 行号450000
2015/04/18 17:04:23 - INPUTTABLE_syonline.0 - linenr 250000
2015/04/18 17:04:23 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 250000
2015/04/18 17:04:23 - 合并记录.0 - 行号500000
2015/04/18 17:04:23 - 合并记录.0 - 行号550000
2015/04/18 17:04:23 - INPUTTABLE_syonline.0 - linenr 300000
2015/04/18 17:04:24 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 300000
2015/04/18 17:04:24 - 合并记录.0 - 行号600000
2015/04/18 17:04:24 - 合并记录.0 - 行号650000
2015/04/18 17:04:24 - INPUTTABLE_syonline.0 - linenr 350000
2015/04/18 17:04:24 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 350000
2015/04/18 17:04:24 - 合并记录.0 - 行号700000
2015/04/18 17:04:24 - 合并记录.0 - 行号750000
2015/04/18 17:04:25 - INPUTTABLE_syonline.0 - linenr 400000
2015/04/18 17:04:25 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 400000
2015/04/18 17:04:25 - 合并记录.0 - 行号800000
2015/04/18 17:04:25 - 合并记录.0 - 行号850000
2015/04/18 17:04:25 - INPUTTABLE_syonline.0 - linenr 450000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 450000
2015/04/18 17:04:26 - 合并记录.0 - 行号900000
2015/04/18 17:04:26 - 合并记录.0 - 行号950000
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - linenr 500000
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - Finished reading query, closing connection.
2015/04/18 17:04:26 - INPUTTABLE_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - linenr 500000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - Finished reading query, closing connection.
2015/04/18 17:04:26 - 合并记录.0 - 行号1000000
2015/04/18 17:04:26 - INPUTTABLE_t_lzfx_base_syonline.0 - 完成处理 (I=500184, O=0, R=0, W=500184, U=0, E=0
2015/04/18 17:04:26 - 合并记录.0 - 完成处理 (I=0, O=0, R=1000368, W=500184, U=0, E=0
2015/04/18 17:04:26 - 数据同步.0 - 完成处理 (I=0, O=0, R=500184, W=500184, U=0, E=0

 

分享到:
评论
2 楼 x0703010 2018-10-24  
问一下,有这方面的文档吗?我现在有一个这样的需求,从a,b表中各取一些字段插入c表,这个怎么处理?有文档最好,没有给个demo也行,谢谢
1 楼 cl4coolboy 2017-07-28  
哥们给份源码吧915999136@qq.com

相关推荐

Global site tag (gtag.js) - Google Analytics