import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.sysFuncNames_return;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.catalyst.expressions.Count;

import scala.Serializable;
import scala.Tuple2;

import breeze.io.CSVReader;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
public class One_SparkTest implements Serializable {
/**
 * Entry point: runs the access-log word-count job ({@link #test5(String[])}).
 *
 * <p>Note: an RDD is recomputed every time it is materialized; call
 * {@code cache()} on an RDD that will be reused to avoid recomputation.
 *
 * @param args args[0] is the input path handed through to test5
 */
public static void main(String[] args) {
    // Dead commented-out Comparator experiment and ad-hoc getTitle probe
    // removed; see version control history if they are ever needed again.
    new One_SparkTest().test5(args);
}
/**
 * Demonstrates manual construction of a {@link JavaSparkContext}, the entry
 * point to Spark: it is used to load/save data, submit jobs, and manage
 * dependencies.
 *
 * <p>Configuration is read from the MASTER, SPARK_HOME and JARS environment
 * variables, with sensible local defaults when they are absent.
 */
public void test1() {
    // BUG FIX: the original tested `!= null` and so overwrote the user's
    // configured value with the default while leaving an absent value null.
    // Defaults must apply only when the environment variable is MISSING.
    String master = System.getenv("MASTER");
    if (master == null) {
        master = "local";
    }
    String sparkHome = System.getenv("SPARK_HOME");
    if (sparkHome == null) {
        sparkHome = "./";
    }
    String jars = System.getenv("JARS");
    JavaSparkContext ctx = new JavaSparkContext(
            master,
            "My Java app",
            sparkHome,
            jars);
}
public void test2(String []args){
if(args.length != 2){
System.out.println("Usage:JavaLoadCSVCounter");
System.exit(0);
}
String master = args[0];
String inputFile = args[1];
JavaSparkContext sc = new JavaSparkContext(
master,
"java load csv with counters",
System.getenv("Spark_HOme"),
System.getenv("jars")
);
JavaRDD
JavaRDD
) {
@Override
public Iterable
throws Exception {
ArrayList
try {
String[] parseLine = line.split(" ");
for (int i = 0; i < parseLine.length; i++) {
result.add(Integer.valueOf(parseLine[i]));
}
} catch (Exception e) {
// TODO: handle exception
}
return (Iterable
}
});
}
private static final Pattern SPACE = https://www.sodocs.net/doc/c212526832.html,pile(" ");
private static final Pattern splitBY_ = https://www.sodocs.net/doc/c212526832.html,pile("_");
public void wordCount(String[]args){
if (args.length < 1) {
System.err.println("Usage: JavaWordCount
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD
JavaRDD
@Override
public Iterable
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD
@Override
public Tuple2
return new Tuple2
}
});
JavaPairRDD
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
JavaPairRDD
@Override
public int compare(String o1, String o2) {
if(Integer.valueOf(o1)>Integer.valueOf(o2))return -1;
return 0;
}
});
List
for (Tuple2, ?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
ctx.stop();
}
public void test5(String [] args){
if (args.length < 1) {
System.err.println("Usage: JavaWordCount
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD
//读取到每行
//分解出每一行的 ip+url 以及 title (一对一)
//写入到HDFS
// JavaRDD
// @Override
// public Iterable
// return Arrays.asList(SPACE.split(s));
// }
// });
JavaRDD
@Override
public String call(String line) throws Exce
ption {
if(line.startsWith("#"))return null;
//String[] data = SPACE.split(line);
String [] data = line.split(" ");
if(data[13].endsWith(".swf")|| data[13].endsWith(".gif")||data[13].endsWith(".jpg")||data[13].endsWith(".png")
|| data[13].endsWith(".bmp")|| data[13].endsWith(".jpeg")|| data[13].endsWith(".css")
|| data[13].endsWith(".jpg-")|| data[13].endsWith(".js")|| data[13].endsWith(".mtk")|| data[13].endsWith(".flv")
|| data[13].endsWith(".rm")|| data[13].endsWith(".rmvb")|| data[13].endsWith(".avi")){
return null;
}
StringBuilder sb = new StringBuilder();
sb.append(data[4]).append("_").append(data[10]).append("://").append(data[11]).append(":").append(data[12]).append(data[13]);
//word.set(sb.toString());
//output.collect(word, one);
return sb.toString();
}
});
JavaPairRDD
@Override
public Tuple2
return new Tuple2
}
});
JavaPairRDD
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
JavaPairRDD
@Override
public Tuple2
throws Exception {
try {
String[] datas =splitBY_.split(arg0._1());
} catch (Exception e) {
return arg0;
}
return new Tuple2
}
});
List
for (int i = 0; i < list.size(); i++) {
try {
//String[] datas = splitBY_.split(list.get(i)._1());
// System.out.println(list.get(i)._1()+"_"+getTitle(datas[1])+"_"+list.get(i)._2());
System.out.println(list.get(i)._1()+"_"+list.get(i)._2());
} catch (Exception e) {
//e.printStackTrace();
continue;
}
}
//将RDD物理化到HDFS
//result.collect();
//result.saveAsTextFile (args[1]);
System.out.println("结束了,请查看相应的目录");
// List
// for (Tuple2, ?> tuple : output) {
// System.out.println(tuple._1() + ": " + tuple._2());
// }
// ctx.stop();
}
public static String getTitle(String URL){
try {
URL url = new URL(URL);
URLConnection urlcon = url.openConnection();
InputStream is = urlcon.getInputStream();
InputStreamReader isr = new InputStreamReader(is,"GBK");
StringBuilder sb = new StringBuilder();
BufferedReader br = new BufferedReader(isr);
String tmps = null;
int start =-1;
int end =-1;
int startz= -1;
while(( tmps = br.readLine())!=null){
if(startz>=0){
sb.append(tmps);
}else{
//开始找title
start = tmps.indexOf("