概述
我们都知道sql语句可以让我们很方便实现一些需求例如,取TopN、排序、过滤等操作。学过scala的同学应该知道,scala中提供了很多的算子也可以很方便的进行一些数据的处理,java中可能就没那么多算子了,需要自定义去实现,但是现在java8中给我们提供了Stream Api弥补了这里劣势,提供了很多方法,不用sql也可以实现。
StreamApi
- Stream API 三部曲
创建 Stream—> 中间操作(Transform) lazy—>终止操作(action)
创建Steeam的方式
import org.junit.Test;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/* * 一、Stream API 的操作步骤: * * 1. 创建 Stream * * 2. 中间操作(Transform) lazy * * 3. 终止操作(action) * * 其实这个Stream 类似于Spark中的RDD操作首先创建一个Stream(存放数据的容器), * 然后调用各种算子进行transform操作,但这是一个lazy的过程,也就是不执行最后的 * action操作,是不会有结果的.下面通过例子进行学习. */
public class StreamApiTrain {
// 1 创建Stream
@Test
public void test1(){
/** * 方式1: 通过Collection 提供的Stream() ParallelStream() */
List<String> list = Arrays.asList("java","spark","hadoop","scala");
//获取一个顺序流
Stream<String> stream = list.stream();
stream.forEach(System.out::println);
//获取一个并行流
Stream<String> stringStream = list.parallelStream();
stringStream.forEach(System.out::println);
/** * 方式2: 通过 Arrays 中的 stream() 获取一个数组流 */
String [] strings = new String[5];
Stream<String> stream1 = Arrays.stream(strings);
/** * 方式3: 通过 Stream 类中静态方法 of() */
Stream<String> stream2 = Stream.of("java", "spark", "hadoop", "scala");
/** * 方式4: 创建无限流 */
Stream<Integer> iterate = Stream.iterate(0, x -> x + 2)
.limit(10);
iterate.forEach(System.out::println);
Stream<Double> generate = Stream.generate(()->Math.random())
.limit(10);
generate.forEach(System.out::println);
}
Transform(中间操作)
/** * Transform: * 1.filter 过滤 * 2.limit 取Top n * 3.skip(n) —— 跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补 * 4.distinct 去重 * 5.sorted 自然排序 sorted(Comparator com)——定制排序 */
@Test
public void test2(){
List<String> list = Arrays.asList("java","spark","hadoop","scala","hive","zookeeper","kafka","spark","java");
list.stream()
.filter(x->{
System.out.println("过滤操作");
return x.length() > 4;
})
.limit(3)
.forEach(System.out::println);
System.out.println("--------------------");
list.stream().skip(2).forEach(System.out::println);
System.out.println("--------------------");
list.stream().distinct().forEach(System.out::println);
System.out.println("--------------------");
list.stream().sorted().forEach(System.out::println);
System.out.println("--------------------");
list.stream().sorted((x,y)->{
if (x.length() == y.length()){
return x.compareTo(y);
}else {
return Integer.compare(x.length(),y.length());
}
}).forEach(System.out::println);
}
- map vs flatMap
/** * Transform: * map vs flatMap * * map——接收 Lambda , 将元素转换成其他形式或提取信息。接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。 * flatMap——接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流 */
@Test
public void test3(){
List<String> list = Arrays.asList("java spark hadoop scala hive zookeeper kafka spark java");
list.stream()
.map(String::toUpperCase)
.forEach(System.out::println);
System.out.println("--------------------");
Stream<String> stringStream = list.stream().flatMap(x -> Stream.of(x.split(" ")));
stringStream.forEach(System.out::println);
}
- reduce
/** * reduce: 该方法可以传递多个参数(源码中给了很多例子) * * Optional<T> reduce(BinaryOperator<T> accumulator); 传递两个参数:第一个参数是上次函数执行的返回值(也称为中间结果),第二个参数是stream中的元素,这个函数把这两个值相加,得到的和会被赋值给下次执行这个函数的第一个参数 * * T reduce(T identity, BinaryOperator<T> accumulator); identity相当于给一个初始值,在初始值的基础上进行操作 * * <U> U reduce(U identity, * BiFunction<U, ? super T, U> accumulator, * BinaryOperator<U> combiner); * identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致 * accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型; * 也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与Stream中元素类型是一样的。 * combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操作; * 第三个参数combiner主要是使用在并行计算的场景下;如果Stream是非并行时,第三个参数实际上是不生效的。 */
List<User> emps = Arrays.asList(
new User(2, "李四", 59, 6666.66),
new User(1, "张三", 18, 9999.99),
new User(3, "王五", 28, 3333.33),
new User(4, "赵六", 8, 7777.77),
new User(1, "张三", 18, 9999.99),
new User(4, "赵六", 8, 7777.77),
new User(5, "田七", 38, 5555.55)
);
@Test
public void test4(){
//计算user总人数
Optional<Integer> count = emps.stream()
.map((e) -> 1)
.reduce((x,y)->x+y);
//System.out.println(count.get());
//计算薪水总和
Double totalSalary = emps.stream()
.map(x -> x.getSalary())
.reduce(0.00, (x, y) -> x + y);
//System.out.println(totalSalary);
//测试非并行
Integer reduce = emps.stream()
.map(x -> x.getId())
.reduce(2, ((x1, y1) -> {
System.out.println("x1,y1 "+x1+":"+y1);
System.out.println("x1+y1 "+x1+y1);
return x1+y1;
}), (x2, y2) -> {
System.out.println("x2,y2 "+x2+":"+y2);
System.out.println("x2+y2 "+x2+y2);
return x2+y2;
});
System.out.println(reduce); //22(2+2+1+3+4+1+4+5) 非并行参数三不生效
//测试并行
Integer reduce1 = emps.parallelStream()
.map(x -> x.getId())
.reduce(2, ((x1, y1) -> {
System.out.println("x1,y1 "+x1+":"+y1);
System.out.println("x1+y1 "+x1+y1);
return x1+y1;
}), (x2, y2) -> {
System.out.println("x2,y2 "+x2+":"+y2);
System.out.println("x2+y2 "+x2+y2);
return x2+y2;
});
System.out.println(reduce1); //34 = (2+2)+(2+1)+(2+3)+(2+4)+(2+1)+(2+4)+(2+5) 并行参数三生效
}
@Test
public void test5() {
List<String> list = Arrays.asList("hello","hadoop","hive","hadoop","hadoop","hello");
Map<String, List<String>> collect = list.stream().collect(Collectors.groupingBy((x)->x));
System.out.println(collect.values());
Stream<Map<String,Integer>> stream = collect.values()
.stream()
.map((x)->{
Map<String, Integer> map=new HashMap<>();
map.put(x.get(0), x.size());return map;
});
stream.forEach(System.out::println);
}
action操作
/** * action操作: * allMatch——检查是否匹配所有元素 * anyMatch——检查是否至少匹配一个元素 * noneMatch——检查是否没有匹配的元素 * findFirst——返回第一个元素 * findAny——返回当前流中的任意元素 * count——返回流中元素的总个数 * max——返回流中最大值 * min——返回流中最小值 * forEach-遍历元素 */
@Test
public void test6(){
List<String> list = Arrays.asList("java","spark","hadoop","scala","hive","zookeeper","kafka","spark","java");
boolean b = list.stream()
.allMatch(x -> x.length() > 3); //判断是否所以元素长度都大于3
System.out.println(b);
boolean b1 = list.stream()
.anyMatch(x -> x.length() > 4); //判断至少有一个元素长度大于4
System.out.println(b1);
Optional<String> first = list.stream()
.findFirst();
System.out.println(first.get());
Optional<String> any = list.stream()
.findAny();
System.out.println(any.get());
long count = list.stream().count();
System.out.println(count);
//注意:流进行了终止操作后,不能再次使用
}
/** * action操作: * collect--将流转换为其他形式。接收一个 Collector接口的实现,用于给Stream中元素做汇总的方法 * collect方法中需要传递一个Collector,而Collectors类可以实现(源码中提供了很多例子) * Collectors类中提供了很多方法进行操作. */
@Test
public void test7(){
//取user中的名字放入list中
List<String> collect = emps.stream()
.map(User::getName)
.collect(Collectors.toList());
collect.forEach(System.out::println);
//根据id进行分组
Map<Integer, List<User>> collect1 = emps.stream()
.collect(Collectors.groupingBy(User::getId));
collect1.forEach((key,value)-> System.out.println(key+":"+value));
//根据薪水和年龄进行分区
Map<Boolean, List<User>> collect2 = emps.stream()
.collect(Collectors.partitioningBy(x -> (x.getSalary() > 6000) && x.getAge()<30));
collect2.forEach((key,value)-> System.out.println(key+":"+value));
//取User中名字进行joining操作
String collect3 = emps.stream().map(User::getName).collect(Collectors.joining(","));
System.out.println(collect3);
}
}
今天的文章java8新特性—StreamApi分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/82783.html