
Spark Getting-Started Program Reference

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import java.io.Serializable;
import scala.Tuple2;



public class One_SparkTest implements Serializable {


public static void main(String[] args) {
/**
 * Every time an RDD is materialized it is recomputed. If you plan to reuse
 * some data often, call cache() to improve performance.
 */
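// A minimal caching sketch (hypothetical path, not part of the original program):
//   JavaRDD<String> cached = sc.textFile("hdfs:///tmp/input.txt").cache();
//   cached.count();  // first action computes and caches the partitions
//   cached.count();  // later actions reuse the cached data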
new One_SparkTest().test5(args);
// System.out.println(getTitle("https://www.sodocs.net/doc/c212526832.html,"));
}

public void test1(){
/**
 * Once a SparkContext instance is created, it is the entry point to Spark.
 * Through it you load and save data, submit further jobs, and add or
 * remove dependencies.
 */
String master = System.getenv("MASTER");
if(master!=null){
master ="local";
}
String sparkHome=System.getenv("SPARK_HOME");

if(sparkHome != null){
sparkHome ="./";
}

String jars = System.getenv("JARS");
JavaSparkContext ctx = new JavaSparkContext(
master,
"My Java app",
sparkHome,
jars
);

}
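// Note: the four-argument constructor above is the old Spark API; newer
// versions favor SparkConf. A minimal sketch (assuming local mode):
//   SparkConf conf = new SparkConf().setAppName("My Java app").setMaster("local");
//   JavaSparkContext ctx2 = new JavaSparkContext(conf);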
public void test2(String []args){
if(args.length != 2){
System.err.println("Usage: JavaLoadCSVCounter <master> <inputFile>");
System.exit(1);
}
String master = args[0];
String inputFile = args[1];

JavaSparkContext sc = new JavaSparkContext(
master,
"java load csv with counters",
System.getenv("SPARK_HOME"),
System.getenv("JARS")
);

JavaRDD<String> inFile = sc.textFile(inputFile);
JavaRDD<Integer> splitLines = inFile.flatMap(new FlatMapFunction<String, Integer>() {

@Override
public Iterable<Integer> call(String line) throws Exception {
ArrayList<Integer> result = new ArrayList<Integer>();
try {
String[] parseLine = line.split(" ");
for (int i = 0; i < parseLine.length; i++) {
result.add(Integer.valueOf(parseLine[i]));
}
} catch (Exception e) {
// skip lines that fail to parse
}
// In the Spark 1.x Java API, flatMap returns the Iterable itself
return result;
}
});
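// Transformations are lazy: the flatMap above runs only when an action is
// invoked, e.g. (sketch) splitLines.count() or splitLines.collect().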
}

private static final Pattern SPACE = Pattern.compile(" ");
private static final Pattern splitBY_ = Pattern.compile("_");
public void wordCount(String[] args){
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {

return new Tuple2<String, Integer>(s, 1);
}
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

// sortByKey orders by the key (the word). The keys are arbitrary words, so
// compare them as strings; the original integer comparison would throw.
// Note: in a real job the comparator must also be Serializable.
JavaPairRDD<String, Integer> result = counts.sortByKey(new Comparator<String>() {

@Override
public int compare(String o1, String o2) {
return o2.compareTo(o1); // descending order
}
});
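// To rank by count instead of by word, swap key and value first
// (a sketch, Java 8 lambda syntax):
//   counts.mapToPair(t -> new Tuple2<>(t._2(), t._1())).sortByKey(false);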


List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<String, Integer> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
ctx.stop();
}
public void test5(String [] args){
if (args.length < 1) {
System.err.println("Usage: JavaWordCount ");
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);
// Plan: read each line, extract the ip+url and its title from each line
// (one-to-one), then write the result to HDFS.
JavaRDD<String> guolv = lines.map(new Function<String, String>(){

@Override
public String call(String line) throws Exception {

if(line.startsWith("#")) return null;
String[] data = line.split(" ");

// Skip static resources (images, scripts, styles, media files)
if(data[13].endsWith(".swf")|| data[13].endsWith(".gif")|| data[13].endsWith(".jpg")|| data[13].endsWith(".png")
|| data[13].endsWith(".bmp")|| data[13].endsWith(".jpeg")|| data[13].endsWith(".css")
|| data[13].endsWith(".jpg-")|| data[13].endsWith(".js")|| data[13].endsWith(".mtk")|| data[13].endsWith(".flv")
|| data[13].endsWith(".rm")|| data[13].endsWith(".rmvb")|| data[13].endsWith(".avi")){
return null;
}

// Build "ip_scheme://host:port/path"
StringBuilder sb = new StringBuilder();
sb.append(data[4]).append("_").append(data[10]).append("://").append(data[11]).append(":").append(data[12]).append(data[13]);
return sb.toString();
}

});
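// Note: call() returns null for filtered lines, so null keys reach the
// shuffle below. A cleaner variant (sketch) would drop them first:
//   guolv = guolv.filter(new Function<String, Boolean>() {
//       public Boolean call(String s) { return s != null; }
//   });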

JavaPairRDD<String, Integer> ones = guolv.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {

return new Tuple2<String, Integer>(s, 1);
}
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

JavaPairRDD<String, Integer> result = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {

@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> arg0)
throws Exception {
try {
// Validate that the key has the expected "ip_url" shape
String[] datas = splitBY_.split(arg0._1());
} catch (Exception e) {
return arg0;
}

return new Tuple2<String, Integer>(arg0._1()+"_"+getTitle("https://www.sodocs.net/doc/c212526832.html,"), arg0._2());
}
});
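// Caution: getTitle() performs a blocking HTTP request for every record of
// this transformation, which is very slow at scale; fetching titles once per
// distinct URL (or caching them) would be preferable.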
List<Tuple2<String, Integer>> list = result.collect();

for (int i = 0; i < list.size(); i++) {
try {
//String[] datas = splitBY_.split(list.get(i)._1());
// System.out.println(list.get(i)._1()+"_"+getTitle(datas[1])+"_"+list.get(i)._2());

System.out.println(list.get(i)._1()+"_"+list.get(i)._2());
} catch (Exception e) {
//e.printStackTrace();
continue;
}
}


// Materialize the result to HDFS:
// result.saveAsTextFile(args[1]);
System.out.println("Finished; check the corresponding output directory");
ctx.stop();
}
public static String getTitle(String urlStr){
try {
URL url = new URL(urlStr);

URLConnection urlcon = url.openConnection();
InputStream is = urlcon.getInputStream();
InputStreamReader isr = new InputStreamReader(is, "GBK"); // assumes GBK-encoded pages
StringBuilder sb = new StringBuilder();
BufferedReader br = new BufferedReader(isr);
String tmps = null;
int start = -1;
int end = -1;
int startz = -1;
while((tmps = br.readLine()) != null){
if(startz >= 0){
sb.append(tmps);
}else{
// Look for the opening <title> tag
start = tmps.indexOf("<title>");
end = tmps.indexOf("/title>");
if(start >= 0){
// Opening tag found on this line
if(end >= 0){
// end - 1 stops just before the '<' of "</title>"
return tmps.substring(start + 7, end - 1);
}
sb.append(tmps);
startz = start;
continue;
}
// Opening tag not found yet
continue;
}
// Look for the closing tag in the accumulated buffer
end = sb.indexOf("/title>");
if(end >= 0){ // found
startz = sb.indexOf("<title>") + 7;
return sb.substring(startz, end - 1);
}
// Not found yet; keep reading

}
} catch (Exception e) {
e.printStackTrace();
return null;
}
return "";
}

}
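The title-extraction helper can be sanity-checked on its own before wiring it into the job. A minimal sketch (the URL is a hypothetical stand-in):

String title = One_SparkTest.getTitle("http://example.com/");
System.out.println(title == null ? "request failed" : title);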