搜档网
当前位置:搜档网 › ETL之kettle进行二次开发简单demo

ETL之kettle进行二次开发简单demo

ETL之kettle进行二次开发简单demo
ETL之kettle进行二次开发简单demo

demo,利用kettle的api,将一个数据源中的信息导入到另外一个数据源中:[java]view plain copy

1.package https://www.sodocs.net/doc/3a2308136.html,.saidi.job;

2.

3.import org.apache.commons.io.FileUtils;

4.import org.pentaho.di.core.KettleEnvironment;

5.import org.pentaho.di.core.database.DatabaseMeta;

6.import org.pentaho.di.core.exception.KettleDatabaseException;

7.import org.pentaho.di.core.exception.KettleXMLException;

8.import org.pentaho.di.core.plugins.PluginRegistry;

9.import org.pentaho.di.core.plugins.StepPluginType;

10.import org.pentaho.di.trans.TransHopMeta;

11.import org.pentaho.di.trans.TransMeta;

12.import org.pentaho.di.trans.step.StepMeta;

13.import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;

14.import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

15.

16.import java.io.File;

17.

18./**

19. * Created by 戴桥冰 on 2017/1/16.

20. */

21.public class TransDemo {

22.

23.public static TransDemo transDemo;

24.

25./**

26. * 两个库中的表名

27. */

28.public static String bjdt_tablename = "test1";

29.public static String kettle_tablename = "test2";

30.

31./**

32. * 数据库连接信息,适用于DatabaseMeta其中一个构造器

DatabaseMeta(String xml)

33. */

34.public static final String[] databasesXML = {

35.

36."" +

37."" +

38."bjdt" +

39."192.168.1.122" +

40."Mysql" +

41."Native" +

42."daiqiaobing" +

43."3306" +

44."root" +

45."root" +

46."",

47."" +

48."" +

49."kettle" +

50."192.168.1.122" +

51."Mysql" +

52."Native" +

53."daiqiaobing" +

54."3306" +

55."root" +

56."root" +

57.""

58.

59. };

60.

61.public static void main(String[] args) {

62.try {

63. KettleEnvironment.init();

64. transDemo = new TransDemo();

65. TransMeta transMeta = transDemo.generateMyOwnTrans();

66. String transXml = transMeta.getXML();

67. String transName = "etl/update_insert_Trans.ktr";

68. File file = new File(transName);

69. FileUtils.writeStringToFile(file, transXml, "UTF-8");

70. System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"

+databasesXML[1]);

71. } catch (Exception e) {

72. e.printStackTrace();

73.return;

74. }

75. }

76.

77./**

78. * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是

表输入,第二个是表插入与更新操作

79. * @return

80. * @throws KettleXMLException

81. */

82.public TransMeta generateMyOwnTrans() throws KettleXMLException, Kettl

eDatabaseException {

83. System.out.println("************start to generate my own transformat

ion***********");

84. TransMeta transMeta = new TransMeta();

85.//设置转化的名称

86. transMeta.setName("insert_update");

87.//添加转换的数据库连接

88.for (int i=0;i

89. DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);

90. transMeta.addDatabase(databaseMeta);

91. }

92.//registry是给每个步骤生成一个标识Id用

93. PluginRegistry registry = PluginRegistry.getInstance();

94.//第一个表输入步骤(TableInputMeta)

95. TableInputMeta tableInput = new TableInputMeta();

96. String tableInputPluginId = registry.getPluginId(StepPluginType.cla

ss, tableInput);

97.//给表输入添加一个DatabaseMeta连接数据库

98. DatabaseMeta database_bjdt = transMeta.findDatabase("bjdt");

99. tableInput.setDatabaseMeta(database_bjdt);

100. String select_sql = "SELECT name FROM "+bjdt_tablename;

101. tableInput.setSQL(select_sql);

102.

103.//添加TableInputMeta到转换中

104. StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId,"tab le input",tableInput);

105.//给步骤添加在spoon工具中的显示位置

106. tableInputMetaStep.setDraw(true);

107. tableInputMetaStep.setLocation(100, 100);

108. transMeta.addStep(tableInputMetaStep);

109.

110.//第二个步骤插入与更新

111. InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); 112. String insertUpdateMetaPluginId = registry.getPluginId(StepPluginTy pe.class,insertUpdateMeta);

113.//添加数据库连接

114. DatabaseMeta database_kettle = transMeta.findDatabase("kettle"); 115. insertUpdateMeta.setDatabaseMeta(database_kettle);

116.//设置操作的表

117. insertUpdateMeta.setTableName(kettle_tablename);

118.//设置用来查询的关键字

119. insertUpdateMeta.setKeyLookup(new String[]{"name"});

120. insertUpdateMeta.setKeyStream(new String[]{"name"});

121. insertUpdateMeta.setKeyStream2(new String[]{""});//一定要加上

122. insertUpdateMeta.setKeyCondition(new String[]{"="});

123.

124.//设置要更新的字段

125. String[] updatelookup = {"name"} ;

126.

127. String [] updateStream = {"name"};

128. Boolean[] updateOrNot = {true};

129. insertUpdateMeta.setUpdateLookup(updatelookup);

130. insertUpdateMeta.setUpdateStream(updateStream);

131. insertUpdateMeta.setUpdate(updateOrNot);

132. String[] lookup = insertUpdateMeta.getUpdateLookup();

133.//添加步骤到转换中

134. StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update",insertUpdateMeta);

135. insertUpdateStep.setDraw(true);

136. insertUpdateStep.setLocation(250,100);

137. transMeta.addStep(insertUpdateStep);

138.//***************************************************************** *

139.

140.//***************************************************************** *

141.

142.//添加hop把两个步骤关联起来

143. transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertU pdateStep));

144. System.out.println("***********the end************");

145.return transMeta;

146. }

147.

148.}

上述操作将会产生一个ktr文件,接下来的操作是对ktr文件进行转换:

[java]view plain copy

1.public static void main(String[] args) throws KettleException {

2.//初始化ketlle

3. KettleEnvironment.init();

4.//创建转换元数据对象

5. TransMeta meta = new TransMeta("etl/update_insert_Trans.ktr");

6. Trans trans = new Trans(meta);

7. trans.prepareExecution(null);

8. trans.startThreads();

9. trans.waitUntilFinished();

10.if(trans.getErrors()!=0){

11. System.out.println("执行失败!");

12. }

13. }

相关主题