/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.lang3.stream;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.function.Failable;
import org.apache.commons.lang3.function.FailableConsumer;
import org.apache.commons.lang3.function.FailableFunction;
import org.apache.commons.lang3.function.FailablePredicate;

/**
 * Provides utility functions, and classes for working with the
 * {@code java.util.stream} package, or more generally, with Java 8 lambdas. More
 * specifically, it attempts to address the fact that lambdas are supposed
 * not to throw Exceptions, at least not checked Exceptions, AKA instances
 * of {@link Exception}. This enforces the use of constructs like
 * <pre>
 *     Consumer&lt;java.lang.reflect.Method&gt; consumer = m -&gt; {
 *         try {
 *             m.invoke(o, args);
 *         } catch (Throwable t) {
 *             throw Failable.rethrow(t);
 *         }
 *     };
 *     stream.forEach(consumer);
 * </pre>
 * Using a {@link FailableStream}, this can be rewritten as follows:
 * <pre>
 *     Streams.stream(stream).forEach((m) -&gt; m.invoke(o, args));
 * </pre>
 * Obviously, the second version is much more concise and the spirit of
 * Lambda expressions is met better than in the first version.
 *
 * @see Stream
 * @see Failable
 * @since 3.11
 */
public class Streams {

    /**
     * A Collector type for arrays. Elements are accumulated into an intermediate {@link List} and copied into a
     * newly created array of the configured element type by the finisher.
     *
     * @param <O> The array type.
     */
    public static class ArrayCollector<O> implements Collector<O, List<O>, O[]> {
        /** This collector declares no characteristics; in particular the finisher is not an identity function. */
        private static final Set<Characteristics> CHARACTERISTICS = Collections.emptySet();

        /** Runtime component type of the array created by {@link #finisher()}. */
        private final Class<O> elementType;

        /**
         * Constructs a new instance for the given element type.
         *
         * @param elementType The element type.
         */
        public ArrayCollector(final Class<O> elementType) {
            this.elementType = elementType;
        }

        /**
         * Returns the function that appends one element to the intermediate list.
         *
         * @return the accumulator, {@link List#add(Object)}.
         */
        @Override
        public BiConsumer<List<O>, O> accumulator() {
            return List::add;
        }

        /**
         * Returns the characteristics of this collector.
         *
         * @return an empty set.
         */
        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

        /**
         * Returns the function that merges two intermediate lists by appending the right one to the left one.
         *
         * @return the combiner; it returns its (modified) left argument.
         */
        @Override
        public BinaryOperator<List<O>> combiner() {
            return (left, right) -> {
                left.addAll(right);
                return left;
            };
        }

        /**
         * Returns the function that converts the intermediate list into the result array.
         *
         * @return the finisher; it creates an array of the element type sized to the list and fills it.
         */
        @Override
        public Function<List<O>, O[]> finisher() {
            return list -> {
                // Safe: Array.newInstance(elementType, n) yields an array whose component type is elementType (O).
                @SuppressWarnings("unchecked")
                final O[] array = (O[]) Array.newInstance(elementType, list.size());
                return list.toArray(array);
            };
        }

        /**
         * Returns the factory for the intermediate list.
         *
         * @return a supplier creating a new {@link ArrayList}.
         */
        @Override
        public Supplier<List<O>> supplier() {
            return ArrayList::new;
        }
    }

    /**
     * A reduced, and simplified version of a {@link Stream} with failable method signatures.
     *
     * @param <O> The streams element type.
     */
    public static class FailableStream<O extends Object> {

        /** The wrapped stream; replaced by {@link #filter(FailablePredicate)}. */
        private Stream<O> stream;

        /** Set once a terminal operation has been started on this instance. */
        private boolean terminated;

        /**
         * Constructs a new instance with the given {@code stream}.
         *
         * @param stream The stream.
         */
        public FailableStream(final Stream<O> stream) {
            this.stream = stream;
        }

