cemsultan (19)
#1
Hi Raoul,

I just wanted to share (if you wish) some code that could supplement section 6.4. It also has a fluent interface and is used in a similar way. The difference between the two is this: the book's version uses the Spliterator's late-binding feature to go through the original stream sequentially, copying each element into extra queues that are created dynamically, one per submitted query. My version instead creates a separate stream for each query and runs it on its own thread from a thread pool, making use of the Future class. The internal parallel implementation is not exposed to the client.

Running a separate stream for each query in parallel this way appears to be much faster: on average 17+ times on my laptop (Intel Core i7 Q740 @ 1.73GHz), using the same menu data but held in an ArrayList with its contents duplicated 10,000 times. The overhead of the API based on Paul Sandoz's approach includes (but is not limited to) the creation of the queues and the user-implemented sequential processing with interceptors.

Below is the code I developed for you.

Client code (snippet):

Map<Object, Future<?>> results = new StreamThreads<Dish>()
    .fork("shortMenu", menu, s -> s.map(Dish::getName).collect(joining(", ")))
    .fork("totalCalories", menu, s -> s.mapToInt(Dish::getCalories).sum())
    .fork("leastCaloricDish", menu, s -> s.collect(
        reducing((d1, d2) -> d1.getCalories() < d2.getCalories() ? d1 : d2)).get())
    .fork("dishesByType", menu, s -> s.collect(groupingBy(Dish::getType)))
    .getResults();

try {
    String shortMenu = (String) results.get("shortMenu").get();
    int totalCalories = (Integer) results.get("totalCalories").get();
    Dish leastCaloricDish = (Dish) results.get("leastCaloricDish").get();
    Map<Dish.Type, List<Dish>> dishesByType =
        (Map<Dish.Type, java.util.List<Dish>>) results.get("dishesByType").get();

    System.out.println("Short menu: " + shortMenu);
    System.out.println("Total calories: " + totalCalories);
    System.out.println("Least caloric dish: " + leastCaloricDish);
    System.out.println("Dishes by type: " + dishesByType);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}


Here is the StreamThreads class:

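import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

public class StreamThreads<T> {

    private final ExecutorService executeThreads;
    private final Map<Object, Future<?>> futures = new HashMap<>();

    // Defaults to a pool of 4 threads.
    public StreamThreads() {
        this(4);
    }

    public StreamThreads(int threads) {
        executeThreads = Executors.newFixedThreadPool(threads);
    }

    // Creates a fresh stream over the list and submits the query to the pool.
    public StreamThreads<T> fork(Object key, List<T> list,
                                 Function<Stream<T>, ?> f) {
        Stream<T> streamList = list.stream();
        futures.put(key,
            executeThreads.submit(new StreamThreadHelper<T>(streamList, f)));
        return this;
    }

    // Stops accepting new queries; those already submitted still run to completion.
    public Map<Object, Future<?>> getResults() {
        executeThreads.shutdown();
        return futures;
    }
}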


And finally, here is the StreamThreadHelper helper class:

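import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.stream.Stream;

public class StreamThreadHelper<T> implements Callable<Object> {

    private final Stream<T> stream;
    private final Function<Stream<T>, ?> f;

    public StreamThreadHelper(Stream<T> stream, Function<Stream<T>, ?> f) {
        this.stream = stream;
        this.f = f;
    }

    // Applies the query to the stream on the pool thread.
    // A stream can be consumed only once, hence one stream per query.
    @Override
    public Object call() {
        return f.apply(stream);
    }
}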
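
A possible variation, offered only as a minimal sketch: because the results map holds Future<?> values, the client has to cast every result. If that is a concern, each query could return its own typed future instead. The sketch below assumes CompletableFuture.supplyAsync is an acceptable substitute for submitting a Callable. The class name TypedStreamQueries, its fork method, and the list of strings standing in for the Dish menu are all hypothetical and not part of the code above or of the book.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: one typed future per query, so no casts are needed.
public class TypedStreamQueries {

    // Runs the query on a fresh stream of the list, on the given executor.
    public static <T, R> CompletableFuture<R> fork(List<T> list,
                                                   Function<Stream<T>, R> query,
                                                   ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> query.apply(list.stream()), executor);
    }

    public static void main(String[] args) {
        // Stand-in data (assumption); the original post uses the book's Dish menu.
        List<String> names = Arrays.asList("pork", "beef", "chicken", "salmon");
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<String> shortMenu =
                    fork(names, s -> s.collect(Collectors.joining(", ")), executor);
            CompletableFuture<Integer> totalLength =
                    fork(names, s -> s.mapToInt(String::length).sum(), executor);

            // join() blocks until the result is ready and returns the typed value.
            System.out.println("Short menu: " + shortMenu.join());
            System.out.println("Total length: " + totalLength.join());
        } finally {
            // Already submitted queries have completed by now; release the pool.
            executor.shutdown();
        }
    }
}
```

The trade-off is that the fluent chain and the keyed results map are lost: the caller keeps one variable per query instead of looking results up by key.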


Regards,
Cem Redif