demo,利用kettle的api,将一个数据源中的信息导入到另外一个数据源中:[java]view plain copy
1.package https://www.sodocs.net/doc/3a2308136.html,.saidi.job;
2.
3.import https://www.sodocs.net/doc/3a2308136.html,mons.io.FileUtils;
4.import org.pentaho.di.core.KettleEnvironment;
5.import org.pentaho.di.core.database.DatabaseMeta;
6.import org.pentaho.di.core.exception.KettleDatabaseException;
7.import org.pentaho.di.core.exception.KettleXMLException;
8.import org.pentaho.di.core.plugins.PluginRegistry;
9.import org.pentaho.di.core.plugins.StepPluginType;
10.import org.pentaho.di.trans.TransHopMeta;
11.import org.pentaho.di.trans.TransMeta;
12.import org.pentaho.di.trans.step.StepMeta;
13.import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
14.import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
15.
16.import java.io.File;
17.
18./**
19. * Created by 戴桥冰 on 2017/1/16.
20. */
21.public class TransDemo {
22.
23.public static TransDemo transDemo;
24.
25./**
26. * 两个库中的表名
27. */
28.public static String bjdt_tablename = "test1";
29.public static String kettle_tablename = "test2";
30.
31./**
32. * 数据库连接信息,适用于DatabaseMeta其中一个构造器
DatabaseMeta(String xml)
33. */
34.public static final String[] databasesXML = {
35.
36."" +
37."
38."
39."
40."
41."
42."
43."
44."
45."
46."",
47."" +
48."
49."
50."
51."
52."
53."
54."
55."
56."
57.""
58.
59. };
60.
61.public static void main(String[] args) {
62.try {
63. KettleEnvironment.init();
64. transDemo = new TransDemo();
65. TransMeta transMeta = transDemo.generateMyOwnTrans();
66. String transXml = transMeta.getXML();
67. String transName = "etl/update_insert_Trans.ktr";
68. File file = new File(transName);
69. FileUtils.writeStringToFile(file, transXml, "UTF-8");
70. System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"
+databasesXML[1]);
71. } catch (Exception e) {
72. e.printStackTrace();
73.return;
74. }
75. }
76.
77./**
78. * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是
表输入,第二个是表插入与更新操作
79. * @return
80. * @throws KettleXMLException
81. */
82.public TransMeta generateMyOwnTrans() throws KettleXMLException, Kettl
eDatabaseException {
83. System.out.println("************start to generate my own transformat
ion***********");
84. TransMeta transMeta = new TransMeta();
85.//设置转化的名称
86. transMeta.setName("insert_update");
87.//添加转换的数据库连接
88.for (int i=0;i 89. DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]); 90. transMeta.addDatabase(databaseMeta); 91. } 92.//registry是给每个步骤生成一个标识Id用 93. PluginRegistry registry = PluginRegistry.getInstance(); 94.//第一个表输入步骤(TableInputMeta) 95. TableInputMeta tableInput = new TableInputMeta(); 96. String tableInputPluginId = registry.getPluginId(StepPluginType.cla ss, tableInput); 97.//给表输入添加一个DatabaseMeta连接数据库 98. DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt"); 99. tableInput.setDatabaseMeta(database_bjdt); 100. String select_sql = "SELECT name FROM "+bjdt_tablename; 101. tableInput.setSQL(select_sql); 102. 103.//添加TableInputMeta到转换中 104. StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"tab le input",tableInput); 105.//给步骤添加在spoon工具中的显示位置 106. tableInputMetaStep.setDraw(true); 107. tableInputMetaStep.setLocation(100, 100); 108. transMeta.addStep(tableInputMetaStep); 109. 110.//第二个步骤插入与更新 111. InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); 112. String insertUpdateMetaPluginId = registry.getPluginId(StepPluginTy pe.class,insertUpdateMeta); 113.//添加数据库连接 114. DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); 115. insertUpdateMeta.setDatabaseMeta(database_kettle); 116.//设置操作的表 117. insertUpdateMeta.setTableName(kettle_tablename); 118.//设置用来查询的关键字 119. insertUpdateMeta.setKeyLookup(new String[]{"name"}); 120. insertUpdateMeta.setKeyStream(new String[]{"name"}); 121. insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上 122. insertUpdateMeta.setKeyCondition(new String[]{"="}); 123. 124.//设置要更新的字段 125. String[] updatelookup = {"name"} ; 126. 127. String [] updateStream = {"name"}; 128. Boolean[] updateOrNot = {true}; 129. insertUpdateMeta.setUpdateLookup(updatelookup); 130. insertUpdateMeta.setUpdateStream(updateStream); 131. insertUpdateMeta.setUpdate(updateOrNot); 132. String[] lookup = insertUpdateMeta.getUpdateLookup(); 133.//添加步骤到转换中 134. StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update",insertUpdateMeta); 135. insertUpdateStep.setDraw(true); 136. insertUpdateStep.setLocation(250,100); 137. transMeta.addStep(insertUpdateStep); 138.//***************************************************************** * 139. 140.//***************************************************************** * 141. 142.//添加hop把两个步骤关联起来 143. transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertU pdateStep)); 144. System.out.println("***********the end************"); 145.return transMeta; 146. } 147. 148.} 上述操作将会产生一个ktr文件,接下来的操作是对ctr文件进行转换: [java]view plain copy 1.public static void main(String[] args) throws KettleException { 2.//初始化ketlle 3. KettleEnvironment.init(); 4.//创建转换元数据对象 5. TransMeta meta = new TransMeta("etl/update_insert_Trans.ktr"); 6. Trans trans = new Trans(meta); 7. trans.prepareExecution(null); 8. trans.startThreads(); 9. trans.waitUntilFinished(); 10.if(trans.getErrors()!=0){ 11. System.out.println("执行失败!"); 12. } 13. }