        /**
         * Returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on
         * all elements if not necessary for determining the result. If the stream is empty then {@code true} is
         * returned and the predicate is not evaluated.
         *
         * <p>
         * This is a short-circuiting terminal operation.
         *
         * <p>
         * Note: This method evaluates the <em>universal quantification</em> of the predicate over the elements of
         * the stream (for all x P(x)). If the stream is empty, the quantification is said to be <em>vacuously
         * satisfied</em> and is always {@code true} (regardless of P(x)).
         *
         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
         * @return {@code true} If either all elements of the stream match the provided predicate or the stream is
         *         empty, otherwise {@code false}.
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public boolean allMatch(final FailablePredicate<O, ?> predicate) {
            // Terminal operation: mark this wrapper as terminated, like collect/forEach/reduce do.
            makeTerminated();
            return stream().allMatch(Failable.asPredicate(predicate));
        }

        /**
         * Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on
         * all elements if not necessary for determining the result. If the stream is empty then {@code false} is
         * returned and the predicate is not evaluated.
         *
         * <p>
         * This is a short-circuiting terminal operation.
         *
         * <p>
         * Note: This method evaluates the <em>existential quantification</em> of the predicate over the elements of
         * the stream (for some x P(x)).
         *
         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
         * @return {@code true} if any elements of the stream match the provided predicate, otherwise {@code false}
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public boolean anyMatch(final FailablePredicate<O, ?> predicate) {
            // Terminal operation: mark this wrapper as terminated, like collect/forEach/reduce do.
            makeTerminated();
            return stream().anyMatch(Failable.asPredicate(predicate));
        }

        /**
         * Verifies that no terminal operation has been started on this stream yet.
         *
         * @throws IllegalStateException if this stream is already terminated.
         */
        protected void assertNotTerminated() {
            if (terminated) {
                throw new IllegalStateException("This stream is already terminated.");
            }
        }

        /**
         * Performs a mutable reduction operation on the elements of this stream using a {@code Collector}. A
         * {@code Collector} encapsulates the functions used as arguments to
         * {@link #collect(Supplier, BiConsumer, BiConsumer)}, allowing for reuse of collection strategies and
         * composition of collect operations such as multiple-level grouping or partitioning.
         *
         * <p>
         * If the underlying stream is parallel, and the {@code Collector} is concurrent, and either the stream is
         * unordered or the collector is unordered, then a concurrent reduction will be performed (see {@link Collector}
         * for details on concurrent reduction.)
         *
         * <p>
         * This is a terminal operation.
         *
         * <p>
         * When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to
         * maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe
         * data structures (such as {@code ArrayList}), no additional synchronization is needed for a parallel
         * reduction.
         *
         * <p>
         * Note: The following will accumulate strings into an ArrayList:
         *
         * <pre>
         *     {@code
         *     List<String> asList = stringStream.collect(Collectors.toList());
         * }
         * </pre>
         *
         * <p>
         * The following will classify {@code Person} objects by city:
         *
         * <pre>
         *     {@code
         *     Map<String, List<Person>> peopleByCity = personStream.collect(Collectors.groupingBy(Person::getCity));
         * }
         * </pre>
         *
         * <p>
         * The following will classify {@code Person} objects by state and city, cascading two {@code Collector}s
         * together:
         *
         * <pre>
         *     {@code
         *     Map<String, Map<String, List<Person>>> peopleByStateAndCity = personStream
         *         .collect(Collectors.groupingBy(Person::getState, Collectors.groupingBy(Person::getCity)));
         * }
         * </pre>
         *
         * @param <R> the type of the result
         * @param <A> the intermediate accumulation type of the {@code Collector}
         * @param collector the {@code Collector} describing the reduction
         * @return the result of the reduction
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         * @see #collect(Supplier, BiConsumer, BiConsumer)
         * @see Collectors
         */
        public <A, R> R collect(final Collector<? super O, A, R> collector) {
            makeTerminated();
            return stream().collect(collector);
        }

