Saturday, January 11, 2014

Java 8 lambda with combiner

Lambda expressions and functional programming are among the major features introduced in Java 8. Here I would like to explain, with a practical example, why a combiner is needed and how it is used.

        
public class Person {

    String name;
    int age;
    Sex sex;
    .....


        Person p1 = new Person("Shaozhen", 5, male);
        Person p2 = new Person("Mike", 10, male);
        Person p3 = new Person("Darin", 20, male);
        Person p4 = new Person("Alice", 25, female);
        Person p5 = new Person("Mark", 30, male);

        List<Person> persons = new ArrayList<Person>();
        persons.add(p1);
        persons.add(p2);
        persons.add(p3);
        persons.add(p4);
        persons.add(p5);


The above code initializes a list of five persons, each with name, age and sex attributes. The problem is to use lambdas/functions [1] to find the average age of the male persons. To show how the combiner is used, we implement a custom IntConsumer.
 
class Averager implements IntConsumer
{
    private int total = 0;
    private int count = 0;

    public double average() {
        return count > 0 ? ((double) total)/count : 0;
    }

    public void accept(int i) {
        total += i; count++;
    }
    public void combine(Averager other) {
        
    }
}

Then we can use lambdas to filter the list and calculate the average.
 
    persons.stream().filter(p -> p.getSex() == male).map(p -> p.getAge()).collect(Averager::new, Averager::accept, Averager::combine).average()
The above code obtains the collection's stream, whose intermediate operations are lazy: nothing is filtered or mapped until a terminal operation runs, and the mapping function is only applied to the items that pass the filter. The collect method is the terminal operation that triggers evaluation of the lazy pipeline, and it takes method references as arguments. Averager::new says how to create the result object, and Averager::accept says how to accumulate each item into it. The implementation of Averager is straightforward: whenever a new item is scanned, the person's age is added to the total variable and the count variable is incremented by 1. The total divided by the count then gives the average age.
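To make the roles of the first two arguments concrete, here is a minimal sketch of what a sequential collect amounts to: create one container, then feed every item to it. The class name SequentialCollectSketch, the helper names viaCollect and viaLoop, and the plain list of ages are assumptions made for illustration. The combine method is left empty on purpose, as in the Averager above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical wrapper class, used only to keep the sketch self-contained.
class SequentialCollectSketch {

    // Same Averager as in the post, with combine still unimplemented.
    static class Averager implements IntConsumer {
        private int total = 0;
        private int count = 0;

        public double average() {
            return count > 0 ? ((double) total) / count : 0;
        }

        @Override
        public void accept(int i) {
            total += i;
            count++;
        }

        public void combine(Averager other) {
            // intentionally empty at this point of the discussion
        }
    }

    // The stream version: supplier, accumulator, combiner.
    static double viaCollect(List<Integer> ages) {
        return ages.stream()
                .collect(Averager::new, Averager::accept, Averager::combine)
                .average();
    }

    // Roughly what the sequential stream version does by hand.
    static double viaLoop(List<Integer> ages) {
        Averager result = new Averager();   // Averager::new
        for (int age : ages) {
            result.accept(age);             // Averager::accept
        }
        return result.average();
    }

    public static void main(String[] args) {
        List<Integer> maleAges = Arrays.asList(5, 10, 20, 30);
        System.out.println(viaCollect(maleAges));
        System.out.println(viaLoop(maleAges));
    }
}
```

With a single container there are no partial results to merge, which is why the empty combine does no harm in the sequential case.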

However, suppose the persons list is huge and the computer has multiple cores. How can the program take advantage of multi-core parallel processing to speed things up? Since Java 7 there has been a fork/join [2] framework that splits the workload recursively and aggregates the results. Java 8 streams provide a way to process collections in parallel on top of fork/join.
    persons.parallelStream().filter(p -> p.getSex() == male).map(p -> p.getAge()).collect(Averager::new, Averager::accept, Averager::combine).average();
The above code uses parallelStream so that the pipeline is processed with fork/join. However, after running the program, we got a wrong result: 5. The reason is that we have not implemented the combine method. When combine is applied to two partial results, it does nothing and keeps only the left-hand result, which here contains just the first male person's age, 5. Now let's implement the combine method and get the correct result.
    
    public void combine(Averager other) {
        total += other.total;
        count += other.count;
    }
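For reference, the pieces can be put together into one runnable sketch. The Person class is elided in the listing above, so the constructor, the getters, the Sex enum with MALE and FEMALE constants, and the names ParallelAverageDemo, samplePersons and averageMaleAge are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical wrapper class, used only to keep the sketch self-contained.
class ParallelAverageDemo {

    // Assumed enum; the post only uses the constants male and female.
    enum Sex { MALE, FEMALE }

    // Assumed constructor and getters; the original class body is elided.
    static class Person {
        private final String name;
        private final int age;
        private final Sex sex;

        Person(String name, int age, Sex sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        String getName() { return name; }
        int getAge() { return age; }
        Sex getSex() { return sex; }
    }

    static class Averager implements IntConsumer {
        private int total = 0;
        private int count = 0;

        public double average() {
            return count > 0 ? ((double) total) / count : 0;
        }

        @Override
        public void accept(int i) {
            total += i;
            count++;
        }

        // Merge the partial result of another subtask into this one.
        public void combine(Averager other) {
            total += other.total;
            count += other.count;
        }
    }

    static List<Person> samplePersons() {
        List<Person> persons = new ArrayList<Person>();
        persons.add(new Person("Shaozhen", 5, Sex.MALE));
        persons.add(new Person("Mike", 10, Sex.MALE));
        persons.add(new Person("Darin", 20, Sex.MALE));
        persons.add(new Person("Alice", 25, Sex.FEMALE));
        persons.add(new Person("Mark", 30, Sex.MALE));
        return persons;
    }

    static double averageMaleAge(List<Person> persons) {
        return persons.parallelStream()
                .filter(p -> p.getSex() == Sex.MALE)
                .map(Person::getAge)
                .collect(Averager::new, Averager::accept, Averager::combine)
                .average();
    }

    public static void main(String[] args) {
        // (5 + 10 + 20 + 30) / 4 = 16.25
        System.out.println(averageMaleAge(samplePersons()));
    }
}
```

Because combine adds both total and count, the result does not depend on how the list is split among the worker threads.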

To sum up, the keywords of Java 8 in this discussion are lambdas, functional programming, streams and fork/join. The combiner here is very similar to the reduce step of map/reduce [3].
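The analogy can be sketched by hand, without streams. In this illustrative sketch the male ages are split into two chunks, each chunk is accumulated into its own Averager, and the two partial results are then merged with combine. The split into exactly two chunks and the names ManualCombineSketch, accumulate and averageOfChunks are assumptions; the real fork/join splitting is decided by the runtime.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical wrapper class, used only to keep the sketch self-contained.
class ManualCombineSketch {

    static class Averager implements IntConsumer {
        private int total = 0;
        private int count = 0;

        public double average() {
            return count > 0 ? ((double) total) / count : 0;
        }

        @Override
        public void accept(int i) {
            total += i;
            count++;
        }

        public void combine(Averager other) {
            total += other.total;
            count += other.count;
        }
    }

    // Each "worker" accumulates its own chunk into a private Averager.
    static Averager accumulate(List<Integer> chunk) {
        Averager partial = new Averager();
        for (int age : chunk) {
            partial.accept(age);
        }
        return partial;
    }

    // The reduce-like step: merge the right partial result into the left one.
    static double averageOfChunks(List<Integer> left, List<Integer> right) {
        Averager leftPartial = accumulate(left);
        Averager rightPartial = accumulate(right);
        leftPartial.combine(rightPartial);
        return leftPartial.average();
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(5, 10);
        List<Integer> right = Arrays.asList(20, 30);
        // Left chunk alone: (5 + 10) / 2 = 7.5
        System.out.println(accumulate(left).average());
        // After combining both chunks: (5 + 10 + 20 + 30) / 4 = 16.25
        System.out.println(averageOfChunks(left, right));
    }
}
```

If combine did nothing, the merged result would still be the left chunk's average, which is the same kind of wrong answer the parallel stream produced earlier.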

Reference:

  1. Lambda tutorial
  2. fork/join tutorial
  3. Map Reduce