        /**
         * Performs a mutable reduction operation on the elements of this FailableStream. A mutable reduction is one in
         * which the reduced value is a mutable result container, such as an {@code ArrayList}, and elements are
         * incorporated by updating the state of the result rather than by replacing the result. This produces a result
         * equivalent to:
         *
         * <pre>
         *     {@code
         *     R result = supplier.get();
         *     for (O element : this stream)
         *         accumulator.accept(result, element);
         *     return result;
         * }
         * </pre>
         *
         * <p>
         * Like {@link #reduce(Object, BinaryOperator)}, {@code collect} operations can be parallelized without
         * requiring additional synchronization.
         *
         * <p>
         * This is a terminal operation.
         *
         * <p>
         * Note: There are many existing classes in the JDK whose signatures are well-suited for use with method
         * references as arguments to {@code collect()}. For example, the following will accumulate strings into an
         * {@code ArrayList}:
         *
         * <pre>
         *     {@code
         *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
         * }
         * </pre>
         *
         * <p>
         * The following will take a stream of strings and concatenate them into a single string:
         *
         * <pre>
         *     {@code
         *     String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
         *         .toString();
         * }
         * </pre>
         *
         * @param <R> type of the result
         * @param <A> Unused by this method's signature; retained so that existing callers which pass explicit type
         *        arguments continue to compile.
         * @param supplier a function that creates a new result container. For a parallel execution, this function may
         *        be called multiple times and must return a fresh value each time.
         * @param accumulator An associative, non-interfering, stateless function for incorporating an additional
         *        element into a result
         * @param combiner An associative, non-interfering, stateless function for combining two values, which must be
         *        compatible with the accumulator function
         * @return The result of the reduction
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public <A, R> R collect(final Supplier<R> supplier, final BiConsumer<R, ? super O> accumulator,
            final BiConsumer<R, R> combiner) {
            makeTerminated();
            return stream().collect(supplier, accumulator, combiner);
        }

        /**
         * Restricts this FailableStream to the elements that match the given FailablePredicate.
         *
         * <p>
         * This is an intermediate operation. The wrapped stream of this instance is replaced by the filtered
         * stream; no new FailableStream is created.
         *
         * @param predicate a non-interfering, stateless predicate to apply to each element to determine if it should be
         *        included.
         * @return this FailableStream, now backed by the filtered stream
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public FailableStream<O> filter(final FailablePredicate<O, ?> predicate) {
            assertNotTerminated();
            stream = stream.filter(Failable.asPredicate(predicate));
            return this;
        }

        /**
         * Performs an action for each element of this stream.
         *
         * <p>
         * This is a terminal operation.
         *
         * <p>
         * The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation
         * does <em>not</em> guarantee to respect the encounter order of the stream, as doing so would sacrifice the
         * benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever
         * thread the library chooses. If the action accesses shared state, it is responsible for providing the required
         * synchronization.
         *
         * @param action a non-interfering action to perform on the elements
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public void forEach(final FailableConsumer<O, ?> action) {
            makeTerminated();
            stream().forEach(Failable.asConsumer(action));
        }

        /**
         * Marks this stream as terminated, after verifying that it was not terminated before.
         *
         * @throws IllegalStateException if this stream is already terminated.
         */
        protected void makeTerminated() {
            assertNotTerminated();
            terminated = true;
        }

        /**
         * Returns a stream consisting of the results of applying the given function to the elements of this stream.
         *
         * <p>
         * This is an intermediate operation.
         *
         * @param <R> The element type of the new stream
         * @param mapper A non-interfering, stateless function to apply to each element
         * @return the new stream
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public <R> FailableStream<R> map(final FailableFunction<O, R, ?> mapper) {
            assertNotTerminated();
            return new FailableStream<>(stream.map(Failable.asFunction(mapper)));
        }

        /**
         * Performs a reduction on the elements of this stream, using the provided identity value and an associative
         * accumulation function, and returns the reduced value. This is equivalent to:
         *
         * <pre>
         *     {@code
         *     O result = identity;
         *     for (O element : this stream)
         *         result = accumulator.apply(result, element)
         *     return result;
         * }
         * </pre>
         *
         * but is not constrained to execute sequentially.
         *
         * <p>
         * The {@code identity} value must be an identity for the accumulator function. This means that for all
         * {@code t}, {@code accumulator.apply(identity, t)} is equal to {@code t}. The {@code accumulator} function
         * must be an associative function.
         *
         * <p>
         * This is a terminal operation.
         *
         * <p>
         * Note: Sum, min, max, average, and string concatenation are all special cases of reduction. Summing a
         * stream of numbers can be expressed as:
         *
         * <pre>
         *     {@code
         *     Integer sum = integers.reduce(0, (a, b) -> a + b);
         * }
         * </pre>
         *
         * or:
         *
         * <pre>
         *     {@code
         *     Integer sum = integers.reduce(0, Integer::sum);
         * }
         * </pre>
         *
         * <p>
         * While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running
         * total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization
         * and with greatly reduced risk of data races.
         *
         * @param identity the identity value for the accumulating function
         * @param accumulator an associative, non-interfering, stateless function for combining two values
         * @return the result of the reduction
         * @throws IllegalStateException if a terminal operation was already started on this stream.
         */
        public O reduce(final O identity, final BinaryOperator<O> accumulator) {
            makeTerminated();
            return stream().reduce(identity, accumulator);
        }

        /**
         * Converts the FailableStream into an equivalent stream. This accessor performs no termination check and
         * hands out the wrapped stream itself.
         *
         * @return A stream, which will return the same elements, which this FailableStream would return.
         */
        public Stream<O> stream() {
            return stream;
        }
    }

    /**
     * Converts the given {@link Collection} into a {@link FailableStream}. This is basically a simplified, reduced
     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
     *
     * <pre>
     * final List&lt;O&gt; list;
     * final Method m;
     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
     *     try {
     *         return (String) m.invoke(o);
     *     } catch (Throwable t) {
     *         throw Failable.rethrow(t);
     *     }
     * };
     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
     * </pre>
     *
     * as follows:
     *
     * <pre>
     * final List&lt;O&gt; list;
     * final Method m;
     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
     *     .collect(Collectors.toList());
     * </pre>
     *
     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
     * spirit of Lambdas better than the first version.
     *
     * @param <O> The streams element type.
     * @param stream The collection, whose {@link Collection#stream() stream} is being converted.
     * @return The {@link FailableStream}, which has been created by converting the stream.
     */
    public static <O> FailableStream<O> stream(final Collection<O> stream) {
        return stream(stream.stream());
    }

    /**
     * Converts the given {@link Stream stream} into a {@link FailableStream}. This is basically a simplified, reduced
     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
     *
     * <pre>
     * final List&lt;O&gt; list;
     * final Method m;
     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
     *     try {
     *         return (String) m.invoke(o);
     *     } catch (Throwable t) {
     *         throw Failable.rethrow(t);
     *     }
     * };
     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
     * </pre>
     *
     * as follows:
     *
     * <pre>
     * final List&lt;O&gt; list;
     * final Method m;
     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
     *     .collect(Collectors.toList());
     * </pre>
     *
     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
     * spirit of Lambdas better than the first version.
     *
     * @param <O> The streams element type.
     * @param stream The stream, which is being converted.
     * @return The {@link FailableStream}, which has been created by converting the stream.
     */
    public static <O> FailableStream<O> stream(final Stream<O> stream) {
        return new FailableStream<>(stream);
    }

    /**
     * Returns a {@code Collector} that accumulates the input elements into a new array.
     *
     * @param pElementType Type of an element in the array.
     * @param <O> the type of the input elements
     * @return a {@code Collector} which collects all the input elements into an array, in encounter order
     */
    public static <O extends Object> Collector<O, ?, O[]> toArray(final Class<O> pElementType) {
        return new ArrayCollector<>(pElementType);
    }
}